headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashSet;
   4use language::File;
   5use lsp::LanguageServerId;
   6
   7use extension::ExtensionHostProxy;
   8use extension_host::headless_host::HeadlessExtensionStore;
   9use fs::Fs;
  10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  11use http_client::HttpClient;
  12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  13use node_runtime::NodeRuntime;
  14use project::{
  15    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
  16    ToolchainStore, WorktreeId,
  17    agent_server_store::AgentServerStore,
  18    buffer_store::{BufferStore, BufferStoreEvent},
  19    context_server_store::ContextServerStore,
  20    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  21    git_store::GitStore,
  22    image_store::ImageId,
  23    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  24    project_settings::SettingsObserver,
  25    search::SearchQuery,
  26    task_store::TaskStore,
  27    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  28    worktree_store::{WorktreeIdCounter, WorktreeStore},
  29};
  30use rpc::{
  31    AnyProtoClient, TypedEnvelope,
  32    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  33};
  34
  35use settings::initial_server_settings_content;
  36use std::{
  37    num::NonZeroU64,
  38    path::{Path, PathBuf},
  39    sync::{
  40        Arc,
  41        atomic::{AtomicU64, AtomicUsize, Ordering},
  42    },
  43};
  44use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  45use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  46use worktree::Worktree;
  47
/// Project state held by the remote server ("headless") side of a session.
///
/// Bundles the stores built in [`HeadlessProject::new`] together with the
/// `session` used to exchange messages with the connected peer.
pub struct HeadlessProject {
    /// File system handle; handed to the stores in `new` and to worktrees
    /// created in `handle_add_worktree`.
    pub fs: Arc<dyn Fs>,
    /// RPC client used to send messages and requests to the peer.
    pub session: AnyProtoClient,
    /// Store of the worktrees opened in this project.
    pub worktree_store: Entity<WorktreeStore>,
    /// Store of the buffers opened in this project.
    pub buffer_store: Entity<BufferStore>,
    /// Language server store; its events are handled by `on_lsp_store_event`.
    pub lsp_store: Entity<LspStore>,
    /// Task store created in `new`.
    pub task_store: Entity<TaskStore>,
    /// Debug adapter store created in `new`.
    pub dap_store: Entity<DapStore>,
    /// Breakpoint store created in `new`; also handed to the DAP store.
    pub breakpoint_store: Entity<BreakpointStore>,
    /// Agent server store created in `new`.
    pub agent_server_store: Entity<AgentServerStore>,
    /// Context server store created in `new`.
    pub context_server_store: Entity<ContextServerStore>,
    /// Settings observer created in `new`.
    pub settings_observer: Entity<SettingsObserver>,
    /// Shared counter passed to `Worktree::local` whenever a worktree is added.
    pub next_entry_id: Arc<AtomicUsize>,
    /// Language registry shared with the toolchain, prettier and LSP stores.
    pub languages: Arc<LanguageRegistry>,
    /// Headless extension store; serves the sync/install extension requests.
    pub extensions: Entity<HeadlessExtensionStore>,
    /// Git store created in `new`.
    pub git_store: Entity<GitStore>,
    /// Project environment shared by the stores that are constructed with it.
    pub environment: Entity<ProjectEnvironment>,
    // Used mostly to keep alive the toolchain store for RPC handlers.
    // Local variant is used within LSP store, but that's a separate entity.
    pub _toolchain_store: Entity<ToolchainStore>,
}
  69
/// Dependencies consumed by [`HeadlessProject::new`].
///
/// The struct is destructured by value in `new`, and each field is passed on
/// to the stores that need it.
pub struct HeadlessAppState {
    /// RPC client connecting this server to its peer.
    pub session: AnyProtoClient,
    /// File system implementation used by the stores.
    pub fs: Arc<dyn Fs>,
    /// HTTP client handed to the DAP, LSP, agent-server and extension stores.
    pub http_client: Arc<dyn HttpClient>,
    /// Node runtime handed to language, DAP, prettier, agent-server and
    /// extension initialization.
    pub node_runtime: NodeRuntime,
    /// Language registry shared by the language-aware stores.
    pub languages: Arc<LanguageRegistry>,
    /// Extension host proxy used to initialize the debug-adapter and language
    /// extensions and the headless extension store.
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}
  78
  79impl HeadlessProject {
    /// Performs the app-wide initialization this type relies on: initializes
    /// settings and then the LSP log store.
    pub fn init(cx: &mut App) {
        settings::init(cx);
        // NOTE(review): the meaning of the `true` flag is defined by
        // `log_store::init` and is not visible from this file — confirm there.
        log_store::init(true, cx);
    }
  84
    /// Builds a `HeadlessProject` from the given application state.
    ///
    /// Creates every store as a local store, calls `shared` on each store that
    /// is exposed to the peer (with `REMOTE_SERVER_PROJECT_ID` and `session`),
    /// subscribes the stores and this entity to `session`, and registers the
    /// request/message handlers served by the headless project.
    ///
    /// When `init_worktree_trust` is `true`, worktree trust tracking is set up
    /// for the new worktree store before any other store is created.
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            extension_host_proxy: proxy,
        }: HeadlessAppState,
        init_worktree_trust: bool,
        cx: &mut Context<Self>,
    ) -> Self {
        debug_adapter_extension::init(proxy.clone(), cx);
        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);

        // The worktree store comes first: almost every other store below is
        // constructed from a handle to it.
        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            store
        });

        if init_worktree_trust {
            project::trusted_worktrees::track_worktree_trust(
                worktree_store.clone(),
                None::<RemoteHostLocation>,
                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
                None,
                cx,
            );
        }

        let environment =
            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                manifest_tree.clone(),
                fs.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            buffer_store
        });

        let breakpoint_store = cx.new(|_| {
            let mut breakpoint_store =
                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());

            breakpoint_store
        });

        let dap_store = cx.new(|cx| {
            let mut dap_store = DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                worktree_store.clone(),
                breakpoint_store.clone(),
                true,
                cx,
            );
            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            dap_store
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            store
        });

        // The prettier store is only handed to the LSP store below; `shared`
        // is not called on it.
        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                true,
                cx,
            );
            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                // The toolchain store was created with `ToolchainStore::local`
                // above, so a missing local store is treated as a bug.
                toolchain_store
                    .read(cx)
                    .as_local_store()
                    .expect("Toolchain store to be local")
                    .clone(),
                environment.clone(),
                manifest_tree,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            lsp_store
        });

        let agent_server_store = cx.new(|cx| {
            let mut agent_server_store = AgentServerStore::local(
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                http_client.clone(),
                cx,
            );
            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            agent_server_store
        });

        let context_server_store = cx.new(|cx| {
            let mut context_server_store =
                ContextServerStore::local(worktree_store.clone(), None, true, cx);
            context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
            context_server_store
        });

        // React to LSP store events (see `on_lsp_store_event`).
        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
        language_extension::init(
            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
            proxy.clone(),
            languages.clone(),
        );

        // Subscribe to every buffer as it is added to the store, so that its
        // events reach `on_buffer_event`.
        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
            if let BufferStoreEvent::BufferAdded(buffer) = event {
                cx.subscribe(buffer, Self::on_buffer_event).detach();
            }
        })
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        // local_machine -> ssh handlers
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);

        // Handlers registered against a weak handle to this entity.
        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);

        session.add_entity_request_handler(Self::handle_add_worktree);
        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        // Handlers registered through the entity-handler API.
        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
        session.add_entity_request_handler(Self::handle_open_new_buffer);
        session.add_entity_request_handler(Self::handle_find_search_candidates);
        session.add_entity_request_handler(Self::handle_open_server_settings);
        session.add_entity_request_handler(Self::handle_get_directory_environment);
        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
        session.add_entity_request_handler(Self::handle_open_image_by_path);
        session.add_entity_request_handler(Self::handle_trust_worktrees);
        session.add_entity_request_handler(Self::handle_restrict_worktrees);

        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
        session.add_entity_request_handler(BufferStore::handle_update_buffer);
        session.add_entity_message_handler(BufferStore::handle_close_buffer);

        // Extension requests are served by the extension store itself.
        session.add_request_handler(
            extensions.downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        session.add_request_handler(
            extensions.downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

        // Each store's own `init` is called with the session.
        BufferStore::init(&session);
        WorktreeStore::init(&session);
        SettingsObserver::init(&session);
        LspStore::init(&session);
        TaskStore::init(Some(&session));
        ToolchainStore::init(&session);
        DapStore::init(&session, cx);
        // todo(debugger): Re init breakpoint store when we set it up for collab
        BreakpointStore::init(&session);
        GitStore::init(&session);
        AgentServerStore::init_headless(&session);
        ContextServerStore::init_headless(&session);

        HeadlessProject {
            next_entry_id: Default::default(),
            session,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            dap_store,
            breakpoint_store,
            agent_server_store,
            context_server_store,
            languages,
            extensions,
            git_store,
            environment,
            _toolchain_store: toolchain_store,
        }
    }
 347
 348    fn on_buffer_event(
 349        &mut self,
 350        buffer: Entity<Buffer>,
 351        event: &BufferEvent,
 352        cx: &mut Context<Self>,
 353    ) {
 354        if let BufferEvent::Operation {
 355            operation,
 356            is_local: true,
 357        } = event
 358        {
 359            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 360                project_id: REMOTE_SERVER_PROJECT_ID,
 361                buffer_id: buffer.read(cx).remote_id().to_proto(),
 362                operations: vec![serialize_operation(operation)],
 363            }))
 364            .detach()
 365        }
 366    }
 367
 368    fn on_lsp_store_event(
 369        &mut self,
 370        lsp_store: Entity<LspStore>,
 371        event: &LspStoreEvent,
 372        cx: &mut Context<Self>,
 373    ) {
 374        match event {
 375            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 376                let log_store = cx
 377                    .try_global::<GlobalLogStore>()
 378                    .map(|lsp_logs| lsp_logs.0.clone());
 379                if let Some(log_store) = log_store {
 380                    log_store.update(cx, |log_store, cx| {
 381                        log_store.add_language_server(
 382                            LanguageServerKind::LocalSsh {
 383                                lsp_store: self.lsp_store.downgrade(),
 384                            },
 385                            *id,
 386                            Some(name.clone()),
 387                            *worktree_id,
 388                            lsp_store.read(cx).language_server_for_id(*id),
 389                            cx,
 390                        );
 391                    });
 392                }
 393            }
 394            LspStoreEvent::LanguageServerRemoved(id) => {
 395                let log_store = cx
 396                    .try_global::<GlobalLogStore>()
 397                    .map(|lsp_logs| lsp_logs.0.clone());
 398                if let Some(log_store) = log_store {
 399                    log_store.update(cx, |log_store, cx| {
 400                        log_store.remove_language_server(*id, cx);
 401                    });
 402                }
 403            }
 404            LspStoreEvent::LanguageServerUpdate {
 405                language_server_id,
 406                name,
 407                message,
 408            } => {
 409                self.session
 410                    .send(proto::UpdateLanguageServer {
 411                        project_id: REMOTE_SERVER_PROJECT_ID,
 412                        server_name: name.as_ref().map(|name| name.to_string()),
 413                        language_server_id: language_server_id.to_proto(),
 414                        variant: Some(message.clone()),
 415                    })
 416                    .log_err();
 417            }
 418            LspStoreEvent::Notification(message) => {
 419                self.session
 420                    .send(proto::Toast {
 421                        project_id: REMOTE_SERVER_PROJECT_ID,
 422                        notification_id: "lsp".to_string(),
 423                        message: message.clone(),
 424                    })
 425                    .log_err();
 426            }
 427            LspStoreEvent::LanguageServerPrompt(prompt) => {
 428                let request = self.session.request(proto::LanguageServerPromptRequest {
 429                    project_id: REMOTE_SERVER_PROJECT_ID,
 430                    actions: prompt
 431                        .actions
 432                        .iter()
 433                        .map(|action| action.title.to_string())
 434                        .collect(),
 435                    level: Some(prompt_to_proto(prompt)),
 436                    lsp_name: prompt.lsp_name.clone(),
 437                    message: prompt.message.clone(),
 438                });
 439                let prompt = prompt.clone();
 440                cx.background_spawn(async move {
 441                    let response = request.await?;
 442                    if let Some(action_response) = response.action_response {
 443                        prompt.respond(action_response as usize).await;
 444                    }
 445                    anyhow::Ok(())
 446                })
 447                .detach();
 448            }
 449            _ => {}
 450        }
 451    }
 452
    /// Handles an `AddWorktree` request: resolves the requested path, creates
    /// a local worktree for it and replies with the worktree id and the
    /// canonicalized path.
    ///
    /// The worktree is added to the worktree store from a separately spawned
    /// task, so the response is returned before the store is updated.
    ///
    /// # Errors
    ///
    /// Fails if the path cannot be canonicalized and has no parent, if the
    /// fallback parent directory cannot be canonicalized (reported as
    /// `DevServerProjectPathDoesNotExist`, tagged with the path), or if
    /// obtaining the next worktree id or creating the worktree fails.
    pub async fn handle_add_worktree(
        this: Entity<Self>,
        message: TypedEnvelope<proto::AddWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::AddWorktreeResponse> {
        use client::ErrorCodeExt;
        let fs = this.read_with(&cx, |this, _| this.fs.clone());
        // Expand a leading `~` before touching the file system.
        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());

        let canonicalized = match fs.canonicalize(&path).await {
            Ok(path) => path,
            Err(e) => {
                // The path itself could not be canonicalized. Fall back to
                // canonicalizing its parent and re-appending the file name.
                let mut parent = path
                    .parent()
                    .ok_or(e)
                    .with_context(|| format!("{path:?} does not exist"))?;
                // An empty parent (a bare relative name) is resolved against
                // the home directory.
                if parent == Path::new("") {
                    parent = util::paths::home_dir();
                }
                let parent = fs.canonicalize(parent).await.map_err(|_| {
                    anyhow!(
                        proto::ErrorCode::DevServerProjectPathDoesNotExist
                            .with_tag("path", path.to_string_lossy().as_ref())
                    )
                })?;
                if let Some(file_name) = path.file_name() {
                    parent.join(file_name)
                } else {
                    parent
                }
            }
        };
        let next_worktree_id = this
            .update(&mut cx, |this, cx| {
                this.worktree_store
                    .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
            })
            .await?;
        // `read_with` is given a clone of the async context so that `cx`
        // itself can be borrowed mutably by `Worktree::local` in the closure.
        let worktree = this
            .read_with(&cx.clone(), |this, _| {
                Worktree::local(
                    Arc::from(canonicalized.as_path()),
                    message.payload.visible,
                    this.fs.clone(),
                    this.next_entry_id.clone(),
                    true,
                    next_worktree_id,
                    &mut cx,
                )
            })
            .await?;

        let response = this.read_with(&cx, |_, cx| {
            let worktree = worktree.read(cx);
            proto::AddWorktreeResponse {
                worktree_id: worktree.id().to_proto(),
                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
            }
        });

        // We spawn this asynchronously, so that we can send the response back
        // *before* `worktree_store.add()` can send out UpdateProject requests
        // to the client about the new worktree.
        //
        // That lets the client manage the reference/handles of the newly-added
        // worktree, before getting interrupted by an UpdateProject request.
        //
        // This fixes the problem of the client sending the AddWorktree request,
        // headless project sending out a project update, client receiving it
        // and immediately dropping its reference to the new worktree, causing
        // it to be dropped on the headless project, and the client only then
        // receiving a response to AddWorktree.
        cx.spawn(async move |cx| {
            this.update(cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.add(&worktree, cx);
                });
            });
        })
        .detach();

        Ok(response)
    }
 536
 537    pub async fn handle_remove_worktree(
 538        this: Entity<Self>,
 539        envelope: TypedEnvelope<proto::RemoveWorktree>,
 540        mut cx: AsyncApp,
 541    ) -> Result<proto::Ack> {
 542        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 543        this.update(&mut cx, |this, cx| {
 544            this.worktree_store.update(cx, |worktree_store, cx| {
 545                worktree_store.remove_worktree(worktree_id, cx);
 546            });
 547        });
 548        Ok(proto::Ack {})
 549    }
 550
 551    pub async fn handle_open_buffer_by_path(
 552        this: Entity<Self>,
 553        message: TypedEnvelope<proto::OpenBufferByPath>,
 554        mut cx: AsyncApp,
 555    ) -> Result<proto::OpenBufferResponse> {
 556        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 557        let path = RelPath::from_proto(&message.payload.path)?;
 558        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 559            let buffer_store = this.buffer_store.clone();
 560            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 561                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 562            });
 563            (buffer_store, buffer)
 564        });
 565
 566        let buffer = buffer.await?;
 567        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 568        buffer_store.update(&mut cx, |buffer_store, cx| {
 569            buffer_store
 570                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 571                .detach_and_log_err(cx);
 572        });
 573
 574        Ok(proto::OpenBufferResponse {
 575            buffer_id: buffer_id.to_proto(),
 576        })
 577    }
 578
 579    pub async fn handle_open_image_by_path(
 580        this: Entity<Self>,
 581        message: TypedEnvelope<proto::OpenImageByPath>,
 582        mut cx: AsyncApp,
 583    ) -> Result<proto::OpenImageResponse> {
 584        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 585        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 586        let path = RelPath::from_proto(&message.payload.path)?;
 587        let project_id = message.payload.project_id;
 588        use proto::create_image_for_peer::Variant;
 589
 590        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 591            (this.worktree_store.clone(), this.session.clone())
 592        });
 593
 594        let worktree = worktree_store
 595            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 596            .context("worktree not found")?;
 597
 598        let load_task = worktree.update(&mut cx, |worktree, cx| {
 599            worktree.load_binary_file(path.as_ref(), cx)
 600        });
 601
 602        let loaded_file = load_task.await?;
 603        let content = loaded_file.content;
 604        let file = loaded_file.file;
 605
 606        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 607        let image_id =
 608            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 609
 610        let format = image::guess_format(&content)
 611            .map(|f| format!("{:?}", f).to_lowercase())
 612            .unwrap_or_else(|_| "unknown".to_string());
 613
 614        let state = proto::ImageState {
 615            id: image_id.to_proto(),
 616            file: Some(proto_file),
 617            content_size: content.len() as u64,
 618            format,
 619        };
 620
 621        session.send(proto::CreateImageForPeer {
 622            project_id,
 623            peer_id: Some(REMOTE_SERVER_PEER_ID),
 624            variant: Some(Variant::State(state)),
 625        })?;
 626
 627        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 628        for chunk in content.chunks(CHUNK_SIZE) {
 629            session.send(proto::CreateImageForPeer {
 630                project_id,
 631                peer_id: Some(REMOTE_SERVER_PEER_ID),
 632                variant: Some(Variant::Chunk(proto::ImageChunk {
 633                    image_id: image_id.to_proto(),
 634                    data: chunk.to_vec(),
 635                })),
 636            })?;
 637        }
 638
 639        Ok(proto::OpenImageResponse {
 640            image_id: image_id.to_proto(),
 641        })
 642    }
 643
 644    pub async fn handle_trust_worktrees(
 645        this: Entity<Self>,
 646        envelope: TypedEnvelope<proto::TrustWorktrees>,
 647        mut cx: AsyncApp,
 648    ) -> Result<proto::Ack> {
 649        let trusted_worktrees = cx
 650            .update(|cx| TrustedWorktrees::try_get_global(cx))
 651            .context("missing trusted worktrees")?;
 652        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 653        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 654            trusted_worktrees.trust(
 655                &worktree_store,
 656                envelope
 657                    .payload
 658                    .trusted_paths
 659                    .into_iter()
 660                    .filter_map(PathTrust::from_proto)
 661                    .collect(),
 662                cx,
 663            );
 664        });
 665        Ok(proto::Ack {})
 666    }
 667
 668    pub async fn handle_restrict_worktrees(
 669        this: Entity<Self>,
 670        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 671        mut cx: AsyncApp,
 672    ) -> Result<proto::Ack> {
 673        let trusted_worktrees = cx
 674            .update(|cx| TrustedWorktrees::try_get_global(cx))
 675            .context("missing trusted worktrees")?;
 676        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 677        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 678            let restricted_paths = envelope
 679                .payload
 680                .worktree_ids
 681                .into_iter()
 682                .map(WorktreeId::from_proto)
 683                .map(PathTrust::Worktree)
 684                .collect::<HashSet<_>>();
 685            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 686        });
 687        Ok(proto::Ack {})
 688    }
 689
 690    pub async fn handle_open_new_buffer(
 691        this: Entity<Self>,
 692        _message: TypedEnvelope<proto::OpenNewBuffer>,
 693        mut cx: AsyncApp,
 694    ) -> Result<proto::OpenBufferResponse> {
 695        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 696            let buffer_store = this.buffer_store.clone();
 697            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 698                buffer_store.create_buffer(None, true, cx)
 699            });
 700            (buffer_store, buffer)
 701        });
 702
 703        let buffer = buffer.await?;
 704        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 705        buffer_store.update(&mut cx, |buffer_store, cx| {
 706            buffer_store
 707                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 708                .detach_and_log_err(cx);
 709        });
 710
 711        Ok(proto::OpenBufferResponse {
 712            buffer_id: buffer_id.to_proto(),
 713        })
 714    }
 715
 716    async fn handle_toggle_lsp_logs(
 717        _: Entity<Self>,
 718        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 719        cx: AsyncApp,
 720    ) -> Result<()> {
 721        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 722        cx.update(|cx| {
 723            let log_store = cx
 724                .try_global::<GlobalLogStore>()
 725                .map(|global_log_store| global_log_store.0.clone())
 726                .context("lsp logs store is missing")?;
 727            let toggled_log_kind =
 728                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 729                    .context("invalid log type")?
 730                {
 731                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 732                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 733                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 734                };
 735            log_store.update(cx, |log_store, _| {
 736                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 737            });
 738            anyhow::Ok(())
 739        })?;
 740
 741        Ok(())
 742    }
 743
 744    async fn handle_open_server_settings(
 745        this: Entity<Self>,
 746        _: TypedEnvelope<proto::OpenServerSettings>,
 747        mut cx: AsyncApp,
 748    ) -> Result<proto::OpenBufferResponse> {
 749        let settings_path = paths::settings_file();
 750        let (worktree, path) = this
 751            .update(&mut cx, |this, cx| {
 752                this.worktree_store.update(cx, |worktree_store, cx| {
 753                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 754                })
 755            })
 756            .await?;
 757
 758        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 759            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 760                buffer_store.open_buffer(
 761                    ProjectPath {
 762                        worktree_id: worktree.read(cx).id(),
 763                        path,
 764                    },
 765                    cx,
 766                )
 767            });
 768
 769            (buffer, this.buffer_store.clone())
 770        });
 771
 772        let buffer = buffer.await?;
 773
 774        let buffer_id = cx.update(|cx| {
 775            if buffer.read(cx).is_empty() {
 776                buffer.update(cx, |buffer, cx| {
 777                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 778                });
 779            }
 780
 781            let buffer_id = buffer.read(cx).remote_id();
 782
 783            buffer_store.update(cx, |buffer_store, cx| {
 784                buffer_store
 785                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 786                    .detach_and_log_err(cx);
 787            });
 788
 789            buffer_id
 790        });
 791
 792        Ok(proto::OpenBufferResponse {
 793            buffer_id: buffer_id.to_proto(),
 794        })
 795    }
 796
 797    async fn handle_find_search_candidates(
 798        this: Entity<Self>,
 799        envelope: TypedEnvelope<proto::FindSearchCandidates>,
 800        mut cx: AsyncApp,
 801    ) -> Result<proto::Ack> {
 802        use futures::stream::StreamExt as _;
 803
 804        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
 805        let message = envelope.payload;
 806        let query = SearchQuery::from_proto(
 807            message.query.context("missing query field")?,
 808            PathStyle::local(),
 809        )?;
 810
 811        let project_id = message.project_id;
 812        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
 813        let handle = message.handle;
 814        let _buffer_store = buffer_store.clone();
 815        let client = this.read_with(&cx, |this, _| this.session.clone());
 816        let task = cx.spawn(async move |cx| {
 817            let results = this.update(cx, |this, cx| {
 818                project::Search::local(
 819                    this.fs.clone(),
 820                    this.buffer_store.clone(),
 821                    this.worktree_store.clone(),
 822                    message.limit as _,
 823                    cx,
 824                )
 825                .into_handle(query, cx)
 826                .matching_buffers(cx)
 827            });
 828            let (batcher, batches) =
 829                project::project_search::AdaptiveBatcher::new(cx.background_executor());
 830            let mut new_matches = Box::pin(results.rx);
 831
 832            let sender_task = cx.background_executor().spawn({
 833                let client = client.clone();
 834                async move {
 835                    let mut batches = std::pin::pin!(batches);
 836                    while let Some(buffer_ids) = batches.next().await {
 837                        client
 838                            .request(proto::FindSearchCandidatesChunk {
 839                                handle,
 840                                peer_id: Some(peer_id),
 841                                project_id,
 842                                variant: Some(
 843                                    proto::find_search_candidates_chunk::Variant::Matches(
 844                                        proto::FindSearchCandidatesMatches { buffer_ids },
 845                                    ),
 846                                ),
 847                            })
 848                            .await?;
 849                    }
 850                    anyhow::Ok(())
 851                }
 852            });
 853
 854            while let Some(buffer) = new_matches.next().await {
 855                let _ = buffer_store
 856                    .update(cx, |this, cx| {
 857                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 858                    })
 859                    .await;
 860                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
 861                batcher.push(buffer_id).await;
 862            }
 863            batcher.flush().await;
 864
 865            sender_task.await?;
 866
 867            client
 868                .request(proto::FindSearchCandidatesChunk {
 869                    handle,
 870                    peer_id: Some(peer_id),
 871                    project_id,
 872                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
 873                        proto::FindSearchCandidatesDone {},
 874                    )),
 875                })
 876                .await?;
 877            anyhow::Ok(())
 878        });
 879        _buffer_store.update(&mut cx, |this, _| {
 880            this.register_ongoing_project_search((peer_id, handle), task);
 881        });
 882
 883        Ok(proto::Ack {})
 884    }
 885
 886    // Goes from client to host.
 887    async fn handle_find_search_candidates_cancel(
 888        this: Entity<Self>,
 889        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
 890        mut cx: AsyncApp,
 891    ) -> Result<()> {
 892        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
 893        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
 894    }
 895
 896    async fn handle_list_remote_directory(
 897        this: Entity<Self>,
 898        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
 899        cx: AsyncApp,
 900    ) -> Result<proto::ListRemoteDirectoryResponse> {
 901        use smol::stream::StreamExt;
 902        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 903        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 904        let check_info = envelope
 905            .payload
 906            .config
 907            .as_ref()
 908            .is_some_and(|config| config.is_dir);
 909
 910        let mut entries = Vec::new();
 911        let mut entry_info = Vec::new();
 912        let mut response = fs.read_dir(&expanded).await?;
 913        while let Some(path) = response.next().await {
 914            let path = path?;
 915            if let Some(file_name) = path.file_name() {
 916                entries.push(file_name.to_string_lossy().into_owned());
 917                if check_info {
 918                    let is_dir = fs.is_dir(&path).await;
 919                    entry_info.push(proto::EntryInfo { is_dir });
 920                }
 921            }
 922        }
 923        Ok(proto::ListRemoteDirectoryResponse {
 924            entries,
 925            entry_info,
 926        })
 927    }
 928
 929    async fn handle_get_path_metadata(
 930        this: Entity<Self>,
 931        envelope: TypedEnvelope<proto::GetPathMetadata>,
 932        cx: AsyncApp,
 933    ) -> Result<proto::GetPathMetadataResponse> {
 934        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 935        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 936
 937        let metadata = fs.metadata(&expanded).await?;
 938        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
 939
 940        Ok(proto::GetPathMetadataResponse {
 941            exists: metadata.is_some(),
 942            is_dir,
 943            path: expanded.to_string_lossy().into_owned(),
 944        })
 945    }
 946
 947    async fn handle_shutdown_remote_server(
 948        _this: Entity<Self>,
 949        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
 950        cx: AsyncApp,
 951    ) -> Result<proto::Ack> {
 952        cx.spawn(async move |cx| {
 953            cx.update(|cx| {
 954                // TODO: This is a hack, because in a headless project, shutdown isn't executed
 955                // when calling quit, but it should be.
 956                cx.shutdown();
 957                cx.quit();
 958            })
 959        })
 960        .detach();
 961
 962        Ok(proto::Ack {})
 963    }
 964
 965    pub async fn handle_ping(
 966        _this: Entity<Self>,
 967        _envelope: TypedEnvelope<proto::Ping>,
 968        _cx: AsyncApp,
 969    ) -> Result<proto::Ack> {
 970        log::debug!("Received ping from client");
 971        Ok(proto::Ack {})
 972    }
 973
 974    async fn handle_get_processes(
 975        _this: Entity<Self>,
 976        _envelope: TypedEnvelope<proto::GetProcesses>,
 977        _cx: AsyncApp,
 978    ) -> Result<proto::GetProcessesResponse> {
 979        let mut processes = Vec::new();
 980        let refresh_kind = RefreshKind::nothing().with_processes(
 981            ProcessRefreshKind::nothing()
 982                .without_tasks()
 983                .with_cmd(UpdateKind::Always),
 984        );
 985
 986        for process in System::new_with_specifics(refresh_kind)
 987            .processes()
 988            .values()
 989        {
 990            let name = process.name().to_string_lossy().into_owned();
 991            let command = process
 992                .cmd()
 993                .iter()
 994                .map(|s| s.to_string_lossy().into_owned())
 995                .collect::<Vec<_>>();
 996
 997            processes.push(proto::ProcessInfo {
 998                pid: process.pid().as_u32(),
 999                name,
1000                command,
1001            });
1002        }
1003
1004        processes.sort_by_key(|p| p.name.clone());
1005
1006        Ok(proto::GetProcessesResponse { processes })
1007    }
1008
1009    async fn handle_get_directory_environment(
1010        this: Entity<Self>,
1011        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1012        mut cx: AsyncApp,
1013    ) -> Result<proto::DirectoryEnvironment> {
1014        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1015        let directory = PathBuf::from(envelope.payload.directory);
1016        let environment = this
1017            .update(&mut cx, |this, cx| {
1018                this.environment.update(cx, |environment, cx| {
1019                    environment.local_directory_environment(&shell, directory.into(), cx)
1020                })
1021            })
1022            .await
1023            .context("failed to get directory environment")?
1024            .into_iter()
1025            .collect();
1026        Ok(proto::DirectoryEnvironment { environment })
1027    }
1028}
1029
1030fn prompt_to_proto(
1031    prompt: &project::LanguageServerPromptRequest,
1032) -> proto::language_server_prompt_request::Level {
1033    match prompt.level {
1034        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1035            proto::language_server_prompt_request::Info {},
1036        ),
1037        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1038            proto::language_server_prompt_request::Warning {},
1039        ),
1040        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1041            proto::language_server_prompt_request::Critical {},
1042        ),
1043    }
1044}