headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashSet;
   4use language::File;
   5use lsp::LanguageServerId;
   6
   7use extension::ExtensionHostProxy;
   8use extension_host::headless_host::HeadlessExtensionStore;
   9use fs::Fs;
  10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  11use http_client::HttpClient;
  12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  13use node_runtime::NodeRuntime;
  14use project::{
  15    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
  16    ToolchainStore, WorktreeId,
  17    agent_server_store::AgentServerStore,
  18    buffer_store::{BufferStore, BufferStoreEvent},
  19    context_server_store::ContextServerStore,
  20    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  21    git_store::GitStore,
  22    image_store::ImageId,
  23    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  24    project_settings::SettingsObserver,
  25    search::SearchQuery,
  26    task_store::TaskStore,
  27    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  28    worktree_store::WorktreeStore,
  29};
  30use rpc::{
  31    AnyProtoClient, TypedEnvelope,
  32    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  33};
  34
  35use settings::initial_server_settings_content;
  36use std::{
  37    num::NonZeroU64,
  38    path::{Path, PathBuf},
  39    sync::{
  40        Arc,
  41        atomic::{AtomicU64, AtomicUsize, Ordering},
  42    },
  43};
  44use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  45use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  46use worktree::Worktree;
  47
/// Project state held by the headless (remote server) side of a session.
///
/// Bundles the RPC `session` with the entity handles of the stores that
/// [`HeadlessProject::new`] creates, so they stay alive for as long as the
/// project does.
pub struct HeadlessProject {
    /// Filesystem abstraction handed to the stores and used to canonicalize
    /// worktree paths.
    pub fs: Arc<dyn Fs>,
    /// RPC client used to send messages/requests to the peer and on which the
    /// request and message handlers are registered.
    pub session: AnyProtoClient,
    /// Store of the worktrees that belong to this project.
    pub worktree_store: Entity<WorktreeStore>,
    /// Store of the buffers opened on behalf of the peer.
    pub buffer_store: Entity<BufferStore>,
    /// Language server store; its events are handled in `on_lsp_store_event`.
    pub lsp_store: Entity<LspStore>,
    /// Task store, also passed to the settings observer on construction.
    pub task_store: Entity<TaskStore>,
    /// Debug adapter store.
    pub dap_store: Entity<DapStore>,
    /// Breakpoint store, also passed to the debug adapter store on construction.
    pub breakpoint_store: Entity<BreakpointStore>,
    /// Agent server store.
    pub agent_server_store: Entity<AgentServerStore>,
    /// Context server store.
    pub context_server_store: Entity<ContextServerStore>,
    /// Settings observer built from the filesystem, worktree store and task store.
    pub settings_observer: Entity<SettingsObserver>,
    /// Shared counter passed to `Worktree::local` whenever a worktree is added.
    pub next_entry_id: Arc<AtomicUsize>,
    /// Language registry shared with the toolchain, prettier and LSP stores.
    pub languages: Arc<LanguageRegistry>,
    /// Extension store; handles the sync/install extension requests.
    pub extensions: Entity<HeadlessExtensionStore>,
    /// Git store.
    pub git_store: Entity<GitStore>,
    /// Project environment shared by the stores that need one.
    pub environment: Entity<ProjectEnvironment>,
    // Used mostly to keep alive the toolchain store for RPC handlers.
    // Local variant is used within LSP store, but that's a separate entity.
    pub _toolchain_store: Entity<ToolchainStore>,
}
  69
/// Dependencies consumed by [`HeadlessProject::new`].
///
/// The struct is destructured by value in `new`, so every field is moved (or
/// cloned from) into the stores created there.
pub struct HeadlessAppState {
    /// RPC client connecting this process to its peer.
    pub session: AnyProtoClient,
    /// Filesystem abstraction shared by all stores.
    pub fs: Arc<dyn Fs>,
    /// HTTP client passed to the stores and the extension store.
    pub http_client: Arc<dyn HttpClient>,
    /// Node runtime passed to language, prettier, debug adapter, agent server
    /// and extension setup.
    pub node_runtime: NodeRuntime,
    /// Language registry shared across stores.
    pub languages: Arc<LanguageRegistry>,
    /// Extension host proxy used to initialize debug adapter and language
    /// extensions and the headless extension store.
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}
  78
  79impl HeadlessProject {
    /// Performs app-wide initialization: initializes settings and the LSP
    /// log store.
    ///
    /// NOTE(review): presumably this must run once before [`Self::new`], since
    /// the LSP event handlers look up the global log store — confirm with callers.
    pub fn init(cx: &mut App) {
        settings::init(cx);
        // NOTE(review): the meaning of the `true` flag is defined by
        // `log_store::init`, which is not visible here.
        log_store::init(true, cx);
    }
  84
    /// Builds the headless project.
    ///
    /// Creates every store in its local flavor, shares the ones that expose a
    /// `shared` method over `session` under `REMOTE_SERVER_PROJECT_ID`,
    /// subscribes to LSP and buffer events, and registers all request/message
    /// handlers on the session.
    ///
    /// * `HeadlessAppState` — the dependencies, destructured in place.
    /// * `init_worktree_trust` — when `true`, worktree trust tracking is set up
    ///   for the newly created worktree store.
    /// * `cx` — the entity context of the project being constructed.
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            extension_host_proxy: proxy,
        }: HeadlessAppState,
        init_worktree_trust: bool,
        cx: &mut Context<Self>,
    ) -> Self {
        debug_adapter_extension::init(proxy.clone(), cx);
        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);

        // The worktree store comes first: most of the stores below take a
        // handle to it.
        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone());
            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            store
        });

        // Trust tracking is opt-in, controlled by the caller.
        if init_worktree_trust {
            project::trusted_worktrees::track_worktree_trust(
                worktree_store.clone(),
                None::<RemoteHostLocation>,
                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
                None,
                cx,
            );
        }

        let environment =
            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                manifest_tree.clone(),
                fs.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            buffer_store
        });

        let breakpoint_store = cx.new(|_| {
            let mut breakpoint_store =
                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());

            breakpoint_store
        });

        let dap_store = cx.new(|cx| {
            let mut dap_store = DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                worktree_store.clone(),
                breakpoint_store.clone(),
                true,
                cx,
            );
            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            dap_store
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            store
        });

        // The prettier store is only consumed by the LSP store below; it is
        // not shared over the session.
        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                cx,
            );
            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                // The toolchain store was created with `ToolchainStore::local`
                // above, so the local variant is expected to be present.
                toolchain_store
                    .read(cx)
                    .as_local_store()
                    .expect("Toolchain store to be local")
                    .clone(),
                environment.clone(),
                manifest_tree,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            lsp_store
        });

        let agent_server_store = cx.new(|cx| {
            let mut agent_server_store = AgentServerStore::local(
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                http_client.clone(),
                cx,
            );
            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            agent_server_store
        });

        let context_server_store = cx.new(|cx| {
            let mut context_server_store =
                ContextServerStore::local(worktree_store.clone(), None, true, cx);
            context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
            context_server_store
        });

        // Route LSP store events through `on_lsp_store_event`.
        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
        language_extension::init(
            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
            proxy.clone(),
            languages.clone(),
        );

        // Every buffer added to the store gets its events routed through
        // `on_buffer_event`.
        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
            if let BufferStoreEvent::BufferAdded(buffer) = event {
                cx.subscribe(buffer, Self::on_buffer_event).detach();
            }
        })
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        // local_machine -> ssh handlers
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);

        // Handlers bound to a weak handle of this project.
        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);

        session.add_entity_request_handler(Self::handle_add_worktree);
        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        // Handlers registered per entity type rather than against a specific handle.
        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
        session.add_entity_request_handler(Self::handle_open_new_buffer);
        session.add_entity_request_handler(Self::handle_find_search_candidates);
        session.add_entity_request_handler(Self::handle_open_server_settings);
        session.add_entity_request_handler(Self::handle_get_directory_environment);
        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
        session.add_entity_request_handler(Self::handle_open_image_by_path);
        session.add_entity_request_handler(Self::handle_trust_worktrees);
        session.add_entity_request_handler(Self::handle_restrict_worktrees);

        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
        session.add_entity_request_handler(BufferStore::handle_update_buffer);
        session.add_entity_message_handler(BufferStore::handle_close_buffer);

        // Extension requests are served by the extension store itself.
        session.add_request_handler(
            extensions.downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        session.add_request_handler(
            extensions.downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

        // Let each store type perform its own registration on the session.
        BufferStore::init(&session);
        WorktreeStore::init(&session);
        SettingsObserver::init(&session);
        LspStore::init(&session);
        TaskStore::init(Some(&session));
        ToolchainStore::init(&session);
        DapStore::init(&session, cx);
        // todo(debugger): Re init breakpoint store when we set it up for collab
        BreakpointStore::init(&session);
        GitStore::init(&session);
        AgentServerStore::init_headless(&session);
        ContextServerStore::init_headless(&session);

        HeadlessProject {
            next_entry_id: Default::default(),
            session,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            dap_store,
            breakpoint_store,
            agent_server_store,
            context_server_store,
            languages,
            extensions,
            git_store,
            environment,
            _toolchain_store: toolchain_store,
        }
    }
 346
 347    fn on_buffer_event(
 348        &mut self,
 349        buffer: Entity<Buffer>,
 350        event: &BufferEvent,
 351        cx: &mut Context<Self>,
 352    ) {
 353        if let BufferEvent::Operation {
 354            operation,
 355            is_local: true,
 356        } = event
 357        {
 358            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 359                project_id: REMOTE_SERVER_PROJECT_ID,
 360                buffer_id: buffer.read(cx).remote_id().to_proto(),
 361                operations: vec![serialize_operation(operation)],
 362            }))
 363            .detach()
 364        }
 365    }
 366
 367    fn on_lsp_store_event(
 368        &mut self,
 369        lsp_store: Entity<LspStore>,
 370        event: &LspStoreEvent,
 371        cx: &mut Context<Self>,
 372    ) {
 373        match event {
 374            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 375                let log_store = cx
 376                    .try_global::<GlobalLogStore>()
 377                    .map(|lsp_logs| lsp_logs.0.clone());
 378                if let Some(log_store) = log_store {
 379                    log_store.update(cx, |log_store, cx| {
 380                        log_store.add_language_server(
 381                            LanguageServerKind::LocalSsh {
 382                                lsp_store: self.lsp_store.downgrade(),
 383                            },
 384                            *id,
 385                            Some(name.clone()),
 386                            *worktree_id,
 387                            lsp_store.read(cx).language_server_for_id(*id),
 388                            cx,
 389                        );
 390                    });
 391                }
 392            }
 393            LspStoreEvent::LanguageServerRemoved(id) => {
 394                let log_store = cx
 395                    .try_global::<GlobalLogStore>()
 396                    .map(|lsp_logs| lsp_logs.0.clone());
 397                if let Some(log_store) = log_store {
 398                    log_store.update(cx, |log_store, cx| {
 399                        log_store.remove_language_server(*id, cx);
 400                    });
 401                }
 402            }
 403            LspStoreEvent::LanguageServerUpdate {
 404                language_server_id,
 405                name,
 406                message,
 407            } => {
 408                self.session
 409                    .send(proto::UpdateLanguageServer {
 410                        project_id: REMOTE_SERVER_PROJECT_ID,
 411                        server_name: name.as_ref().map(|name| name.to_string()),
 412                        language_server_id: language_server_id.to_proto(),
 413                        variant: Some(message.clone()),
 414                    })
 415                    .log_err();
 416            }
 417            LspStoreEvent::Notification(message) => {
 418                self.session
 419                    .send(proto::Toast {
 420                        project_id: REMOTE_SERVER_PROJECT_ID,
 421                        notification_id: "lsp".to_string(),
 422                        message: message.clone(),
 423                    })
 424                    .log_err();
 425            }
 426            LspStoreEvent::LanguageServerPrompt(prompt) => {
 427                let request = self.session.request(proto::LanguageServerPromptRequest {
 428                    project_id: REMOTE_SERVER_PROJECT_ID,
 429                    actions: prompt
 430                        .actions
 431                        .iter()
 432                        .map(|action| action.title.to_string())
 433                        .collect(),
 434                    level: Some(prompt_to_proto(prompt)),
 435                    lsp_name: prompt.lsp_name.clone(),
 436                    message: prompt.message.clone(),
 437                });
 438                let prompt = prompt.clone();
 439                cx.background_spawn(async move {
 440                    let response = request.await?;
 441                    if let Some(action_response) = response.action_response {
 442                        prompt.respond(action_response as usize).await;
 443                    }
 444                    anyhow::Ok(())
 445                })
 446                .detach();
 447            }
 448            _ => {}
 449        }
 450    }
 451
    /// Handles an `AddWorktree` request: resolves the requested path, creates a
    /// local worktree for it and replies with the worktree id and the
    /// canonicalized path.
    ///
    /// The worktree is added to the worktree store from a separately spawned
    /// task so that the response can be returned first (see the comment before
    /// the spawn below).
    ///
    /// # Errors
    ///
    /// Fails when the path cannot be canonicalized and has no parent, when the
    /// parent cannot be canonicalized either (tagged
    /// `DevServerProjectPathDoesNotExist`), or when creating the worktree fails.
    pub async fn handle_add_worktree(
        this: Entity<Self>,
        message: TypedEnvelope<proto::AddWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::AddWorktreeResponse> {
        use client::ErrorCodeExt;
        let fs = this.read_with(&cx, |this, _| this.fs.clone());
        // Expand a leading `~` before touching the filesystem.
        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());

        let canonicalized = match fs.canonicalize(&path).await {
            Ok(path) => path,
            Err(e) => {
                // The path itself could not be canonicalized; fall back to
                // canonicalizing its parent and re-appending the file name.
                let mut parent = path
                    .parent()
                    .ok_or(e)
                    .with_context(|| format!("{path:?} does not exist"))?;
                // A bare file name has an empty parent; resolve it against the
                // home directory instead.
                if parent == Path::new("") {
                    parent = util::paths::home_dir();
                }
                let parent = fs.canonicalize(parent).await.map_err(|_| {
                    anyhow!(
                        proto::ErrorCode::DevServerProjectPathDoesNotExist
                            .with_tag("path", path.to_string_lossy().as_ref())
                    )
                })?;
                if let Some(file_name) = path.file_name() {
                    parent.join(file_name)
                } else {
                    parent
                }
            }
        };

        let worktree = this
            .read_with(&cx.clone(), |this, _| {
                Worktree::local(
                    Arc::from(canonicalized.as_path()),
                    message.payload.visible,
                    this.fs.clone(),
                    this.next_entry_id.clone(),
                    true,
                    &mut cx,
                )
            })
            .await?;

        // Build the response before the worktree is registered with the store.
        let response = this.read_with(&cx, |_, cx| {
            let worktree = worktree.read(cx);
            proto::AddWorktreeResponse {
                worktree_id: worktree.id().to_proto(),
                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
            }
        });

        // We spawn this asynchronously, so that we can send the response back
        // *before* `worktree_store.add()` can send out UpdateProject requests
        // to the client about the new worktree.
        //
        // That lets the client manage the reference/handles of the newly-added
        // worktree, before getting interrupted by an UpdateProject request.
        //
        // This fixes the problem of the client sending the AddWorktree request,
        // headless project sending out a project update, client receiving it
        // and immediately dropping the reference of the new worktree, causing it
        // to be dropped on the headless project, and the client only then
        // receiving a response to AddWorktree.
        cx.spawn(async move |cx| {
            this.update(cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.add(&worktree, cx);
                });
            });
        })
        .detach();

        Ok(response)
    }
 529
 530    pub async fn handle_remove_worktree(
 531        this: Entity<Self>,
 532        envelope: TypedEnvelope<proto::RemoveWorktree>,
 533        mut cx: AsyncApp,
 534    ) -> Result<proto::Ack> {
 535        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 536        this.update(&mut cx, |this, cx| {
 537            this.worktree_store.update(cx, |worktree_store, cx| {
 538                worktree_store.remove_worktree(worktree_id, cx);
 539            });
 540        });
 541        Ok(proto::Ack {})
 542    }
 543
 544    pub async fn handle_open_buffer_by_path(
 545        this: Entity<Self>,
 546        message: TypedEnvelope<proto::OpenBufferByPath>,
 547        mut cx: AsyncApp,
 548    ) -> Result<proto::OpenBufferResponse> {
 549        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 550        let path = RelPath::from_proto(&message.payload.path)?;
 551        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 552            let buffer_store = this.buffer_store.clone();
 553            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 554                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 555            });
 556            (buffer_store, buffer)
 557        });
 558
 559        let buffer = buffer.await?;
 560        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 561        buffer_store.update(&mut cx, |buffer_store, cx| {
 562            buffer_store
 563                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 564                .detach_and_log_err(cx);
 565        });
 566
 567        Ok(proto::OpenBufferResponse {
 568            buffer_id: buffer_id.to_proto(),
 569        })
 570    }
 571
 572    pub async fn handle_open_image_by_path(
 573        this: Entity<Self>,
 574        message: TypedEnvelope<proto::OpenImageByPath>,
 575        mut cx: AsyncApp,
 576    ) -> Result<proto::OpenImageResponse> {
 577        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 578        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 579        let path = RelPath::from_proto(&message.payload.path)?;
 580        let project_id = message.payload.project_id;
 581        use proto::create_image_for_peer::Variant;
 582
 583        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 584            (this.worktree_store.clone(), this.session.clone())
 585        });
 586
 587        let worktree = worktree_store
 588            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 589            .context("worktree not found")?;
 590
 591        let load_task = worktree.update(&mut cx, |worktree, cx| {
 592            worktree.load_binary_file(path.as_ref(), cx)
 593        });
 594
 595        let loaded_file = load_task.await?;
 596        let content = loaded_file.content;
 597        let file = loaded_file.file;
 598
 599        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 600        let image_id =
 601            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 602
 603        let format = image::guess_format(&content)
 604            .map(|f| format!("{:?}", f).to_lowercase())
 605            .unwrap_or_else(|_| "unknown".to_string());
 606
 607        let state = proto::ImageState {
 608            id: image_id.to_proto(),
 609            file: Some(proto_file),
 610            content_size: content.len() as u64,
 611            format,
 612        };
 613
 614        session.send(proto::CreateImageForPeer {
 615            project_id,
 616            peer_id: Some(REMOTE_SERVER_PEER_ID),
 617            variant: Some(Variant::State(state)),
 618        })?;
 619
 620        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 621        for chunk in content.chunks(CHUNK_SIZE) {
 622            session.send(proto::CreateImageForPeer {
 623                project_id,
 624                peer_id: Some(REMOTE_SERVER_PEER_ID),
 625                variant: Some(Variant::Chunk(proto::ImageChunk {
 626                    image_id: image_id.to_proto(),
 627                    data: chunk.to_vec(),
 628                })),
 629            })?;
 630        }
 631
 632        Ok(proto::OpenImageResponse {
 633            image_id: image_id.to_proto(),
 634        })
 635    }
 636
 637    pub async fn handle_trust_worktrees(
 638        this: Entity<Self>,
 639        envelope: TypedEnvelope<proto::TrustWorktrees>,
 640        mut cx: AsyncApp,
 641    ) -> Result<proto::Ack> {
 642        let trusted_worktrees = cx
 643            .update(|cx| TrustedWorktrees::try_get_global(cx))
 644            .context("missing trusted worktrees")?;
 645        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 646        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 647            trusted_worktrees.trust(
 648                &worktree_store,
 649                envelope
 650                    .payload
 651                    .trusted_paths
 652                    .into_iter()
 653                    .filter_map(PathTrust::from_proto)
 654                    .collect(),
 655                cx,
 656            );
 657        });
 658        Ok(proto::Ack {})
 659    }
 660
 661    pub async fn handle_restrict_worktrees(
 662        this: Entity<Self>,
 663        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 664        mut cx: AsyncApp,
 665    ) -> Result<proto::Ack> {
 666        let trusted_worktrees = cx
 667            .update(|cx| TrustedWorktrees::try_get_global(cx))
 668            .context("missing trusted worktrees")?;
 669        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 670        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 671            let restricted_paths = envelope
 672                .payload
 673                .worktree_ids
 674                .into_iter()
 675                .map(WorktreeId::from_proto)
 676                .map(PathTrust::Worktree)
 677                .collect::<HashSet<_>>();
 678            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 679        });
 680        Ok(proto::Ack {})
 681    }
 682
 683    pub async fn handle_open_new_buffer(
 684        this: Entity<Self>,
 685        _message: TypedEnvelope<proto::OpenNewBuffer>,
 686        mut cx: AsyncApp,
 687    ) -> Result<proto::OpenBufferResponse> {
 688        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 689            let buffer_store = this.buffer_store.clone();
 690            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 691                buffer_store.create_buffer(None, true, cx)
 692            });
 693            (buffer_store, buffer)
 694        });
 695
 696        let buffer = buffer.await?;
 697        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 698        buffer_store.update(&mut cx, |buffer_store, cx| {
 699            buffer_store
 700                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 701                .detach_and_log_err(cx);
 702        });
 703
 704        Ok(proto::OpenBufferResponse {
 705            buffer_id: buffer_id.to_proto(),
 706        })
 707    }
 708
 709    async fn handle_toggle_lsp_logs(
 710        _: Entity<Self>,
 711        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 712        cx: AsyncApp,
 713    ) -> Result<()> {
 714        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 715        cx.update(|cx| {
 716            let log_store = cx
 717                .try_global::<GlobalLogStore>()
 718                .map(|global_log_store| global_log_store.0.clone())
 719                .context("lsp logs store is missing")?;
 720            let toggled_log_kind =
 721                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 722                    .context("invalid log type")?
 723                {
 724                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 725                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 726                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 727                };
 728            log_store.update(cx, |log_store, _| {
 729                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 730            });
 731            anyhow::Ok(())
 732        })?;
 733
 734        Ok(())
 735    }
 736
 737    async fn handle_open_server_settings(
 738        this: Entity<Self>,
 739        _: TypedEnvelope<proto::OpenServerSettings>,
 740        mut cx: AsyncApp,
 741    ) -> Result<proto::OpenBufferResponse> {
 742        let settings_path = paths::settings_file();
 743        let (worktree, path) = this
 744            .update(&mut cx, |this, cx| {
 745                this.worktree_store.update(cx, |worktree_store, cx| {
 746                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 747                })
 748            })
 749            .await?;
 750
 751        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 752            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 753                buffer_store.open_buffer(
 754                    ProjectPath {
 755                        worktree_id: worktree.read(cx).id(),
 756                        path: path,
 757                    },
 758                    cx,
 759                )
 760            });
 761
 762            (buffer, this.buffer_store.clone())
 763        });
 764
 765        let buffer = buffer.await?;
 766
 767        let buffer_id = cx.update(|cx| {
 768            if buffer.read(cx).is_empty() {
 769                buffer.update(cx, |buffer, cx| {
 770                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 771                });
 772            }
 773
 774            let buffer_id = buffer.read(cx).remote_id();
 775
 776            buffer_store.update(cx, |buffer_store, cx| {
 777                buffer_store
 778                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 779                    .detach_and_log_err(cx);
 780            });
 781
 782            buffer_id
 783        });
 784
 785        Ok(proto::OpenBufferResponse {
 786            buffer_id: buffer_id.to_proto(),
 787        })
 788    }
 789
    /// Handles a `FindSearchCandidates` request by starting a local project search
    /// and streaming the ids of matching buffers back over the session.
    ///
    /// The search itself runs in a spawned task. Matching buffer ids are grouped by
    /// an `AdaptiveBatcher` and sent as `FindSearchCandidatesChunk` requests with the
    /// `Matches` variant; once everything has been sent, a final chunk with the
    /// `Done` variant is sent. The task is handed to the buffer store via
    /// `register_ongoing_project_search`, keyed by `(peer_id, handle)`, and this
    /// handler replies with `Ack` without waiting for the search to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if the request has no `query` field or the query cannot be
    /// decoded. Errors inside the spawned task are returned by that task, not by
    /// this handler.
    async fn handle_find_search_candidates(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidates>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        use futures::stream::StreamExt as _;

        // Prefer the original sender when one is recorded on the envelope;
        // otherwise fall back to the direct sender.
        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
        let message = envelope.payload;
        let query = SearchQuery::from_proto(
            message.query.context("missing query field")?,
            PathStyle::local(),
        )?;

        let project_id = message.project_id;
        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
        let handle = message.handle;
        // `buffer_store` is moved into the spawned task below; this second handle is
        // kept so the task can be registered with the store afterwards.
        let _buffer_store = buffer_store.clone();
        let client = this.read_with(&cx, |this, _| this.session.clone());
        let task = cx.spawn(async move |cx| {
            // Start the search and obtain the handle whose `rx` yields matching buffers.
            let results = this.update(cx, |this, cx| {
                project::Search::local(
                    this.fs.clone(),
                    this.buffer_store.clone(),
                    this.worktree_store.clone(),
                    message.limit as _,
                    cx,
                )
                .into_handle(query, cx)
                .matching_buffers(cx)
            });
            let (batcher, batches) =
                project::project_search::AdaptiveBatcher::new(cx.background_executor());
            let mut new_matches = Box::pin(results.rx);

            // Background task: forward every batch of buffer ids as a `Matches` chunk.
            // It stops at the first failed request or when `batches` ends.
            let sender_task = cx.background_executor().spawn({
                let client = client.clone();
                async move {
                    let mut batches = std::pin::pin!(batches);
                    while let Some(buffer_ids) = batches.next().await {
                        client
                            .request(proto::FindSearchCandidatesChunk {
                                handle,
                                peer_id: Some(peer_id),
                                project_id,
                                variant: Some(
                                    proto::find_search_candidates_chunk::Variant::Matches(
                                        proto::FindSearchCandidatesMatches { buffer_ids },
                                    ),
                                ),
                            })
                            .await?;
                    }
                    anyhow::Ok(())
                }
            });

            while let Some(buffer) = new_matches.next().await {
                // Share the buffer with the remote peer before reporting its id.
                // The result is deliberately discarded (`let _`), so a failure here
                // does not abort the search.
                let _ = buffer_store
                    .update(cx, |this, cx| {
                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
                    })
                    .await;
                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
                batcher.push(buffer_id).await;
            }
            // NOTE(review): presumably `flush` also ends the `batches` stream so that
            // `sender_task` can complete — confirm against `AdaptiveBatcher`.
            batcher.flush().await;

            // Wait until all `Matches` chunks are sent so `Done` is sent last.
            sender_task.await?;

            client
                .request(proto::FindSearchCandidatesChunk {
                    handle,
                    peer_id: Some(peer_id),
                    project_id,
                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
                        proto::FindSearchCandidatesDone {},
                    )),
                })
                .await?;
            anyhow::Ok(())
        });
        // Hand the task to the buffer store, keyed by requester and search handle.
        _buffer_store.update(&mut cx, |this, _| {
            this.register_ongoing_project_search((peer_id, handle), task);
        });

        Ok(proto::Ack {})
    }
 878
 879    // Goes from client to host.
 880    async fn handle_find_search_candidates_cancel(
 881        this: Entity<Self>,
 882        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
 883        mut cx: AsyncApp,
 884    ) -> Result<()> {
 885        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
 886        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
 887    }
 888
 889    async fn handle_list_remote_directory(
 890        this: Entity<Self>,
 891        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
 892        cx: AsyncApp,
 893    ) -> Result<proto::ListRemoteDirectoryResponse> {
 894        use smol::stream::StreamExt;
 895        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 896        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 897        let check_info = envelope
 898            .payload
 899            .config
 900            .as_ref()
 901            .is_some_and(|config| config.is_dir);
 902
 903        let mut entries = Vec::new();
 904        let mut entry_info = Vec::new();
 905        let mut response = fs.read_dir(&expanded).await?;
 906        while let Some(path) = response.next().await {
 907            let path = path?;
 908            if let Some(file_name) = path.file_name() {
 909                entries.push(file_name.to_string_lossy().into_owned());
 910                if check_info {
 911                    let is_dir = fs.is_dir(&path).await;
 912                    entry_info.push(proto::EntryInfo { is_dir });
 913                }
 914            }
 915        }
 916        Ok(proto::ListRemoteDirectoryResponse {
 917            entries,
 918            entry_info,
 919        })
 920    }
 921
 922    async fn handle_get_path_metadata(
 923        this: Entity<Self>,
 924        envelope: TypedEnvelope<proto::GetPathMetadata>,
 925        cx: AsyncApp,
 926    ) -> Result<proto::GetPathMetadataResponse> {
 927        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 928        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 929
 930        let metadata = fs.metadata(&expanded).await?;
 931        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
 932
 933        Ok(proto::GetPathMetadataResponse {
 934            exists: metadata.is_some(),
 935            is_dir,
 936            path: expanded.to_string_lossy().into_owned(),
 937        })
 938    }
 939
 940    async fn handle_shutdown_remote_server(
 941        _this: Entity<Self>,
 942        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
 943        cx: AsyncApp,
 944    ) -> Result<proto::Ack> {
 945        cx.spawn(async move |cx| {
 946            cx.update(|cx| {
 947                // TODO: This is a hack, because in a headless project, shutdown isn't executed
 948                // when calling quit, but it should be.
 949                cx.shutdown();
 950                cx.quit();
 951            })
 952        })
 953        .detach();
 954
 955        Ok(proto::Ack {})
 956    }
 957
 958    pub async fn handle_ping(
 959        _this: Entity<Self>,
 960        _envelope: TypedEnvelope<proto::Ping>,
 961        _cx: AsyncApp,
 962    ) -> Result<proto::Ack> {
 963        log::debug!("Received ping from client");
 964        Ok(proto::Ack {})
 965    }
 966
 967    async fn handle_get_processes(
 968        _this: Entity<Self>,
 969        _envelope: TypedEnvelope<proto::GetProcesses>,
 970        _cx: AsyncApp,
 971    ) -> Result<proto::GetProcessesResponse> {
 972        let mut processes = Vec::new();
 973        let refresh_kind = RefreshKind::nothing().with_processes(
 974            ProcessRefreshKind::nothing()
 975                .without_tasks()
 976                .with_cmd(UpdateKind::Always),
 977        );
 978
 979        for process in System::new_with_specifics(refresh_kind)
 980            .processes()
 981            .values()
 982        {
 983            let name = process.name().to_string_lossy().into_owned();
 984            let command = process
 985                .cmd()
 986                .iter()
 987                .map(|s| s.to_string_lossy().into_owned())
 988                .collect::<Vec<_>>();
 989
 990            processes.push(proto::ProcessInfo {
 991                pid: process.pid().as_u32(),
 992                name,
 993                command,
 994            });
 995        }
 996
 997        processes.sort_by_key(|p| p.name.clone());
 998
 999        Ok(proto::GetProcessesResponse { processes })
1000    }
1001
1002    async fn handle_get_directory_environment(
1003        this: Entity<Self>,
1004        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1005        mut cx: AsyncApp,
1006    ) -> Result<proto::DirectoryEnvironment> {
1007        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1008        let directory = PathBuf::from(envelope.payload.directory);
1009        let environment = this
1010            .update(&mut cx, |this, cx| {
1011                this.environment.update(cx, |environment, cx| {
1012                    environment.local_directory_environment(&shell, directory.into(), cx)
1013                })
1014            })
1015            .await
1016            .context("failed to get directory environment")?
1017            .into_iter()
1018            .collect();
1019        Ok(proto::DirectoryEnvironment { environment })
1020    }
1021}
1022
1023fn prompt_to_proto(
1024    prompt: &project::LanguageServerPromptRequest,
1025) -> proto::language_server_prompt_request::Level {
1026    match prompt.level {
1027        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1028            proto::language_server_prompt_request::Info {},
1029        ),
1030        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1031            proto::language_server_prompt_request::Warning {},
1032        ),
1033        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1034            proto::language_server_prompt_request::Critical {},
1035        ),
1036    }
1037}