headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashSet;
   4use language::File;
   5use lsp::LanguageServerId;
   6
   7use extension::ExtensionHostProxy;
   8use extension_host::headless_host::HeadlessExtensionStore;
   9use fs::Fs;
  10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  11use http_client::HttpClient;
  12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  13use node_runtime::NodeRuntime;
  14use project::{
  15    AgentRegistryStore, LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment,
  16    ProjectPath, ToolchainStore, WorktreeId,
  17    agent_server_store::AgentServerStore,
  18    buffer_store::{BufferStore, BufferStoreEvent},
  19    context_server_store::ContextServerStore,
  20    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  21    git_store::GitStore,
  22    image_store::ImageId,
  23    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  24    project_settings::SettingsObserver,
  25    search::SearchQuery,
  26    task_store::TaskStore,
  27    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  28    worktree_store::{WorktreeIdCounter, WorktreeStore},
  29};
  30use rpc::{
  31    AnyProtoClient, TypedEnvelope,
  32    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  33};
  34
  35use settings::initial_server_settings_content;
  36use std::{
  37    num::NonZeroU64,
  38    path::{Path, PathBuf},
  39    sync::{
  40        Arc,
  41        atomic::{AtomicU64, AtomicUsize, Ordering},
  42    },
  43};
  44use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  45use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  46use worktree::Worktree;
  47
  48pub struct HeadlessProject {
  49    pub fs: Arc<dyn Fs>,
  50    pub session: AnyProtoClient,
  51    pub worktree_store: Entity<WorktreeStore>,
  52    pub buffer_store: Entity<BufferStore>,
  53    pub lsp_store: Entity<LspStore>,
  54    pub task_store: Entity<TaskStore>,
  55    pub dap_store: Entity<DapStore>,
  56    pub breakpoint_store: Entity<BreakpointStore>,
  57    pub agent_server_store: Entity<AgentServerStore>,
  58    pub context_server_store: Entity<ContextServerStore>,
  59    pub settings_observer: Entity<SettingsObserver>,
  60    pub next_entry_id: Arc<AtomicUsize>,
  61    pub languages: Arc<LanguageRegistry>,
  62    pub extensions: Entity<HeadlessExtensionStore>,
  63    pub git_store: Entity<GitStore>,
  64    pub environment: Entity<ProjectEnvironment>,
  65    // Used mostly to keep alive the toolchain store for RPC handlers.
  66    // Local variant is used within LSP store, but that's a separate entity.
  67    pub _toolchain_store: Entity<ToolchainStore>,
  68}
  69
  70pub struct HeadlessAppState {
  71    pub session: AnyProtoClient,
  72    pub fs: Arc<dyn Fs>,
  73    pub http_client: Arc<dyn HttpClient>,
  74    pub node_runtime: NodeRuntime,
  75    pub languages: Arc<LanguageRegistry>,
  76    pub extension_host_proxy: Arc<ExtensionHostProxy>,
  77}
  78
  79impl HeadlessProject {
  80    pub fn init(cx: &mut App) {
  81        settings::init(cx);
  82        log_store::init(true, cx);
  83    }
  84
  85    pub fn new(
  86        HeadlessAppState {
  87            session,
  88            fs,
  89            http_client,
  90            node_runtime,
  91            languages,
  92            extension_host_proxy: proxy,
  93        }: HeadlessAppState,
  94        init_worktree_trust: bool,
  95        cx: &mut Context<Self>,
  96    ) -> Self {
  97        debug_adapter_extension::init(proxy.clone(), cx);
  98        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
  99
 100        let worktree_store = cx.new(|cx| {
 101            let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
 102            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 103            store
 104        });
 105
 106        if init_worktree_trust {
 107            project::trusted_worktrees::track_worktree_trust(
 108                worktree_store.clone(),
 109                None::<RemoteHostLocation>,
 110                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
 111                None,
 112                cx,
 113            );
 114        }
 115
 116        let environment =
 117            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
 118        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
 119        let toolchain_store = cx.new(|cx| {
 120            ToolchainStore::local(
 121                languages.clone(),
 122                worktree_store.clone(),
 123                environment.clone(),
 124                manifest_tree.clone(),
 125                fs.clone(),
 126                cx,
 127            )
 128        });
 129
 130        let buffer_store = cx.new(|cx| {
 131            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 132            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 133            buffer_store
 134        });
 135
 136        let breakpoint_store = cx.new(|_| {
 137            let mut breakpoint_store =
 138                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
 139            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 140
 141            breakpoint_store
 142        });
 143
 144        let dap_store = cx.new(|cx| {
 145            let mut dap_store = DapStore::new_local(
 146                http_client.clone(),
 147                node_runtime.clone(),
 148                fs.clone(),
 149                environment.clone(),
 150                toolchain_store.read(cx).as_language_toolchain_store(),
 151                worktree_store.clone(),
 152                breakpoint_store.clone(),
 153                true,
 154                cx,
 155            );
 156            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 157            dap_store
 158        });
 159
 160        let git_store = cx.new(|cx| {
 161            let mut store = GitStore::local(
 162                &worktree_store,
 163                buffer_store.clone(),
 164                environment.clone(),
 165                fs.clone(),
 166                cx,
 167            );
 168            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 169            store
 170        });
 171
 172        let prettier_store = cx.new(|cx| {
 173            PrettierStore::new(
 174                node_runtime.clone(),
 175                fs.clone(),
 176                languages.clone(),
 177                worktree_store.clone(),
 178                cx,
 179            )
 180        });
 181
 182        let task_store = cx.new(|cx| {
 183            let mut task_store = TaskStore::local(
 184                buffer_store.downgrade(),
 185                worktree_store.clone(),
 186                toolchain_store.read(cx).as_language_toolchain_store(),
 187                environment.clone(),
 188                cx,
 189            );
 190            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 191            task_store
 192        });
 193        let settings_observer = cx.new(|cx| {
 194            let mut observer = SettingsObserver::new_local(
 195                fs.clone(),
 196                worktree_store.clone(),
 197                task_store.clone(),
 198                true,
 199                cx,
 200            );
 201            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 202            observer
 203        });
 204
 205        let lsp_store = cx.new(|cx| {
 206            let mut lsp_store = LspStore::new_local(
 207                buffer_store.clone(),
 208                worktree_store.clone(),
 209                prettier_store.clone(),
 210                toolchain_store
 211                    .read(cx)
 212                    .as_local_store()
 213                    .expect("Toolchain store to be local")
 214                    .clone(),
 215                environment.clone(),
 216                manifest_tree,
 217                languages.clone(),
 218                http_client.clone(),
 219                fs.clone(),
 220                cx,
 221            );
 222            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 223            lsp_store
 224        });
 225
 226        AgentRegistryStore::init_global(cx, fs.clone(), http_client.clone());
 227
 228        let agent_server_store = cx.new(|cx| {
 229            let mut agent_server_store = AgentServerStore::local(
 230                node_runtime.clone(),
 231                fs.clone(),
 232                environment.clone(),
 233                http_client.clone(),
 234                cx,
 235            );
 236            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 237            agent_server_store
 238        });
 239
 240        let context_server_store = cx.new(|cx| {
 241            let mut context_server_store =
 242                ContextServerStore::local(worktree_store.clone(), None, true, cx);
 243            context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 244            context_server_store
 245        });
 246
 247        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 248        language_extension::init(
 249            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
 250            proxy.clone(),
 251            languages.clone(),
 252        );
 253
 254        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
 255            if let BufferStoreEvent::BufferAdded(buffer) = event {
 256                cx.subscribe(buffer, Self::on_buffer_event).detach();
 257            }
 258        })
 259        .detach();
 260
 261        let extensions = HeadlessExtensionStore::new(
 262            fs.clone(),
 263            http_client.clone(),
 264            paths::remote_extensions_dir().to_path_buf(),
 265            proxy,
 266            node_runtime,
 267            cx,
 268        );
 269
 270        // local_machine -> ssh handlers
 271        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
 272        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
 273        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
 274        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
 275        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
 276        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
 277        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
 278        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
 279        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
 280        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
 281        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
 282        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);
 283
 284        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
 285        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
 286        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
 287        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
 288        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
 289
 290        session.add_entity_request_handler(Self::handle_add_worktree);
 291        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
 292
 293        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
 294        session.add_entity_request_handler(Self::handle_open_new_buffer);
 295        session.add_entity_request_handler(Self::handle_find_search_candidates);
 296        session.add_entity_request_handler(Self::handle_open_server_settings);
 297        session.add_entity_request_handler(Self::handle_get_directory_environment);
 298        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
 299        session.add_entity_request_handler(Self::handle_open_image_by_path);
 300        session.add_entity_request_handler(Self::handle_trust_worktrees);
 301        session.add_entity_request_handler(Self::handle_restrict_worktrees);
 302        session.add_entity_request_handler(Self::handle_download_file_by_path);
 303
 304        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
 305        session.add_entity_request_handler(BufferStore::handle_update_buffer);
 306        session.add_entity_message_handler(BufferStore::handle_close_buffer);
 307
 308        session.add_request_handler(
 309            extensions.downgrade(),
 310            HeadlessExtensionStore::handle_sync_extensions,
 311        );
 312        session.add_request_handler(
 313            extensions.downgrade(),
 314            HeadlessExtensionStore::handle_install_extension,
 315        );
 316
 317        BufferStore::init(&session);
 318        WorktreeStore::init(&session);
 319        SettingsObserver::init(&session);
 320        LspStore::init(&session);
 321        TaskStore::init(Some(&session));
 322        ToolchainStore::init(&session);
 323        DapStore::init(&session, cx);
 324        // todo(debugger): Re init breakpoint store when we set it up for collab
 325        BreakpointStore::init(&session);
 326        GitStore::init(&session);
 327        AgentServerStore::init_headless(&session);
 328        ContextServerStore::init_headless(&session);
 329
 330        HeadlessProject {
 331            next_entry_id: Default::default(),
 332            session,
 333            settings_observer,
 334            fs,
 335            worktree_store,
 336            buffer_store,
 337            lsp_store,
 338            task_store,
 339            dap_store,
 340            breakpoint_store,
 341            agent_server_store,
 342            context_server_store,
 343            languages,
 344            extensions,
 345            git_store,
 346            environment,
 347            _toolchain_store: toolchain_store,
 348        }
 349    }
 350
 351    fn on_buffer_event(
 352        &mut self,
 353        buffer: Entity<Buffer>,
 354        event: &BufferEvent,
 355        cx: &mut Context<Self>,
 356    ) {
 357        if let BufferEvent::Operation {
 358            operation,
 359            is_local: true,
 360        } = event
 361        {
 362            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 363                project_id: REMOTE_SERVER_PROJECT_ID,
 364                buffer_id: buffer.read(cx).remote_id().to_proto(),
 365                operations: vec![serialize_operation(operation)],
 366            }))
 367            .detach()
 368        }
 369    }
 370
 371    fn on_lsp_store_event(
 372        &mut self,
 373        lsp_store: Entity<LspStore>,
 374        event: &LspStoreEvent,
 375        cx: &mut Context<Self>,
 376    ) {
 377        match event {
 378            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 379                let log_store = cx
 380                    .try_global::<GlobalLogStore>()
 381                    .map(|lsp_logs| lsp_logs.0.clone());
 382                if let Some(log_store) = log_store {
 383                    log_store.update(cx, |log_store, cx| {
 384                        log_store.add_language_server(
 385                            LanguageServerKind::LocalSsh {
 386                                lsp_store: self.lsp_store.downgrade(),
 387                            },
 388                            *id,
 389                            Some(name.clone()),
 390                            *worktree_id,
 391                            lsp_store.read(cx).language_server_for_id(*id),
 392                            cx,
 393                        );
 394                    });
 395                }
 396            }
 397            LspStoreEvent::LanguageServerRemoved(id) => {
 398                let log_store = cx
 399                    .try_global::<GlobalLogStore>()
 400                    .map(|lsp_logs| lsp_logs.0.clone());
 401                if let Some(log_store) = log_store {
 402                    log_store.update(cx, |log_store, cx| {
 403                        log_store.remove_language_server(*id, cx);
 404                    });
 405                }
 406            }
 407            LspStoreEvent::LanguageServerUpdate {
 408                language_server_id,
 409                name,
 410                message,
 411            } => {
 412                self.session
 413                    .send(proto::UpdateLanguageServer {
 414                        project_id: REMOTE_SERVER_PROJECT_ID,
 415                        server_name: name.as_ref().map(|name| name.to_string()),
 416                        language_server_id: language_server_id.to_proto(),
 417                        variant: Some(message.clone()),
 418                    })
 419                    .log_err();
 420            }
 421            LspStoreEvent::Notification(message) => {
 422                self.session
 423                    .send(proto::Toast {
 424                        project_id: REMOTE_SERVER_PROJECT_ID,
 425                        notification_id: "lsp".to_string(),
 426                        message: message.clone(),
 427                    })
 428                    .log_err();
 429            }
 430            LspStoreEvent::LanguageServerPrompt(prompt) => {
 431                let request = self.session.request(proto::LanguageServerPromptRequest {
 432                    project_id: REMOTE_SERVER_PROJECT_ID,
 433                    actions: prompt
 434                        .actions
 435                        .iter()
 436                        .map(|action| action.title.to_string())
 437                        .collect(),
 438                    level: Some(prompt_to_proto(prompt)),
 439                    lsp_name: prompt.lsp_name.clone(),
 440                    message: prompt.message.clone(),
 441                });
 442                let prompt = prompt.clone();
 443                cx.background_spawn(async move {
 444                    let response = request.await?;
 445                    if let Some(action_response) = response.action_response {
 446                        prompt.respond(action_response as usize).await;
 447                    }
 448                    anyhow::Ok(())
 449                })
 450                .detach();
 451            }
 452            _ => {}
 453        }
 454    }
 455
 456    pub async fn handle_add_worktree(
 457        this: Entity<Self>,
 458        message: TypedEnvelope<proto::AddWorktree>,
 459        mut cx: AsyncApp,
 460    ) -> Result<proto::AddWorktreeResponse> {
 461        use client::ErrorCodeExt;
 462        let fs = this.read_with(&cx, |this, _| this.fs.clone());
 463        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
 464
 465        let canonicalized = match fs.canonicalize(&path).await {
 466            Ok(path) => path,
 467            Err(e) => {
 468                let mut parent = path
 469                    .parent()
 470                    .ok_or(e)
 471                    .with_context(|| format!("{path:?} does not exist"))?;
 472                if parent == Path::new("") {
 473                    parent = util::paths::home_dir();
 474                }
 475                let parent = fs.canonicalize(parent).await.map_err(|_| {
 476                    anyhow!(
 477                        proto::ErrorCode::DevServerProjectPathDoesNotExist
 478                            .with_tag("path", path.to_string_lossy().as_ref())
 479                    )
 480                })?;
 481                if let Some(file_name) = path.file_name() {
 482                    parent.join(file_name)
 483                } else {
 484                    parent
 485                }
 486            }
 487        };
 488        let next_worktree_id = this
 489            .update(&mut cx, |this, cx| {
 490                this.worktree_store
 491                    .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
 492            })
 493            .await?;
 494        let worktree = this
 495            .read_with(&cx.clone(), |this, _| {
 496                Worktree::local(
 497                    Arc::from(canonicalized.as_path()),
 498                    message.payload.visible,
 499                    this.fs.clone(),
 500                    this.next_entry_id.clone(),
 501                    true,
 502                    next_worktree_id,
 503                    &mut cx,
 504                )
 505            })
 506            .await?;
 507
 508        let response = this.read_with(&cx, |_, cx| {
 509            let worktree = worktree.read(cx);
 510            proto::AddWorktreeResponse {
 511                worktree_id: worktree.id().to_proto(),
 512                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
 513            }
 514        });
 515
 516        // We spawn this asynchronously, so that we can send the response back
 517        // *before* `worktree_store.add()` can send out UpdateProject requests
 518        // to the client about the new worktree.
 519        //
 520        // That lets the client manage the reference/handles of the newly-added
 521        // worktree, before getting interrupted by an UpdateProject request.
 522        //
 523        // This fixes the problem of the client sending the AddWorktree request,
 524        // headless project sending out a project update, client receiving it
 525        // and immediately dropping the reference of the new client, causing it
 526        // to be dropped on the headless project, and the client only then
 527        // receiving a response to AddWorktree.
 528        cx.spawn(async move |cx| {
 529            this.update(cx, |this, cx| {
 530                this.worktree_store.update(cx, |worktree_store, cx| {
 531                    worktree_store.add(&worktree, cx);
 532                });
 533            });
 534        })
 535        .detach();
 536
 537        Ok(response)
 538    }
 539
 540    pub async fn handle_remove_worktree(
 541        this: Entity<Self>,
 542        envelope: TypedEnvelope<proto::RemoveWorktree>,
 543        mut cx: AsyncApp,
 544    ) -> Result<proto::Ack> {
 545        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 546        this.update(&mut cx, |this, cx| {
 547            this.worktree_store.update(cx, |worktree_store, cx| {
 548                worktree_store.remove_worktree(worktree_id, cx);
 549            });
 550        });
 551        Ok(proto::Ack {})
 552    }
 553
 554    pub async fn handle_open_buffer_by_path(
 555        this: Entity<Self>,
 556        message: TypedEnvelope<proto::OpenBufferByPath>,
 557        mut cx: AsyncApp,
 558    ) -> Result<proto::OpenBufferResponse> {
 559        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 560        let path = RelPath::from_proto(&message.payload.path)?;
 561        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 562            let buffer_store = this.buffer_store.clone();
 563            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 564                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 565            });
 566            (buffer_store, buffer)
 567        });
 568
 569        let buffer = buffer.await?;
 570        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 571        buffer_store.update(&mut cx, |buffer_store, cx| {
 572            buffer_store
 573                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 574                .detach_and_log_err(cx);
 575        });
 576
 577        Ok(proto::OpenBufferResponse {
 578            buffer_id: buffer_id.to_proto(),
 579        })
 580    }
 581
 582    pub async fn handle_open_image_by_path(
 583        this: Entity<Self>,
 584        message: TypedEnvelope<proto::OpenImageByPath>,
 585        mut cx: AsyncApp,
 586    ) -> Result<proto::OpenImageResponse> {
 587        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 588        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 589        let path = RelPath::from_proto(&message.payload.path)?;
 590        let project_id = message.payload.project_id;
 591        use proto::create_image_for_peer::Variant;
 592
 593        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 594            (this.worktree_store.clone(), this.session.clone())
 595        });
 596
 597        let worktree = worktree_store
 598            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 599            .context("worktree not found")?;
 600
 601        let load_task = worktree.update(&mut cx, |worktree, cx| {
 602            worktree.load_binary_file(path.as_ref(), cx)
 603        });
 604
 605        let loaded_file = load_task.await?;
 606        let content = loaded_file.content;
 607        let file = loaded_file.file;
 608
 609        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 610        let image_id =
 611            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 612
 613        let format = image::guess_format(&content)
 614            .map(|f| format!("{:?}", f).to_lowercase())
 615            .unwrap_or_else(|_| "unknown".to_string());
 616
 617        let state = proto::ImageState {
 618            id: image_id.to_proto(),
 619            file: Some(proto_file),
 620            content_size: content.len() as u64,
 621            format,
 622        };
 623
 624        session.send(proto::CreateImageForPeer {
 625            project_id,
 626            peer_id: Some(REMOTE_SERVER_PEER_ID),
 627            variant: Some(Variant::State(state)),
 628        })?;
 629
 630        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 631        for chunk in content.chunks(CHUNK_SIZE) {
 632            session.send(proto::CreateImageForPeer {
 633                project_id,
 634                peer_id: Some(REMOTE_SERVER_PEER_ID),
 635                variant: Some(Variant::Chunk(proto::ImageChunk {
 636                    image_id: image_id.to_proto(),
 637                    data: chunk.to_vec(),
 638                })),
 639            })?;
 640        }
 641
 642        Ok(proto::OpenImageResponse {
 643            image_id: image_id.to_proto(),
 644        })
 645    }
 646
 647    pub async fn handle_trust_worktrees(
 648        this: Entity<Self>,
 649        envelope: TypedEnvelope<proto::TrustWorktrees>,
 650        mut cx: AsyncApp,
 651    ) -> Result<proto::Ack> {
 652        let trusted_worktrees = cx
 653            .update(|cx| TrustedWorktrees::try_get_global(cx))
 654            .context("missing trusted worktrees")?;
 655        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 656        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 657            trusted_worktrees.trust(
 658                &worktree_store,
 659                envelope
 660                    .payload
 661                    .trusted_paths
 662                    .into_iter()
 663                    .filter_map(PathTrust::from_proto)
 664                    .collect(),
 665                cx,
 666            );
 667        });
 668        Ok(proto::Ack {})
 669    }
 670
 671    pub async fn handle_restrict_worktrees(
 672        this: Entity<Self>,
 673        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 674        mut cx: AsyncApp,
 675    ) -> Result<proto::Ack> {
 676        let trusted_worktrees = cx
 677            .update(|cx| TrustedWorktrees::try_get_global(cx))
 678            .context("missing trusted worktrees")?;
 679        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 680        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 681            let restricted_paths = envelope
 682                .payload
 683                .worktree_ids
 684                .into_iter()
 685                .map(WorktreeId::from_proto)
 686                .map(PathTrust::Worktree)
 687                .collect::<HashSet<_>>();
 688            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 689        });
 690        Ok(proto::Ack {})
 691    }
 692
 693    pub async fn handle_download_file_by_path(
 694        this: Entity<Self>,
 695        message: TypedEnvelope<proto::DownloadFileByPath>,
 696        mut cx: AsyncApp,
 697    ) -> Result<proto::DownloadFileResponse> {
 698        log::debug!(
 699            "handle_download_file_by_path: received request: {:?}",
 700            message.payload
 701        );
 702
 703        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 704        let path = RelPath::from_proto(&message.payload.path)?;
 705        let project_id = message.payload.project_id;
 706        let file_id = message.payload.file_id;
 707        log::debug!(
 708            "handle_download_file_by_path: worktree_id={:?}, path={:?}, file_id={}",
 709            worktree_id,
 710            path,
 711            file_id
 712        );
 713        use proto::create_file_for_peer::Variant;
 714
 715        let (worktree_store, session): (Entity<WorktreeStore>, AnyProtoClient) = this
 716            .read_with(&cx, |this, _| {
 717                (this.worktree_store.clone(), this.session.clone())
 718            });
 719
 720        let worktree = worktree_store
 721            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 722            .context("worktree not found")?;
 723
 724        let download_task = worktree.update(&mut cx, |worktree: &mut Worktree, cx| {
 725            worktree.load_binary_file(path.as_ref(), cx)
 726        });
 727
 728        let downloaded_file = download_task.await?;
 729        let content = downloaded_file.content;
 730        let file = downloaded_file.file;
 731        log::debug!(
 732            "handle_download_file_by_path: file loaded, content_size={}",
 733            content.len()
 734        );
 735
 736        let proto_file = worktree.read_with(&cx, |_worktree: &Worktree, cx| file.to_proto(cx));
 737        log::debug!(
 738            "handle_download_file_by_path: using client-provided file_id={}",
 739            file_id
 740        );
 741
 742        let state = proto::FileState {
 743            id: file_id,
 744            file: Some(proto_file),
 745            content_size: content.len() as u64,
 746        };
 747
 748        log::debug!("handle_download_file_by_path: sending State message");
 749        session.send(proto::CreateFileForPeer {
 750            project_id,
 751            peer_id: Some(REMOTE_SERVER_PEER_ID),
 752            variant: Some(Variant::State(state)),
 753        })?;
 754
 755        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 756        let num_chunks = content.len().div_ceil(CHUNK_SIZE);
 757        log::debug!(
 758            "handle_download_file_by_path: sending {} chunks",
 759            num_chunks
 760        );
 761        for (i, chunk) in content.chunks(CHUNK_SIZE).enumerate() {
 762            log::trace!(
 763                "handle_download_file_by_path: sending chunk {}/{}, size={}",
 764                i + 1,
 765                num_chunks,
 766                chunk.len()
 767            );
 768            session.send(proto::CreateFileForPeer {
 769                project_id,
 770                peer_id: Some(REMOTE_SERVER_PEER_ID),
 771                variant: Some(Variant::Chunk(proto::FileChunk {
 772                    file_id,
 773                    data: chunk.to_vec(),
 774                })),
 775            })?;
 776        }
 777
 778        log::debug!(
 779            "handle_download_file_by_path: returning file_id={}",
 780            file_id
 781        );
 782        Ok(proto::DownloadFileResponse { file_id })
 783    }
 784
 785    pub async fn handle_open_new_buffer(
 786        this: Entity<Self>,
 787        _message: TypedEnvelope<proto::OpenNewBuffer>,
 788        mut cx: AsyncApp,
 789    ) -> Result<proto::OpenBufferResponse> {
 790        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 791            let buffer_store = this.buffer_store.clone();
 792            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 793                buffer_store.create_buffer(None, true, cx)
 794            });
 795            (buffer_store, buffer)
 796        });
 797
 798        let buffer = buffer.await?;
 799        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 800        buffer_store.update(&mut cx, |buffer_store, cx| {
 801            buffer_store
 802                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 803                .detach_and_log_err(cx);
 804        });
 805
 806        Ok(proto::OpenBufferResponse {
 807            buffer_id: buffer_id.to_proto(),
 808        })
 809    }
 810
 811    async fn handle_toggle_lsp_logs(
 812        _: Entity<Self>,
 813        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 814        cx: AsyncApp,
 815    ) -> Result<()> {
 816        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 817        cx.update(|cx| {
 818            let log_store = cx
 819                .try_global::<GlobalLogStore>()
 820                .map(|global_log_store| global_log_store.0.clone())
 821                .context("lsp logs store is missing")?;
 822            let toggled_log_kind =
 823                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 824                    .context("invalid log type")?
 825                {
 826                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 827                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 828                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 829                };
 830            log_store.update(cx, |log_store, _| {
 831                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 832            });
 833            anyhow::Ok(())
 834        })?;
 835
 836        Ok(())
 837    }
 838
 839    async fn handle_open_server_settings(
 840        this: Entity<Self>,
 841        _: TypedEnvelope<proto::OpenServerSettings>,
 842        mut cx: AsyncApp,
 843    ) -> Result<proto::OpenBufferResponse> {
 844        let settings_path = paths::settings_file();
 845        let (worktree, path) = this
 846            .update(&mut cx, |this, cx| {
 847                this.worktree_store.update(cx, |worktree_store, cx| {
 848                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 849                })
 850            })
 851            .await?;
 852
 853        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 854            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 855                buffer_store.open_buffer(
 856                    ProjectPath {
 857                        worktree_id: worktree.read(cx).id(),
 858                        path,
 859                    },
 860                    cx,
 861                )
 862            });
 863
 864            (buffer, this.buffer_store.clone())
 865        });
 866
 867        let buffer = buffer.await?;
 868
 869        let buffer_id = cx.update(|cx| {
 870            if buffer.read(cx).is_empty() {
 871                buffer.update(cx, |buffer, cx| {
 872                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 873                });
 874            }
 875
 876            let buffer_id = buffer.read(cx).remote_id();
 877
 878            buffer_store.update(cx, |buffer_store, cx| {
 879                buffer_store
 880                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 881                    .detach_and_log_err(cx);
 882            });
 883
 884            buffer_id
 885        });
 886
 887        Ok(proto::OpenBufferResponse {
 888            buffer_id: buffer_id.to_proto(),
 889        })
 890    }
 891
 892    async fn handle_find_search_candidates(
 893        this: Entity<Self>,
 894        envelope: TypedEnvelope<proto::FindSearchCandidates>,
 895        mut cx: AsyncApp,
 896    ) -> Result<proto::Ack> {
 897        use futures::stream::StreamExt as _;
 898
 899        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
 900        let message = envelope.payload;
 901        let query = SearchQuery::from_proto(
 902            message.query.context("missing query field")?,
 903            PathStyle::local(),
 904        )?;
 905
 906        let project_id = message.project_id;
 907        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
 908        let handle = message.handle;
 909        let _buffer_store = buffer_store.clone();
 910        let client = this.read_with(&cx, |this, _| this.session.clone());
 911        let task = cx.spawn(async move |cx| {
 912            let results = this.update(cx, |this, cx| {
 913                project::Search::local(
 914                    this.fs.clone(),
 915                    this.buffer_store.clone(),
 916                    this.worktree_store.clone(),
 917                    message.limit as _,
 918                    cx,
 919                )
 920                .into_handle(query, cx)
 921                .matching_buffers(cx)
 922            });
 923            let (batcher, batches) =
 924                project::project_search::AdaptiveBatcher::new(cx.background_executor());
 925            let mut new_matches = Box::pin(results.rx);
 926
 927            let sender_task = cx.background_executor().spawn({
 928                let client = client.clone();
 929                async move {
 930                    let mut batches = std::pin::pin!(batches);
 931                    while let Some(buffer_ids) = batches.next().await {
 932                        client
 933                            .request(proto::FindSearchCandidatesChunk {
 934                                handle,
 935                                peer_id: Some(peer_id),
 936                                project_id,
 937                                variant: Some(
 938                                    proto::find_search_candidates_chunk::Variant::Matches(
 939                                        proto::FindSearchCandidatesMatches { buffer_ids },
 940                                    ),
 941                                ),
 942                            })
 943                            .await?;
 944                    }
 945                    anyhow::Ok(())
 946                }
 947            });
 948
 949            while let Some(buffer) = new_matches.next().await {
 950                let _ = buffer_store
 951                    .update(cx, |this, cx| {
 952                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 953                    })
 954                    .await;
 955                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
 956                batcher.push(buffer_id).await;
 957            }
 958            batcher.flush().await;
 959
 960            sender_task.await?;
 961
 962            client
 963                .request(proto::FindSearchCandidatesChunk {
 964                    handle,
 965                    peer_id: Some(peer_id),
 966                    project_id,
 967                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
 968                        proto::FindSearchCandidatesDone {},
 969                    )),
 970                })
 971                .await?;
 972            anyhow::Ok(())
 973        });
 974        _buffer_store.update(&mut cx, |this, _| {
 975            this.register_ongoing_project_search((peer_id, handle), task);
 976        });
 977
 978        Ok(proto::Ack {})
 979    }
 980
 981    // Goes from client to host.
 982    async fn handle_find_search_candidates_cancel(
 983        this: Entity<Self>,
 984        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
 985        mut cx: AsyncApp,
 986    ) -> Result<()> {
 987        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
 988        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
 989    }
 990
 991    async fn handle_list_remote_directory(
 992        this: Entity<Self>,
 993        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
 994        cx: AsyncApp,
 995    ) -> Result<proto::ListRemoteDirectoryResponse> {
 996        use smol::stream::StreamExt;
 997        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 998        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 999        let check_info = envelope
1000            .payload
1001            .config
1002            .as_ref()
1003            .is_some_and(|config| config.is_dir);
1004
1005        let mut entries = Vec::new();
1006        let mut entry_info = Vec::new();
1007        let mut response = fs.read_dir(&expanded).await?;
1008        while let Some(path) = response.next().await {
1009            let path = path?;
1010            if let Some(file_name) = path.file_name() {
1011                entries.push(file_name.to_string_lossy().into_owned());
1012                if check_info {
1013                    let is_dir = fs.is_dir(&path).await;
1014                    entry_info.push(proto::EntryInfo { is_dir });
1015                }
1016            }
1017        }
1018        Ok(proto::ListRemoteDirectoryResponse {
1019            entries,
1020            entry_info,
1021        })
1022    }
1023
1024    async fn handle_get_path_metadata(
1025        this: Entity<Self>,
1026        envelope: TypedEnvelope<proto::GetPathMetadata>,
1027        cx: AsyncApp,
1028    ) -> Result<proto::GetPathMetadataResponse> {
1029        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1030        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1031
1032        let metadata = fs.metadata(&expanded).await?;
1033        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
1034
1035        Ok(proto::GetPathMetadataResponse {
1036            exists: metadata.is_some(),
1037            is_dir,
1038            path: expanded.to_string_lossy().into_owned(),
1039        })
1040    }
1041
1042    async fn handle_shutdown_remote_server(
1043        _this: Entity<Self>,
1044        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
1045        cx: AsyncApp,
1046    ) -> Result<proto::Ack> {
1047        cx.spawn(async move |cx| {
1048            cx.update(|cx| {
1049                // TODO: This is a hack, because in a headless project, shutdown isn't executed
1050                // when calling quit, but it should be.
1051                cx.shutdown();
1052                cx.quit();
1053            })
1054        })
1055        .detach();
1056
1057        Ok(proto::Ack {})
1058    }
1059
1060    pub async fn handle_ping(
1061        _this: Entity<Self>,
1062        _envelope: TypedEnvelope<proto::Ping>,
1063        _cx: AsyncApp,
1064    ) -> Result<proto::Ack> {
1065        log::debug!("Received ping from client");
1066        Ok(proto::Ack {})
1067    }
1068
1069    async fn handle_get_processes(
1070        _this: Entity<Self>,
1071        _envelope: TypedEnvelope<proto::GetProcesses>,
1072        _cx: AsyncApp,
1073    ) -> Result<proto::GetProcessesResponse> {
1074        let mut processes = Vec::new();
1075        let refresh_kind = RefreshKind::nothing().with_processes(
1076            ProcessRefreshKind::nothing()
1077                .without_tasks()
1078                .with_cmd(UpdateKind::Always),
1079        );
1080
1081        for process in System::new_with_specifics(refresh_kind)
1082            .processes()
1083            .values()
1084        {
1085            let name = process.name().to_string_lossy().into_owned();
1086            let command = process
1087                .cmd()
1088                .iter()
1089                .map(|s| s.to_string_lossy().into_owned())
1090                .collect::<Vec<_>>();
1091
1092            processes.push(proto::ProcessInfo {
1093                pid: process.pid().as_u32(),
1094                name,
1095                command,
1096            });
1097        }
1098
1099        processes.sort_by_key(|p| p.name.clone());
1100
1101        Ok(proto::GetProcessesResponse { processes })
1102    }
1103
1104    async fn handle_get_directory_environment(
1105        this: Entity<Self>,
1106        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1107        mut cx: AsyncApp,
1108    ) -> Result<proto::DirectoryEnvironment> {
1109        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1110        let directory = PathBuf::from(envelope.payload.directory);
1111        let environment = this
1112            .update(&mut cx, |this, cx| {
1113                this.environment.update(cx, |environment, cx| {
1114                    environment.local_directory_environment(&shell, directory.into(), cx)
1115                })
1116            })
1117            .await
1118            .context("failed to get directory environment")?
1119            .into_iter()
1120            .collect();
1121        Ok(proto::DirectoryEnvironment { environment })
1122    }
1123}
1124
1125fn prompt_to_proto(
1126    prompt: &project::LanguageServerPromptRequest,
1127) -> proto::language_server_prompt_request::Level {
1128    match prompt.level {
1129        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1130            proto::language_server_prompt_request::Info {},
1131        ),
1132        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1133            proto::language_server_prompt_request::Warning {},
1134        ),
1135        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1136            proto::language_server_prompt_request::Critical {},
1137        ),
1138    }
1139}