headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashSet;
   4use language::File;
   5use lsp::LanguageServerId;
   6
   7use extension::ExtensionHostProxy;
   8use extension_host::headless_host::HeadlessExtensionStore;
   9use fs::Fs;
  10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  11use http_client::HttpClient;
  12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  13use node_runtime::NodeRuntime;
  14use project::{
  15    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
  16    ToolchainStore, WorktreeId,
  17    agent_server_store::AgentServerStore,
  18    buffer_store::{BufferStore, BufferStoreEvent},
  19    context_server_store::ContextServerStore,
  20    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  21    git_store::GitStore,
  22    image_store::ImageId,
  23    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  24    project_settings::SettingsObserver,
  25    search::SearchQuery,
  26    task_store::TaskStore,
  27    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  28    worktree_store::{WorktreeIdCounter, WorktreeStore},
  29};
  30use rpc::{
  31    AnyProtoClient, TypedEnvelope,
  32    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  33};
  34
  35use settings::initial_server_settings_content;
  36use std::{
  37    num::NonZeroU64,
  38    path::{Path, PathBuf},
  39    sync::{
  40        Arc,
  41        atomic::{AtomicU64, AtomicUsize, Ordering},
  42    },
  43};
  44use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  45use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  46use worktree::Worktree;
  47
  48pub struct HeadlessProject {
  49    pub fs: Arc<dyn Fs>,
  50    pub session: AnyProtoClient,
  51    pub worktree_store: Entity<WorktreeStore>,
  52    pub buffer_store: Entity<BufferStore>,
  53    pub lsp_store: Entity<LspStore>,
  54    pub task_store: Entity<TaskStore>,
  55    pub dap_store: Entity<DapStore>,
  56    pub breakpoint_store: Entity<BreakpointStore>,
  57    pub agent_server_store: Entity<AgentServerStore>,
  58    pub context_server_store: Entity<ContextServerStore>,
  59    pub settings_observer: Entity<SettingsObserver>,
  60    pub next_entry_id: Arc<AtomicUsize>,
  61    pub languages: Arc<LanguageRegistry>,
  62    pub extensions: Entity<HeadlessExtensionStore>,
  63    pub git_store: Entity<GitStore>,
  64    pub environment: Entity<ProjectEnvironment>,
  65    // Used mostly to keep alive the toolchain store for RPC handlers.
  66    // Local variant is used within LSP store, but that's a separate entity.
  67    pub _toolchain_store: Entity<ToolchainStore>,
  68}
  69
  70pub struct HeadlessAppState {
  71    pub session: AnyProtoClient,
  72    pub fs: Arc<dyn Fs>,
  73    pub http_client: Arc<dyn HttpClient>,
  74    pub node_runtime: NodeRuntime,
  75    pub languages: Arc<LanguageRegistry>,
  76    pub extension_host_proxy: Arc<ExtensionHostProxy>,
  77}
  78
  79impl HeadlessProject {
  80    pub fn init(cx: &mut App) {
  81        settings::init(cx);
  82        log_store::init(true, cx);
  83    }
  84
  85    pub fn new(
  86        HeadlessAppState {
  87            session,
  88            fs,
  89            http_client,
  90            node_runtime,
  91            languages,
  92            extension_host_proxy: proxy,
  93        }: HeadlessAppState,
  94        init_worktree_trust: bool,
  95        cx: &mut Context<Self>,
  96    ) -> Self {
  97        debug_adapter_extension::init(proxy.clone(), cx);
  98        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
  99
 100        let worktree_store = cx.new(|cx| {
 101            let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
 102            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 103            store
 104        });
 105
 106        if init_worktree_trust {
 107            project::trusted_worktrees::track_worktree_trust(
 108                worktree_store.clone(),
 109                None::<RemoteHostLocation>,
 110                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
 111                None,
 112                cx,
 113            );
 114        }
 115
 116        let environment =
 117            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
 118        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
 119        let toolchain_store = cx.new(|cx| {
 120            ToolchainStore::local(
 121                languages.clone(),
 122                worktree_store.clone(),
 123                environment.clone(),
 124                manifest_tree.clone(),
 125                fs.clone(),
 126                cx,
 127            )
 128        });
 129
 130        let buffer_store = cx.new(|cx| {
 131            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 132            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 133            buffer_store
 134        });
 135
 136        let breakpoint_store = cx.new(|_| {
 137            let mut breakpoint_store =
 138                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
 139            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 140
 141            breakpoint_store
 142        });
 143
 144        let dap_store = cx.new(|cx| {
 145            let mut dap_store = DapStore::new_local(
 146                http_client.clone(),
 147                node_runtime.clone(),
 148                fs.clone(),
 149                environment.clone(),
 150                toolchain_store.read(cx).as_language_toolchain_store(),
 151                worktree_store.clone(),
 152                breakpoint_store.clone(),
 153                true,
 154                cx,
 155            );
 156            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 157            dap_store
 158        });
 159
 160        let git_store = cx.new(|cx| {
 161            let mut store = GitStore::local(
 162                &worktree_store,
 163                buffer_store.clone(),
 164                environment.clone(),
 165                fs.clone(),
 166                cx,
 167            );
 168            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 169            store
 170        });
 171
 172        let prettier_store = cx.new(|cx| {
 173            PrettierStore::new(
 174                node_runtime.clone(),
 175                fs.clone(),
 176                languages.clone(),
 177                worktree_store.clone(),
 178                cx,
 179            )
 180        });
 181
 182        let task_store = cx.new(|cx| {
 183            let mut task_store = TaskStore::local(
 184                buffer_store.downgrade(),
 185                worktree_store.clone(),
 186                toolchain_store.read(cx).as_language_toolchain_store(),
 187                environment.clone(),
 188                cx,
 189            );
 190            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 191            task_store
 192        });
 193        let settings_observer = cx.new(|cx| {
 194            let mut observer = SettingsObserver::new_local(
 195                fs.clone(),
 196                worktree_store.clone(),
 197                task_store.clone(),
 198                true,
 199                cx,
 200            );
 201            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 202            observer
 203        });
 204
 205        let lsp_store = cx.new(|cx| {
 206            let mut lsp_store = LspStore::new_local(
 207                buffer_store.clone(),
 208                worktree_store.clone(),
 209                prettier_store.clone(),
 210                toolchain_store
 211                    .read(cx)
 212                    .as_local_store()
 213                    .expect("Toolchain store to be local")
 214                    .clone(),
 215                environment.clone(),
 216                manifest_tree,
 217                languages.clone(),
 218                http_client.clone(),
 219                fs.clone(),
 220                cx,
 221            );
 222            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 223            lsp_store
 224        });
 225
 226        let agent_server_store = cx.new(|cx| {
 227            let mut agent_server_store = AgentServerStore::local(
 228                node_runtime.clone(),
 229                fs.clone(),
 230                environment.clone(),
 231                http_client.clone(),
 232                cx,
 233            );
 234            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 235            agent_server_store
 236        });
 237
 238        let context_server_store = cx.new(|cx| {
 239            let mut context_server_store =
 240                ContextServerStore::local(worktree_store.clone(), None, true, cx);
 241            context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 242            context_server_store
 243        });
 244
 245        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 246        language_extension::init(
 247            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
 248            proxy.clone(),
 249            languages.clone(),
 250        );
 251
 252        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
 253            if let BufferStoreEvent::BufferAdded(buffer) = event {
 254                cx.subscribe(buffer, Self::on_buffer_event).detach();
 255            }
 256        })
 257        .detach();
 258
 259        let extensions = HeadlessExtensionStore::new(
 260            fs.clone(),
 261            http_client.clone(),
 262            paths::remote_extensions_dir().to_path_buf(),
 263            proxy,
 264            node_runtime,
 265            cx,
 266        );
 267
 268        // local_machine -> ssh handlers
 269        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
 270        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
 271        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
 272        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
 273        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
 274        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
 275        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
 276        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
 277        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
 278        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
 279        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
 280        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);
 281
 282        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
 283        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
 284        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
 285        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
 286        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
 287
 288        session.add_entity_request_handler(Self::handle_add_worktree);
 289        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
 290
 291        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
 292        session.add_entity_request_handler(Self::handle_open_new_buffer);
 293        session.add_entity_request_handler(Self::handle_find_search_candidates);
 294        session.add_entity_request_handler(Self::handle_open_server_settings);
 295        session.add_entity_request_handler(Self::handle_get_directory_environment);
 296        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
 297        session.add_entity_request_handler(Self::handle_open_image_by_path);
 298        session.add_entity_request_handler(Self::handle_trust_worktrees);
 299        session.add_entity_request_handler(Self::handle_restrict_worktrees);
 300        session.add_entity_request_handler(Self::handle_download_file_by_path);
 301
 302        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
 303        session.add_entity_request_handler(BufferStore::handle_update_buffer);
 304        session.add_entity_message_handler(BufferStore::handle_close_buffer);
 305
 306        session.add_request_handler(
 307            extensions.downgrade(),
 308            HeadlessExtensionStore::handle_sync_extensions,
 309        );
 310        session.add_request_handler(
 311            extensions.downgrade(),
 312            HeadlessExtensionStore::handle_install_extension,
 313        );
 314
 315        BufferStore::init(&session);
 316        WorktreeStore::init(&session);
 317        SettingsObserver::init(&session);
 318        LspStore::init(&session);
 319        TaskStore::init(Some(&session));
 320        ToolchainStore::init(&session);
 321        DapStore::init(&session, cx);
 322        // todo(debugger): Re init breakpoint store when we set it up for collab
 323        BreakpointStore::init(&session);
 324        GitStore::init(&session);
 325        AgentServerStore::init_headless(&session);
 326        ContextServerStore::init_headless(&session);
 327
 328        HeadlessProject {
 329            next_entry_id: Default::default(),
 330            session,
 331            settings_observer,
 332            fs,
 333            worktree_store,
 334            buffer_store,
 335            lsp_store,
 336            task_store,
 337            dap_store,
 338            breakpoint_store,
 339            agent_server_store,
 340            context_server_store,
 341            languages,
 342            extensions,
 343            git_store,
 344            environment,
 345            _toolchain_store: toolchain_store,
 346        }
 347    }
 348
 349    fn on_buffer_event(
 350        &mut self,
 351        buffer: Entity<Buffer>,
 352        event: &BufferEvent,
 353        cx: &mut Context<Self>,
 354    ) {
 355        if let BufferEvent::Operation {
 356            operation,
 357            is_local: true,
 358        } = event
 359        {
 360            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 361                project_id: REMOTE_SERVER_PROJECT_ID,
 362                buffer_id: buffer.read(cx).remote_id().to_proto(),
 363                operations: vec![serialize_operation(operation)],
 364            }))
 365            .detach()
 366        }
 367    }
 368
 369    fn on_lsp_store_event(
 370        &mut self,
 371        lsp_store: Entity<LspStore>,
 372        event: &LspStoreEvent,
 373        cx: &mut Context<Self>,
 374    ) {
 375        match event {
 376            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 377                let log_store = cx
 378                    .try_global::<GlobalLogStore>()
 379                    .map(|lsp_logs| lsp_logs.0.clone());
 380                if let Some(log_store) = log_store {
 381                    log_store.update(cx, |log_store, cx| {
 382                        log_store.add_language_server(
 383                            LanguageServerKind::LocalSsh {
 384                                lsp_store: self.lsp_store.downgrade(),
 385                            },
 386                            *id,
 387                            Some(name.clone()),
 388                            *worktree_id,
 389                            lsp_store.read(cx).language_server_for_id(*id),
 390                            cx,
 391                        );
 392                    });
 393                }
 394            }
 395            LspStoreEvent::LanguageServerRemoved(id) => {
 396                let log_store = cx
 397                    .try_global::<GlobalLogStore>()
 398                    .map(|lsp_logs| lsp_logs.0.clone());
 399                if let Some(log_store) = log_store {
 400                    log_store.update(cx, |log_store, cx| {
 401                        log_store.remove_language_server(*id, cx);
 402                    });
 403                }
 404            }
 405            LspStoreEvent::LanguageServerUpdate {
 406                language_server_id,
 407                name,
 408                message,
 409            } => {
 410                self.session
 411                    .send(proto::UpdateLanguageServer {
 412                        project_id: REMOTE_SERVER_PROJECT_ID,
 413                        server_name: name.as_ref().map(|name| name.to_string()),
 414                        language_server_id: language_server_id.to_proto(),
 415                        variant: Some(message.clone()),
 416                    })
 417                    .log_err();
 418            }
 419            LspStoreEvent::Notification(message) => {
 420                self.session
 421                    .send(proto::Toast {
 422                        project_id: REMOTE_SERVER_PROJECT_ID,
 423                        notification_id: "lsp".to_string(),
 424                        message: message.clone(),
 425                    })
 426                    .log_err();
 427            }
 428            LspStoreEvent::LanguageServerPrompt(prompt) => {
 429                let request = self.session.request(proto::LanguageServerPromptRequest {
 430                    project_id: REMOTE_SERVER_PROJECT_ID,
 431                    actions: prompt
 432                        .actions
 433                        .iter()
 434                        .map(|action| action.title.to_string())
 435                        .collect(),
 436                    level: Some(prompt_to_proto(prompt)),
 437                    lsp_name: prompt.lsp_name.clone(),
 438                    message: prompt.message.clone(),
 439                });
 440                let prompt = prompt.clone();
 441                cx.background_spawn(async move {
 442                    let response = request.await?;
 443                    if let Some(action_response) = response.action_response {
 444                        prompt.respond(action_response as usize).await;
 445                    }
 446                    anyhow::Ok(())
 447                })
 448                .detach();
 449            }
 450            _ => {}
 451        }
 452    }
 453
 454    pub async fn handle_add_worktree(
 455        this: Entity<Self>,
 456        message: TypedEnvelope<proto::AddWorktree>,
 457        mut cx: AsyncApp,
 458    ) -> Result<proto::AddWorktreeResponse> {
 459        use client::ErrorCodeExt;
 460        let fs = this.read_with(&cx, |this, _| this.fs.clone());
 461        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
 462
 463        let canonicalized = match fs.canonicalize(&path).await {
 464            Ok(path) => path,
 465            Err(e) => {
 466                let mut parent = path
 467                    .parent()
 468                    .ok_or(e)
 469                    .with_context(|| format!("{path:?} does not exist"))?;
 470                if parent == Path::new("") {
 471                    parent = util::paths::home_dir();
 472                }
 473                let parent = fs.canonicalize(parent).await.map_err(|_| {
 474                    anyhow!(
 475                        proto::ErrorCode::DevServerProjectPathDoesNotExist
 476                            .with_tag("path", path.to_string_lossy().as_ref())
 477                    )
 478                })?;
 479                if let Some(file_name) = path.file_name() {
 480                    parent.join(file_name)
 481                } else {
 482                    parent
 483                }
 484            }
 485        };
 486        let next_worktree_id = this
 487            .update(&mut cx, |this, cx| {
 488                this.worktree_store
 489                    .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
 490            })
 491            .await?;
 492        let worktree = this
 493            .read_with(&cx.clone(), |this, _| {
 494                Worktree::local(
 495                    Arc::from(canonicalized.as_path()),
 496                    message.payload.visible,
 497                    this.fs.clone(),
 498                    this.next_entry_id.clone(),
 499                    true,
 500                    next_worktree_id,
 501                    &mut cx,
 502                )
 503            })
 504            .await?;
 505
 506        let response = this.read_with(&cx, |_, cx| {
 507            let worktree = worktree.read(cx);
 508            proto::AddWorktreeResponse {
 509                worktree_id: worktree.id().to_proto(),
 510                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
 511            }
 512        });
 513
 514        // We spawn this asynchronously, so that we can send the response back
 515        // *before* `worktree_store.add()` can send out UpdateProject requests
 516        // to the client about the new worktree.
 517        //
 518        // That lets the client manage the reference/handles of the newly-added
 519        // worktree, before getting interrupted by an UpdateProject request.
 520        //
 521        // This fixes the problem of the client sending the AddWorktree request,
 522        // headless project sending out a project update, client receiving it
 523        // and immediately dropping the reference of the new client, causing it
 524        // to be dropped on the headless project, and the client only then
 525        // receiving a response to AddWorktree.
 526        cx.spawn(async move |cx| {
 527            this.update(cx, |this, cx| {
 528                this.worktree_store.update(cx, |worktree_store, cx| {
 529                    worktree_store.add(&worktree, cx);
 530                });
 531            });
 532        })
 533        .detach();
 534
 535        Ok(response)
 536    }
 537
 538    pub async fn handle_remove_worktree(
 539        this: Entity<Self>,
 540        envelope: TypedEnvelope<proto::RemoveWorktree>,
 541        mut cx: AsyncApp,
 542    ) -> Result<proto::Ack> {
 543        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 544        this.update(&mut cx, |this, cx| {
 545            this.worktree_store.update(cx, |worktree_store, cx| {
 546                worktree_store.remove_worktree(worktree_id, cx);
 547            });
 548        });
 549        Ok(proto::Ack {})
 550    }
 551
 552    pub async fn handle_open_buffer_by_path(
 553        this: Entity<Self>,
 554        message: TypedEnvelope<proto::OpenBufferByPath>,
 555        mut cx: AsyncApp,
 556    ) -> Result<proto::OpenBufferResponse> {
 557        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 558        let path = RelPath::from_proto(&message.payload.path)?;
 559        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 560            let buffer_store = this.buffer_store.clone();
 561            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 562                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 563            });
 564            (buffer_store, buffer)
 565        });
 566
 567        let buffer = buffer.await?;
 568        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 569        buffer_store.update(&mut cx, |buffer_store, cx| {
 570            buffer_store
 571                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 572                .detach_and_log_err(cx);
 573        });
 574
 575        Ok(proto::OpenBufferResponse {
 576            buffer_id: buffer_id.to_proto(),
 577        })
 578    }
 579
 580    pub async fn handle_open_image_by_path(
 581        this: Entity<Self>,
 582        message: TypedEnvelope<proto::OpenImageByPath>,
 583        mut cx: AsyncApp,
 584    ) -> Result<proto::OpenImageResponse> {
 585        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 586        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 587        let path = RelPath::from_proto(&message.payload.path)?;
 588        let project_id = message.payload.project_id;
 589        use proto::create_image_for_peer::Variant;
 590
 591        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 592            (this.worktree_store.clone(), this.session.clone())
 593        });
 594
 595        let worktree = worktree_store
 596            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 597            .context("worktree not found")?;
 598
 599        let load_task = worktree.update(&mut cx, |worktree, cx| {
 600            worktree.load_binary_file(path.as_ref(), cx)
 601        });
 602
 603        let loaded_file = load_task.await?;
 604        let content = loaded_file.content;
 605        let file = loaded_file.file;
 606
 607        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 608        let image_id =
 609            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 610
 611        let format = image::guess_format(&content)
 612            .map(|f| format!("{:?}", f).to_lowercase())
 613            .unwrap_or_else(|_| "unknown".to_string());
 614
 615        let state = proto::ImageState {
 616            id: image_id.to_proto(),
 617            file: Some(proto_file),
 618            content_size: content.len() as u64,
 619            format,
 620        };
 621
 622        session.send(proto::CreateImageForPeer {
 623            project_id,
 624            peer_id: Some(REMOTE_SERVER_PEER_ID),
 625            variant: Some(Variant::State(state)),
 626        })?;
 627
 628        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 629        for chunk in content.chunks(CHUNK_SIZE) {
 630            session.send(proto::CreateImageForPeer {
 631                project_id,
 632                peer_id: Some(REMOTE_SERVER_PEER_ID),
 633                variant: Some(Variant::Chunk(proto::ImageChunk {
 634                    image_id: image_id.to_proto(),
 635                    data: chunk.to_vec(),
 636                })),
 637            })?;
 638        }
 639
 640        Ok(proto::OpenImageResponse {
 641            image_id: image_id.to_proto(),
 642        })
 643    }
 644
 645    pub async fn handle_trust_worktrees(
 646        this: Entity<Self>,
 647        envelope: TypedEnvelope<proto::TrustWorktrees>,
 648        mut cx: AsyncApp,
 649    ) -> Result<proto::Ack> {
 650        let trusted_worktrees = cx
 651            .update(|cx| TrustedWorktrees::try_get_global(cx))
 652            .context("missing trusted worktrees")?;
 653        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 654        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 655            trusted_worktrees.trust(
 656                &worktree_store,
 657                envelope
 658                    .payload
 659                    .trusted_paths
 660                    .into_iter()
 661                    .filter_map(PathTrust::from_proto)
 662                    .collect(),
 663                cx,
 664            );
 665        });
 666        Ok(proto::Ack {})
 667    }
 668
 669    pub async fn handle_restrict_worktrees(
 670        this: Entity<Self>,
 671        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 672        mut cx: AsyncApp,
 673    ) -> Result<proto::Ack> {
 674        let trusted_worktrees = cx
 675            .update(|cx| TrustedWorktrees::try_get_global(cx))
 676            .context("missing trusted worktrees")?;
 677        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 678        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 679            let restricted_paths = envelope
 680                .payload
 681                .worktree_ids
 682                .into_iter()
 683                .map(WorktreeId::from_proto)
 684                .map(PathTrust::Worktree)
 685                .collect::<HashSet<_>>();
 686            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 687        });
 688        Ok(proto::Ack {})
 689    }
 690
 691    pub async fn handle_download_file_by_path(
 692        this: Entity<Self>,
 693        message: TypedEnvelope<proto::DownloadFileByPath>,
 694        mut cx: AsyncApp,
 695    ) -> Result<proto::DownloadFileResponse> {
 696        log::debug!(
 697            "handle_download_file_by_path: received request: {:?}",
 698            message.payload
 699        );
 700
 701        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 702        let path = RelPath::from_proto(&message.payload.path)?;
 703        let project_id = message.payload.project_id;
 704        let file_id = message.payload.file_id;
 705        log::debug!(
 706            "handle_download_file_by_path: worktree_id={:?}, path={:?}, file_id={}",
 707            worktree_id,
 708            path,
 709            file_id
 710        );
 711        use proto::create_file_for_peer::Variant;
 712
 713        let (worktree_store, session): (Entity<WorktreeStore>, AnyProtoClient) = this
 714            .read_with(&cx, |this, _| {
 715                (this.worktree_store.clone(), this.session.clone())
 716            });
 717
 718        let worktree = worktree_store
 719            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 720            .context("worktree not found")?;
 721
 722        let download_task = worktree.update(&mut cx, |worktree: &mut Worktree, cx| {
 723            worktree.load_binary_file(path.as_ref(), cx)
 724        });
 725
 726        let downloaded_file = download_task.await?;
 727        let content = downloaded_file.content;
 728        let file = downloaded_file.file;
 729        log::debug!(
 730            "handle_download_file_by_path: file loaded, content_size={}",
 731            content.len()
 732        );
 733
 734        let proto_file = worktree.read_with(&cx, |_worktree: &Worktree, cx| file.to_proto(cx));
 735        log::debug!(
 736            "handle_download_file_by_path: using client-provided file_id={}",
 737            file_id
 738        );
 739
 740        let state = proto::FileState {
 741            id: file_id,
 742            file: Some(proto_file),
 743            content_size: content.len() as u64,
 744        };
 745
 746        log::debug!("handle_download_file_by_path: sending State message");
 747        session.send(proto::CreateFileForPeer {
 748            project_id,
 749            peer_id: Some(REMOTE_SERVER_PEER_ID),
 750            variant: Some(Variant::State(state)),
 751        })?;
 752
 753        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 754        let num_chunks = content.len().div_ceil(CHUNK_SIZE);
 755        log::debug!(
 756            "handle_download_file_by_path: sending {} chunks",
 757            num_chunks
 758        );
 759        for (i, chunk) in content.chunks(CHUNK_SIZE).enumerate() {
 760            log::trace!(
 761                "handle_download_file_by_path: sending chunk {}/{}, size={}",
 762                i + 1,
 763                num_chunks,
 764                chunk.len()
 765            );
 766            session.send(proto::CreateFileForPeer {
 767                project_id,
 768                peer_id: Some(REMOTE_SERVER_PEER_ID),
 769                variant: Some(Variant::Chunk(proto::FileChunk {
 770                    file_id,
 771                    data: chunk.to_vec(),
 772                })),
 773            })?;
 774        }
 775
 776        log::debug!(
 777            "handle_download_file_by_path: returning file_id={}",
 778            file_id
 779        );
 780        Ok(proto::DownloadFileResponse { file_id })
 781    }
 782
 783    pub async fn handle_open_new_buffer(
 784        this: Entity<Self>,
 785        _message: TypedEnvelope<proto::OpenNewBuffer>,
 786        mut cx: AsyncApp,
 787    ) -> Result<proto::OpenBufferResponse> {
 788        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 789            let buffer_store = this.buffer_store.clone();
 790            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 791                buffer_store.create_buffer(None, true, cx)
 792            });
 793            (buffer_store, buffer)
 794        });
 795
 796        let buffer = buffer.await?;
 797        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 798        buffer_store.update(&mut cx, |buffer_store, cx| {
 799            buffer_store
 800                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 801                .detach_and_log_err(cx);
 802        });
 803
 804        Ok(proto::OpenBufferResponse {
 805            buffer_id: buffer_id.to_proto(),
 806        })
 807    }
 808
 809    async fn handle_toggle_lsp_logs(
 810        _: Entity<Self>,
 811        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 812        cx: AsyncApp,
 813    ) -> Result<()> {
 814        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 815        cx.update(|cx| {
 816            let log_store = cx
 817                .try_global::<GlobalLogStore>()
 818                .map(|global_log_store| global_log_store.0.clone())
 819                .context("lsp logs store is missing")?;
 820            let toggled_log_kind =
 821                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 822                    .context("invalid log type")?
 823                {
 824                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 825                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 826                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 827                };
 828            log_store.update(cx, |log_store, _| {
 829                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 830            });
 831            anyhow::Ok(())
 832        })?;
 833
 834        Ok(())
 835    }
 836
 837    async fn handle_open_server_settings(
 838        this: Entity<Self>,
 839        _: TypedEnvelope<proto::OpenServerSettings>,
 840        mut cx: AsyncApp,
 841    ) -> Result<proto::OpenBufferResponse> {
 842        let settings_path = paths::settings_file();
 843        let (worktree, path) = this
 844            .update(&mut cx, |this, cx| {
 845                this.worktree_store.update(cx, |worktree_store, cx| {
 846                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 847                })
 848            })
 849            .await?;
 850
 851        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 852            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 853                buffer_store.open_buffer(
 854                    ProjectPath {
 855                        worktree_id: worktree.read(cx).id(),
 856                        path,
 857                    },
 858                    cx,
 859                )
 860            });
 861
 862            (buffer, this.buffer_store.clone())
 863        });
 864
 865        let buffer = buffer.await?;
 866
 867        let buffer_id = cx.update(|cx| {
 868            if buffer.read(cx).is_empty() {
 869                buffer.update(cx, |buffer, cx| {
 870                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 871                });
 872            }
 873
 874            let buffer_id = buffer.read(cx).remote_id();
 875
 876            buffer_store.update(cx, |buffer_store, cx| {
 877                buffer_store
 878                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 879                    .detach_and_log_err(cx);
 880            });
 881
 882            buffer_id
 883        });
 884
 885        Ok(proto::OpenBufferResponse {
 886            buffer_id: buffer_id.to_proto(),
 887        })
 888    }
 889
    /// Handles a `FindSearchCandidates` request by starting a search task and
    /// acknowledging immediately.
    ///
    /// The spawned task streams matching buffer ids back to the requester as
    /// `FindSearchCandidatesChunk` requests (`Matches` variants, followed by a
    /// final `Done` variant). The task is registered with the buffer store
    /// under `(peer_id, handle)`; this handler does not await it.
    ///
    /// # Errors
    ///
    /// Returns an error if the payload has no `query` or the query cannot be
    /// converted by `SearchQuery::from_proto`. Failures inside the spawned
    /// task are returned from that task, not from this handler.
    async fn handle_find_search_candidates(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidates>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        use futures::stream::StreamExt as _;

        // Prefer the original sender when the envelope carries one.
        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
        let message = envelope.payload;
        let query = SearchQuery::from_proto(
            message.query.context("missing query field")?,
            PathStyle::local(),
        )?;

        let project_id = message.project_id;
        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
        let handle = message.handle;
        // Second handle to the buffer store: `buffer_store` is moved into the
        // spawned task below, while this one is used afterwards to register it.
        let _buffer_store = buffer_store.clone();
        let client = this.read_with(&cx, |this, _| this.session.clone());
        let task = cx.spawn(async move |cx| {
            // Start the search and obtain the handle whose `rx` yields matching buffers.
            let results = this.update(cx, |this, cx| {
                project::Search::local(
                    this.fs.clone(),
                    this.buffer_store.clone(),
                    this.worktree_store.clone(),
                    message.limit as _,
                    cx,
                )
                .into_handle(query, cx)
                .matching_buffers(cx)
            });
            let (batcher, batches) =
                project::project_search::AdaptiveBatcher::new(cx.background_executor());
            let mut new_matches = Box::pin(results.rx);

            // Background task: forward every batch of buffer ids as a `Matches`
            // chunk, awaiting each request before taking the next batch.
            let sender_task = cx.background_executor().spawn({
                let client = client.clone();
                async move {
                    let mut batches = std::pin::pin!(batches);
                    while let Some(buffer_ids) = batches.next().await {
                        client
                            .request(proto::FindSearchCandidatesChunk {
                                handle,
                                peer_id: Some(peer_id),
                                project_id,
                                variant: Some(
                                    proto::find_search_candidates_chunk::Variant::Matches(
                                        proto::FindSearchCandidatesMatches { buffer_ids },
                                    ),
                                ),
                            })
                            .await?;
                    }
                    anyhow::Ok(())
                }
            });

            while let Some(buffer) = new_matches.next().await {
                // The result of `create_buffer_for_peer` is deliberately ignored;
                // the buffer id is pushed to the batcher regardless.
                let _ = buffer_store
                    .update(cx, |this, cx| {
                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
                    })
                    .await;
                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
                batcher.push(buffer_id).await;
            }
            batcher.flush().await;

            // Wait for the sender to finish (propagating its error) before
            // sending the terminating `Done` chunk.
            sender_task.await?;

            client
                .request(proto::FindSearchCandidatesChunk {
                    handle,
                    peer_id: Some(peer_id),
                    project_id,
                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
                        proto::FindSearchCandidatesDone {},
                    )),
                })
                .await?;
            anyhow::Ok(())
        });
        // Hand the task to the buffer store, keyed by requester and search handle.
        _buffer_store.update(&mut cx, |this, _| {
            this.register_ongoing_project_search((peer_id, handle), task);
        });

        Ok(proto::Ack {})
    }
 978
 979    // Goes from client to host.
 980    async fn handle_find_search_candidates_cancel(
 981        this: Entity<Self>,
 982        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
 983        mut cx: AsyncApp,
 984    ) -> Result<()> {
 985        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
 986        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
 987    }
 988
 989    async fn handle_list_remote_directory(
 990        this: Entity<Self>,
 991        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
 992        cx: AsyncApp,
 993    ) -> Result<proto::ListRemoteDirectoryResponse> {
 994        use smol::stream::StreamExt;
 995        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 996        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 997        let check_info = envelope
 998            .payload
 999            .config
1000            .as_ref()
1001            .is_some_and(|config| config.is_dir);
1002
1003        let mut entries = Vec::new();
1004        let mut entry_info = Vec::new();
1005        let mut response = fs.read_dir(&expanded).await?;
1006        while let Some(path) = response.next().await {
1007            let path = path?;
1008            if let Some(file_name) = path.file_name() {
1009                entries.push(file_name.to_string_lossy().into_owned());
1010                if check_info {
1011                    let is_dir = fs.is_dir(&path).await;
1012                    entry_info.push(proto::EntryInfo { is_dir });
1013                }
1014            }
1015        }
1016        Ok(proto::ListRemoteDirectoryResponse {
1017            entries,
1018            entry_info,
1019        })
1020    }
1021
1022    async fn handle_get_path_metadata(
1023        this: Entity<Self>,
1024        envelope: TypedEnvelope<proto::GetPathMetadata>,
1025        cx: AsyncApp,
1026    ) -> Result<proto::GetPathMetadataResponse> {
1027        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1028        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1029
1030        let metadata = fs.metadata(&expanded).await?;
1031        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
1032
1033        Ok(proto::GetPathMetadataResponse {
1034            exists: metadata.is_some(),
1035            is_dir,
1036            path: expanded.to_string_lossy().into_owned(),
1037        })
1038    }
1039
1040    async fn handle_shutdown_remote_server(
1041        _this: Entity<Self>,
1042        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
1043        cx: AsyncApp,
1044    ) -> Result<proto::Ack> {
1045        cx.spawn(async move |cx| {
1046            cx.update(|cx| {
1047                // TODO: This is a hack, because in a headless project, shutdown isn't executed
1048                // when calling quit, but it should be.
1049                cx.shutdown();
1050                cx.quit();
1051            })
1052        })
1053        .detach();
1054
1055        Ok(proto::Ack {})
1056    }
1057
1058    pub async fn handle_ping(
1059        _this: Entity<Self>,
1060        _envelope: TypedEnvelope<proto::Ping>,
1061        _cx: AsyncApp,
1062    ) -> Result<proto::Ack> {
1063        log::debug!("Received ping from client");
1064        Ok(proto::Ack {})
1065    }
1066
1067    async fn handle_get_processes(
1068        _this: Entity<Self>,
1069        _envelope: TypedEnvelope<proto::GetProcesses>,
1070        _cx: AsyncApp,
1071    ) -> Result<proto::GetProcessesResponse> {
1072        let mut processes = Vec::new();
1073        let refresh_kind = RefreshKind::nothing().with_processes(
1074            ProcessRefreshKind::nothing()
1075                .without_tasks()
1076                .with_cmd(UpdateKind::Always),
1077        );
1078
1079        for process in System::new_with_specifics(refresh_kind)
1080            .processes()
1081            .values()
1082        {
1083            let name = process.name().to_string_lossy().into_owned();
1084            let command = process
1085                .cmd()
1086                .iter()
1087                .map(|s| s.to_string_lossy().into_owned())
1088                .collect::<Vec<_>>();
1089
1090            processes.push(proto::ProcessInfo {
1091                pid: process.pid().as_u32(),
1092                name,
1093                command,
1094            });
1095        }
1096
1097        processes.sort_by_key(|p| p.name.clone());
1098
1099        Ok(proto::GetProcessesResponse { processes })
1100    }
1101
1102    async fn handle_get_directory_environment(
1103        this: Entity<Self>,
1104        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1105        mut cx: AsyncApp,
1106    ) -> Result<proto::DirectoryEnvironment> {
1107        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1108        let directory = PathBuf::from(envelope.payload.directory);
1109        let environment = this
1110            .update(&mut cx, |this, cx| {
1111                this.environment.update(cx, |environment, cx| {
1112                    environment.local_directory_environment(&shell, directory.into(), cx)
1113                })
1114            })
1115            .await
1116            .context("failed to get directory environment")?
1117            .into_iter()
1118            .collect();
1119        Ok(proto::DirectoryEnvironment { environment })
1120    }
1121}
1122
1123fn prompt_to_proto(
1124    prompt: &project::LanguageServerPromptRequest,
1125) -> proto::language_server_prompt_request::Level {
1126    match prompt.level {
1127        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1128            proto::language_server_prompt_request::Info {},
1129        ),
1130        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1131            proto::language_server_prompt_request::Warning {},
1132        ),
1133        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1134            proto::language_server_prompt_request::Critical {},
1135        ),
1136    }
1137}