headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashSet;
   4use language::File;
   5use lsp::LanguageServerId;
   6
   7use extension::ExtensionHostProxy;
   8use extension_host::headless_host::HeadlessExtensionStore;
   9use fs::Fs;
  10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  11use http_client::HttpClient;
  12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  13use node_runtime::NodeRuntime;
  14use project::{
  15    AgentRegistryStore, LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment,
  16    ProjectPath, ToolchainStore, WorktreeId,
  17    agent_server_store::AgentServerStore,
  18    buffer_store::{BufferStore, BufferStoreEvent},
  19    context_server_store::ContextServerStore,
  20    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  21    git_store::GitStore,
  22    image_store::ImageId,
  23    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  24    project_settings::SettingsObserver,
  25    search::SearchQuery,
  26    task_store::TaskStore,
  27    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  28    worktree_store::{WorktreeIdCounter, WorktreeStore},
  29};
  30use rpc::{
  31    AnyProtoClient, TypedEnvelope,
  32    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  33};
  34
  35use settings::initial_server_settings_content;
  36use std::{
  37    num::NonZeroU64,
  38    path::{Path, PathBuf},
  39    sync::{
  40        Arc,
  41        atomic::{AtomicU64, AtomicUsize, Ordering},
  42    },
  43    time::Instant,
  44};
  45use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  46use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  47use worktree::Worktree;
  48
  49pub struct HeadlessProject {
  50    pub fs: Arc<dyn Fs>,
  51    pub session: AnyProtoClient,
  52    pub worktree_store: Entity<WorktreeStore>,
  53    pub buffer_store: Entity<BufferStore>,
  54    pub lsp_store: Entity<LspStore>,
  55    pub task_store: Entity<TaskStore>,
  56    pub dap_store: Entity<DapStore>,
  57    pub breakpoint_store: Entity<BreakpointStore>,
  58    pub agent_server_store: Entity<AgentServerStore>,
  59    pub context_server_store: Entity<ContextServerStore>,
  60    pub settings_observer: Entity<SettingsObserver>,
  61    pub next_entry_id: Arc<AtomicUsize>,
  62    pub languages: Arc<LanguageRegistry>,
  63    pub extensions: Entity<HeadlessExtensionStore>,
  64    pub git_store: Entity<GitStore>,
  65    pub environment: Entity<ProjectEnvironment>,
  66    pub profiling_collector: gpui::ProfilingCollector,
  67    // Used mostly to keep alive the toolchain store for RPC handlers.
  68    // Local variant is used within LSP store, but that's a separate entity.
  69    pub _toolchain_store: Entity<ToolchainStore>,
  70}
  71
  72pub struct HeadlessAppState {
  73    pub session: AnyProtoClient,
  74    pub fs: Arc<dyn Fs>,
  75    pub http_client: Arc<dyn HttpClient>,
  76    pub node_runtime: NodeRuntime,
  77    pub languages: Arc<LanguageRegistry>,
  78    pub extension_host_proxy: Arc<ExtensionHostProxy>,
  79    pub startup_time: Instant,
  80}
  81
  82impl HeadlessProject {
  83    pub fn init(cx: &mut App) {
  84        settings::init(cx);
  85        log_store::init(true, cx);
  86    }
  87
  88    pub fn new(
  89        HeadlessAppState {
  90            session,
  91            fs,
  92            http_client,
  93            node_runtime,
  94            languages,
  95            extension_host_proxy: proxy,
  96            startup_time,
  97        }: HeadlessAppState,
  98        init_worktree_trust: bool,
  99        cx: &mut Context<Self>,
 100    ) -> Self {
 101        debug_adapter_extension::init(proxy.clone(), cx);
 102        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
 103
 104        let worktree_store = cx.new(|cx| {
 105            let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
 106            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 107            store
 108        });
 109
 110        if init_worktree_trust {
 111            project::trusted_worktrees::track_worktree_trust(
 112                worktree_store.clone(),
 113                None::<RemoteHostLocation>,
 114                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
 115                None,
 116                cx,
 117            );
 118        }
 119
 120        let environment =
 121            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
 122        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
 123        let toolchain_store = cx.new(|cx| {
 124            ToolchainStore::local(
 125                languages.clone(),
 126                worktree_store.clone(),
 127                environment.clone(),
 128                manifest_tree.clone(),
 129                fs.clone(),
 130                cx,
 131            )
 132        });
 133
 134        let buffer_store = cx.new(|cx| {
 135            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 136            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 137            buffer_store
 138        });
 139
 140        let breakpoint_store = cx.new(|_| {
 141            let mut breakpoint_store =
 142                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
 143            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 144
 145            breakpoint_store
 146        });
 147
 148        let dap_store = cx.new(|cx| {
 149            let mut dap_store = DapStore::new_local(
 150                http_client.clone(),
 151                node_runtime.clone(),
 152                fs.clone(),
 153                environment.clone(),
 154                toolchain_store.read(cx).as_language_toolchain_store(),
 155                worktree_store.clone(),
 156                breakpoint_store.clone(),
 157                true,
 158                cx,
 159            );
 160            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 161            dap_store
 162        });
 163
 164        let git_store = cx.new(|cx| {
 165            let mut store = GitStore::local(
 166                &worktree_store,
 167                buffer_store.clone(),
 168                environment.clone(),
 169                fs.clone(),
 170                cx,
 171            );
 172            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 173            store
 174        });
 175
 176        let prettier_store = cx.new(|cx| {
 177            PrettierStore::new(
 178                node_runtime.clone(),
 179                fs.clone(),
 180                languages.clone(),
 181                worktree_store.clone(),
 182                cx,
 183            )
 184        });
 185
 186        let task_store = cx.new(|cx| {
 187            let mut task_store = TaskStore::local(
 188                buffer_store.downgrade(),
 189                worktree_store.clone(),
 190                toolchain_store.read(cx).as_language_toolchain_store(),
 191                environment.clone(),
 192                cx,
 193            );
 194            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 195            task_store
 196        });
 197        let settings_observer = cx.new(|cx| {
 198            let mut observer = SettingsObserver::new_local(
 199                fs.clone(),
 200                worktree_store.clone(),
 201                task_store.clone(),
 202                true,
 203                cx,
 204            );
 205            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 206            observer
 207        });
 208
 209        let lsp_store = cx.new(|cx| {
 210            let mut lsp_store = LspStore::new_local(
 211                buffer_store.clone(),
 212                worktree_store.clone(),
 213                prettier_store.clone(),
 214                toolchain_store
 215                    .read(cx)
 216                    .as_local_store()
 217                    .expect("Toolchain store to be local")
 218                    .clone(),
 219                environment.clone(),
 220                manifest_tree,
 221                languages.clone(),
 222                http_client.clone(),
 223                fs.clone(),
 224                cx,
 225            );
 226            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 227            lsp_store
 228        });
 229
 230        AgentRegistryStore::init_global(cx, fs.clone(), http_client.clone());
 231
 232        let agent_server_store = cx.new(|cx| {
 233            let mut agent_server_store = AgentServerStore::local(
 234                node_runtime.clone(),
 235                fs.clone(),
 236                environment.clone(),
 237                http_client.clone(),
 238                cx,
 239            );
 240            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 241            agent_server_store
 242        });
 243
 244        let context_server_store = cx.new(|cx| {
 245            let mut context_server_store =
 246                ContextServerStore::local(worktree_store.clone(), None, true, cx);
 247            context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 248            context_server_store
 249        });
 250
 251        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 252        language_extension::init(
 253            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
 254            proxy.clone(),
 255            languages.clone(),
 256        );
 257
 258        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
 259            if let BufferStoreEvent::BufferAdded(buffer) = event {
 260                cx.subscribe(buffer, Self::on_buffer_event).detach();
 261            }
 262        })
 263        .detach();
 264
 265        let extensions = HeadlessExtensionStore::new(
 266            fs.clone(),
 267            http_client.clone(),
 268            paths::remote_extensions_dir().to_path_buf(),
 269            proxy,
 270            node_runtime,
 271            cx,
 272        );
 273
 274        // local_machine -> ssh handlers
 275        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
 276        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
 277        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
 278        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
 279        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
 280        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
 281        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
 282        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
 283        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
 284        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
 285        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
 286        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);
 287
 288        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
 289        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
 290        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
 291        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
 292        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
 293        session.add_request_handler(cx.weak_entity(), Self::handle_get_remote_profiling_data);
 294
 295        session.add_entity_request_handler(Self::handle_add_worktree);
 296        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
 297
 298        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
 299        session.add_entity_request_handler(Self::handle_open_new_buffer);
 300        session.add_entity_request_handler(Self::handle_find_search_candidates);
 301        session.add_entity_request_handler(Self::handle_open_server_settings);
 302        session.add_entity_request_handler(Self::handle_get_directory_environment);
 303        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
 304        session.add_entity_request_handler(Self::handle_open_image_by_path);
 305        session.add_entity_request_handler(Self::handle_trust_worktrees);
 306        session.add_entity_request_handler(Self::handle_restrict_worktrees);
 307        session.add_entity_request_handler(Self::handle_download_file_by_path);
 308
 309        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
 310        session.add_entity_request_handler(BufferStore::handle_update_buffer);
 311        session.add_entity_message_handler(BufferStore::handle_close_buffer);
 312
 313        session.add_request_handler(
 314            extensions.downgrade(),
 315            HeadlessExtensionStore::handle_sync_extensions,
 316        );
 317        session.add_request_handler(
 318            extensions.downgrade(),
 319            HeadlessExtensionStore::handle_install_extension,
 320        );
 321
 322        BufferStore::init(&session);
 323        WorktreeStore::init(&session);
 324        SettingsObserver::init(&session);
 325        LspStore::init(&session);
 326        TaskStore::init(Some(&session));
 327        ToolchainStore::init(&session);
 328        DapStore::init(&session, cx);
 329        // todo(debugger): Re init breakpoint store when we set it up for collab
 330        BreakpointStore::init(&session);
 331        GitStore::init(&session);
 332        AgentServerStore::init_headless(&session);
 333        ContextServerStore::init_headless(&session);
 334
 335        HeadlessProject {
 336            next_entry_id: Default::default(),
 337            session,
 338            settings_observer,
 339            fs,
 340            worktree_store,
 341            buffer_store,
 342            lsp_store,
 343            task_store,
 344            dap_store,
 345            breakpoint_store,
 346            agent_server_store,
 347            context_server_store,
 348            languages,
 349            extensions,
 350            git_store,
 351            environment,
 352            profiling_collector: gpui::ProfilingCollector::new(startup_time),
 353            _toolchain_store: toolchain_store,
 354        }
 355    }
 356
 357    fn on_buffer_event(
 358        &mut self,
 359        buffer: Entity<Buffer>,
 360        event: &BufferEvent,
 361        cx: &mut Context<Self>,
 362    ) {
 363        if let BufferEvent::Operation {
 364            operation,
 365            is_local: true,
 366        } = event
 367        {
 368            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 369                project_id: REMOTE_SERVER_PROJECT_ID,
 370                buffer_id: buffer.read(cx).remote_id().to_proto(),
 371                operations: vec![serialize_operation(operation)],
 372            }))
 373            .detach()
 374        }
 375    }
 376
 377    fn on_lsp_store_event(
 378        &mut self,
 379        lsp_store: Entity<LspStore>,
 380        event: &LspStoreEvent,
 381        cx: &mut Context<Self>,
 382    ) {
 383        match event {
 384            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 385                let log_store = cx
 386                    .try_global::<GlobalLogStore>()
 387                    .map(|lsp_logs| lsp_logs.0.clone());
 388                if let Some(log_store) = log_store {
 389                    log_store.update(cx, |log_store, cx| {
 390                        log_store.add_language_server(
 391                            LanguageServerKind::LocalSsh {
 392                                lsp_store: self.lsp_store.downgrade(),
 393                            },
 394                            *id,
 395                            Some(name.clone()),
 396                            *worktree_id,
 397                            lsp_store.read(cx).language_server_for_id(*id),
 398                            cx,
 399                        );
 400                    });
 401                }
 402            }
 403            LspStoreEvent::LanguageServerRemoved(id) => {
 404                let log_store = cx
 405                    .try_global::<GlobalLogStore>()
 406                    .map(|lsp_logs| lsp_logs.0.clone());
 407                if let Some(log_store) = log_store {
 408                    log_store.update(cx, |log_store, cx| {
 409                        log_store.remove_language_server(*id, cx);
 410                    });
 411                }
 412            }
 413            LspStoreEvent::LanguageServerUpdate {
 414                language_server_id,
 415                name,
 416                message,
 417            } => {
 418                self.session
 419                    .send(proto::UpdateLanguageServer {
 420                        project_id: REMOTE_SERVER_PROJECT_ID,
 421                        server_name: name.as_ref().map(|name| name.to_string()),
 422                        language_server_id: language_server_id.to_proto(),
 423                        variant: Some(message.clone()),
 424                    })
 425                    .log_err();
 426            }
 427            LspStoreEvent::Notification(message) => {
 428                self.session
 429                    .send(proto::Toast {
 430                        project_id: REMOTE_SERVER_PROJECT_ID,
 431                        notification_id: "lsp".to_string(),
 432                        message: message.clone(),
 433                    })
 434                    .log_err();
 435            }
 436            LspStoreEvent::LanguageServerPrompt(prompt) => {
 437                let request = self.session.request(proto::LanguageServerPromptRequest {
 438                    project_id: REMOTE_SERVER_PROJECT_ID,
 439                    actions: prompt
 440                        .actions
 441                        .iter()
 442                        .map(|action| action.title.to_string())
 443                        .collect(),
 444                    level: Some(prompt_to_proto(prompt)),
 445                    lsp_name: prompt.lsp_name.clone(),
 446                    message: prompt.message.clone(),
 447                });
 448                let prompt = prompt.clone();
 449                cx.background_spawn(async move {
 450                    let response = request.await?;
 451                    if let Some(action_response) = response.action_response {
 452                        prompt.respond(action_response as usize).await;
 453                    }
 454                    anyhow::Ok(())
 455                })
 456                .detach();
 457            }
 458            _ => {}
 459        }
 460    }
 461
 462    pub async fn handle_add_worktree(
 463        this: Entity<Self>,
 464        message: TypedEnvelope<proto::AddWorktree>,
 465        mut cx: AsyncApp,
 466    ) -> Result<proto::AddWorktreeResponse> {
 467        use client::ErrorCodeExt;
 468        let fs = this.read_with(&cx, |this, _| this.fs.clone());
 469        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
 470
 471        let canonicalized = match fs.canonicalize(&path).await {
 472            Ok(path) => path,
 473            Err(e) => {
 474                let mut parent = path
 475                    .parent()
 476                    .ok_or(e)
 477                    .with_context(|| format!("{path:?} does not exist"))?;
 478                if parent == Path::new("") {
 479                    parent = util::paths::home_dir();
 480                }
 481                let parent = fs.canonicalize(parent).await.map_err(|_| {
 482                    anyhow!(
 483                        proto::ErrorCode::DevServerProjectPathDoesNotExist
 484                            .with_tag("path", path.to_string_lossy().as_ref())
 485                    )
 486                })?;
 487                if let Some(file_name) = path.file_name() {
 488                    parent.join(file_name)
 489                } else {
 490                    parent
 491                }
 492            }
 493        };
 494        let next_worktree_id = this
 495            .update(&mut cx, |this, cx| {
 496                this.worktree_store
 497                    .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
 498            })
 499            .await?;
 500        let worktree = this
 501            .read_with(&cx.clone(), |this, _| {
 502                Worktree::local(
 503                    Arc::from(canonicalized.as_path()),
 504                    message.payload.visible,
 505                    this.fs.clone(),
 506                    this.next_entry_id.clone(),
 507                    true,
 508                    next_worktree_id,
 509                    &mut cx,
 510                )
 511            })
 512            .await?;
 513
 514        let response = this.read_with(&cx, |_, cx| {
 515            let worktree = worktree.read(cx);
 516            proto::AddWorktreeResponse {
 517                worktree_id: worktree.id().to_proto(),
 518                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
 519            }
 520        });
 521
 522        // We spawn this asynchronously, so that we can send the response back
 523        // *before* `worktree_store.add()` can send out UpdateProject requests
 524        // to the client about the new worktree.
 525        //
 526        // That lets the client manage the reference/handles of the newly-added
 527        // worktree, before getting interrupted by an UpdateProject request.
 528        //
 529        // This fixes the problem of the client sending the AddWorktree request,
 530        // headless project sending out a project update, client receiving it
 531        // and immediately dropping the reference of the new client, causing it
 532        // to be dropped on the headless project, and the client only then
 533        // receiving a response to AddWorktree.
 534        cx.spawn(async move |cx| {
 535            this.update(cx, |this, cx| {
 536                this.worktree_store.update(cx, |worktree_store, cx| {
 537                    worktree_store.add(&worktree, cx);
 538                });
 539            });
 540        })
 541        .detach();
 542
 543        Ok(response)
 544    }
 545
 546    pub async fn handle_remove_worktree(
 547        this: Entity<Self>,
 548        envelope: TypedEnvelope<proto::RemoveWorktree>,
 549        mut cx: AsyncApp,
 550    ) -> Result<proto::Ack> {
 551        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 552        this.update(&mut cx, |this, cx| {
 553            this.worktree_store.update(cx, |worktree_store, cx| {
 554                worktree_store.remove_worktree(worktree_id, cx);
 555            });
 556        });
 557        Ok(proto::Ack {})
 558    }
 559
 560    pub async fn handle_open_buffer_by_path(
 561        this: Entity<Self>,
 562        message: TypedEnvelope<proto::OpenBufferByPath>,
 563        mut cx: AsyncApp,
 564    ) -> Result<proto::OpenBufferResponse> {
 565        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 566        let path = RelPath::from_proto(&message.payload.path)?;
 567        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 568            let buffer_store = this.buffer_store.clone();
 569            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 570                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 571            });
 572            (buffer_store, buffer)
 573        });
 574
 575        let buffer = buffer.await?;
 576        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 577        buffer_store.update(&mut cx, |buffer_store, cx| {
 578            buffer_store
 579                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 580                .detach_and_log_err(cx);
 581        });
 582
 583        Ok(proto::OpenBufferResponse {
 584            buffer_id: buffer_id.to_proto(),
 585        })
 586    }
 587
 588    pub async fn handle_open_image_by_path(
 589        this: Entity<Self>,
 590        message: TypedEnvelope<proto::OpenImageByPath>,
 591        mut cx: AsyncApp,
 592    ) -> Result<proto::OpenImageResponse> {
 593        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 594        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 595        let path = RelPath::from_proto(&message.payload.path)?;
 596        let project_id = message.payload.project_id;
 597        use proto::create_image_for_peer::Variant;
 598
 599        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 600            (this.worktree_store.clone(), this.session.clone())
 601        });
 602
 603        let worktree = worktree_store
 604            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 605            .context("worktree not found")?;
 606
 607        let load_task = worktree.update(&mut cx, |worktree, cx| {
 608            worktree.load_binary_file(path.as_ref(), cx)
 609        });
 610
 611        let loaded_file = load_task.await?;
 612        let content = loaded_file.content;
 613        let file = loaded_file.file;
 614
 615        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 616        let image_id =
 617            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 618
 619        let format = image::guess_format(&content)
 620            .map(|f| format!("{:?}", f).to_lowercase())
 621            .unwrap_or_else(|_| "unknown".to_string());
 622
 623        let state = proto::ImageState {
 624            id: image_id.to_proto(),
 625            file: Some(proto_file),
 626            content_size: content.len() as u64,
 627            format,
 628        };
 629
 630        session.send(proto::CreateImageForPeer {
 631            project_id,
 632            peer_id: Some(REMOTE_SERVER_PEER_ID),
 633            variant: Some(Variant::State(state)),
 634        })?;
 635
 636        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 637        for chunk in content.chunks(CHUNK_SIZE) {
 638            session.send(proto::CreateImageForPeer {
 639                project_id,
 640                peer_id: Some(REMOTE_SERVER_PEER_ID),
 641                variant: Some(Variant::Chunk(proto::ImageChunk {
 642                    image_id: image_id.to_proto(),
 643                    data: chunk.to_vec(),
 644                })),
 645            })?;
 646        }
 647
 648        Ok(proto::OpenImageResponse {
 649            image_id: image_id.to_proto(),
 650        })
 651    }
 652
 653    pub async fn handle_trust_worktrees(
 654        this: Entity<Self>,
 655        envelope: TypedEnvelope<proto::TrustWorktrees>,
 656        mut cx: AsyncApp,
 657    ) -> Result<proto::Ack> {
 658        let trusted_worktrees = cx
 659            .update(|cx| TrustedWorktrees::try_get_global(cx))
 660            .context("missing trusted worktrees")?;
 661        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 662        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 663            trusted_worktrees.trust(
 664                &worktree_store,
 665                envelope
 666                    .payload
 667                    .trusted_paths
 668                    .into_iter()
 669                    .filter_map(PathTrust::from_proto)
 670                    .collect(),
 671                cx,
 672            );
 673        });
 674        Ok(proto::Ack {})
 675    }
 676
 677    pub async fn handle_restrict_worktrees(
 678        this: Entity<Self>,
 679        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 680        mut cx: AsyncApp,
 681    ) -> Result<proto::Ack> {
 682        let trusted_worktrees = cx
 683            .update(|cx| TrustedWorktrees::try_get_global(cx))
 684            .context("missing trusted worktrees")?;
 685        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 686        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 687            let restricted_paths = envelope
 688                .payload
 689                .worktree_ids
 690                .into_iter()
 691                .map(WorktreeId::from_proto)
 692                .map(PathTrust::Worktree)
 693                .collect::<HashSet<_>>();
 694            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 695        });
 696        Ok(proto::Ack {})
 697    }
 698
 699    pub async fn handle_download_file_by_path(
 700        this: Entity<Self>,
 701        message: TypedEnvelope<proto::DownloadFileByPath>,
 702        mut cx: AsyncApp,
 703    ) -> Result<proto::DownloadFileResponse> {
 704        log::debug!(
 705            "handle_download_file_by_path: received request: {:?}",
 706            message.payload
 707        );
 708
 709        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 710        let path = RelPath::from_proto(&message.payload.path)?;
 711        let project_id = message.payload.project_id;
 712        let file_id = message.payload.file_id;
 713        log::debug!(
 714            "handle_download_file_by_path: worktree_id={:?}, path={:?}, file_id={}",
 715            worktree_id,
 716            path,
 717            file_id
 718        );
 719        use proto::create_file_for_peer::Variant;
 720
 721        let (worktree_store, session): (Entity<WorktreeStore>, AnyProtoClient) = this
 722            .read_with(&cx, |this, _| {
 723                (this.worktree_store.clone(), this.session.clone())
 724            });
 725
 726        let worktree = worktree_store
 727            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 728            .context("worktree not found")?;
 729
 730        let download_task = worktree.update(&mut cx, |worktree: &mut Worktree, cx| {
 731            worktree.load_binary_file(path.as_ref(), cx)
 732        });
 733
 734        let downloaded_file = download_task.await?;
 735        let content = downloaded_file.content;
 736        let file = downloaded_file.file;
 737        log::debug!(
 738            "handle_download_file_by_path: file loaded, content_size={}",
 739            content.len()
 740        );
 741
 742        let proto_file = worktree.read_with(&cx, |_worktree: &Worktree, cx| file.to_proto(cx));
 743        log::debug!(
 744            "handle_download_file_by_path: using client-provided file_id={}",
 745            file_id
 746        );
 747
 748        let state = proto::FileState {
 749            id: file_id,
 750            file: Some(proto_file),
 751            content_size: content.len() as u64,
 752        };
 753
 754        log::debug!("handle_download_file_by_path: sending State message");
 755        session.send(proto::CreateFileForPeer {
 756            project_id,
 757            peer_id: Some(REMOTE_SERVER_PEER_ID),
 758            variant: Some(Variant::State(state)),
 759        })?;
 760
 761        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 762        let num_chunks = content.len().div_ceil(CHUNK_SIZE);
 763        log::debug!(
 764            "handle_download_file_by_path: sending {} chunks",
 765            num_chunks
 766        );
 767        for (i, chunk) in content.chunks(CHUNK_SIZE).enumerate() {
 768            log::trace!(
 769                "handle_download_file_by_path: sending chunk {}/{}, size={}",
 770                i + 1,
 771                num_chunks,
 772                chunk.len()
 773            );
 774            session.send(proto::CreateFileForPeer {
 775                project_id,
 776                peer_id: Some(REMOTE_SERVER_PEER_ID),
 777                variant: Some(Variant::Chunk(proto::FileChunk {
 778                    file_id,
 779                    data: chunk.to_vec(),
 780                })),
 781            })?;
 782        }
 783
 784        log::debug!(
 785            "handle_download_file_by_path: returning file_id={}",
 786            file_id
 787        );
 788        Ok(proto::DownloadFileResponse { file_id })
 789    }
 790
 791    pub async fn handle_open_new_buffer(
 792        this: Entity<Self>,
 793        _message: TypedEnvelope<proto::OpenNewBuffer>,
 794        mut cx: AsyncApp,
 795    ) -> Result<proto::OpenBufferResponse> {
 796        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 797            let buffer_store = this.buffer_store.clone();
 798            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 799                buffer_store.create_buffer(None, true, cx)
 800            });
 801            (buffer_store, buffer)
 802        });
 803
 804        let buffer = buffer.await?;
 805        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 806        buffer_store.update(&mut cx, |buffer_store, cx| {
 807            buffer_store
 808                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 809                .detach_and_log_err(cx);
 810        });
 811
 812        Ok(proto::OpenBufferResponse {
 813            buffer_id: buffer_id.to_proto(),
 814        })
 815    }
 816
 817    async fn handle_toggle_lsp_logs(
 818        _: Entity<Self>,
 819        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 820        cx: AsyncApp,
 821    ) -> Result<()> {
 822        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 823        cx.update(|cx| {
 824            let log_store = cx
 825                .try_global::<GlobalLogStore>()
 826                .map(|global_log_store| global_log_store.0.clone())
 827                .context("lsp logs store is missing")?;
 828            let toggled_log_kind =
 829                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 830                    .context("invalid log type")?
 831                {
 832                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 833                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 834                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 835                };
 836            log_store.update(cx, |log_store, _| {
 837                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 838            });
 839            anyhow::Ok(())
 840        })?;
 841
 842        Ok(())
 843    }
 844
 845    async fn handle_open_server_settings(
 846        this: Entity<Self>,
 847        _: TypedEnvelope<proto::OpenServerSettings>,
 848        mut cx: AsyncApp,
 849    ) -> Result<proto::OpenBufferResponse> {
 850        let settings_path = paths::settings_file();
 851        let (worktree, path) = this
 852            .update(&mut cx, |this, cx| {
 853                this.worktree_store.update(cx, |worktree_store, cx| {
 854                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 855                })
 856            })
 857            .await?;
 858
 859        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 860            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 861                buffer_store.open_buffer(
 862                    ProjectPath {
 863                        worktree_id: worktree.read(cx).id(),
 864                        path,
 865                    },
 866                    cx,
 867                )
 868            });
 869
 870            (buffer, this.buffer_store.clone())
 871        });
 872
 873        let buffer = buffer.await?;
 874
 875        let buffer_id = cx.update(|cx| {
 876            if buffer.read(cx).is_empty() {
 877                buffer.update(cx, |buffer, cx| {
 878                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 879                });
 880            }
 881
 882            let buffer_id = buffer.read(cx).remote_id();
 883
 884            buffer_store.update(cx, |buffer_store, cx| {
 885                buffer_store
 886                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 887                    .detach_and_log_err(cx);
 888            });
 889
 890            buffer_id
 891        });
 892
 893        Ok(proto::OpenBufferResponse {
 894            buffer_id: buffer_id.to_proto(),
 895        })
 896    }
 897
 898    async fn handle_find_search_candidates(
 899        this: Entity<Self>,
 900        envelope: TypedEnvelope<proto::FindSearchCandidates>,
 901        mut cx: AsyncApp,
 902    ) -> Result<proto::Ack> {
 903        use futures::stream::StreamExt as _;
 904
 905        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
 906        let message = envelope.payload;
 907        let query = SearchQuery::from_proto(
 908            message.query.context("missing query field")?,
 909            PathStyle::local(),
 910        )?;
 911
 912        let project_id = message.project_id;
 913        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
 914        let handle = message.handle;
 915        let _buffer_store = buffer_store.clone();
 916        let client = this.read_with(&cx, |this, _| this.session.clone());
 917        let task = cx.spawn(async move |cx| {
 918            let results = this.update(cx, |this, cx| {
 919                project::Search::local(
 920                    this.fs.clone(),
 921                    this.buffer_store.clone(),
 922                    this.worktree_store.clone(),
 923                    message.limit as _,
 924                    cx,
 925                )
 926                .into_handle(query, cx)
 927                .matching_buffers(cx)
 928            });
 929            let (batcher, batches) =
 930                project::project_search::AdaptiveBatcher::new(cx.background_executor());
 931            let mut new_matches = Box::pin(results.rx);
 932
 933            let sender_task = cx.background_executor().spawn({
 934                let client = client.clone();
 935                async move {
 936                    let mut batches = std::pin::pin!(batches);
 937                    while let Some(buffer_ids) = batches.next().await {
 938                        client
 939                            .request(proto::FindSearchCandidatesChunk {
 940                                handle,
 941                                peer_id: Some(peer_id),
 942                                project_id,
 943                                variant: Some(
 944                                    proto::find_search_candidates_chunk::Variant::Matches(
 945                                        proto::FindSearchCandidatesMatches { buffer_ids },
 946                                    ),
 947                                ),
 948                            })
 949                            .await?;
 950                    }
 951                    anyhow::Ok(())
 952                }
 953            });
 954
 955            while let Some(buffer) = new_matches.next().await {
 956                let _ = buffer_store
 957                    .update(cx, |this, cx| {
 958                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 959                    })
 960                    .await;
 961                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
 962                batcher.push(buffer_id).await;
 963            }
 964            batcher.flush().await;
 965
 966            sender_task.await?;
 967
 968            client
 969                .request(proto::FindSearchCandidatesChunk {
 970                    handle,
 971                    peer_id: Some(peer_id),
 972                    project_id,
 973                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
 974                        proto::FindSearchCandidatesDone {},
 975                    )),
 976                })
 977                .await?;
 978            anyhow::Ok(())
 979        });
 980        _buffer_store.update(&mut cx, |this, _| {
 981            this.register_ongoing_project_search((peer_id, handle), task);
 982        });
 983
 984        Ok(proto::Ack {})
 985    }
 986
 987    // Goes from client to host.
 988    async fn handle_find_search_candidates_cancel(
 989        this: Entity<Self>,
 990        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
 991        mut cx: AsyncApp,
 992    ) -> Result<()> {
 993        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
 994        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
 995    }
 996
 997    async fn handle_list_remote_directory(
 998        this: Entity<Self>,
 999        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
1000        cx: AsyncApp,
1001    ) -> Result<proto::ListRemoteDirectoryResponse> {
1002        use smol::stream::StreamExt;
1003        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1004        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1005        let check_info = envelope
1006            .payload
1007            .config
1008            .as_ref()
1009            .is_some_and(|config| config.is_dir);
1010
1011        let mut entries = Vec::new();
1012        let mut entry_info = Vec::new();
1013        let mut response = fs.read_dir(&expanded).await?;
1014        while let Some(path) = response.next().await {
1015            let path = path?;
1016            if let Some(file_name) = path.file_name() {
1017                entries.push(file_name.to_string_lossy().into_owned());
1018                if check_info {
1019                    let is_dir = fs.is_dir(&path).await;
1020                    entry_info.push(proto::EntryInfo { is_dir });
1021                }
1022            }
1023        }
1024        Ok(proto::ListRemoteDirectoryResponse {
1025            entries,
1026            entry_info,
1027        })
1028    }
1029
1030    async fn handle_get_path_metadata(
1031        this: Entity<Self>,
1032        envelope: TypedEnvelope<proto::GetPathMetadata>,
1033        cx: AsyncApp,
1034    ) -> Result<proto::GetPathMetadataResponse> {
1035        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1036        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1037
1038        let metadata = fs.metadata(&expanded).await?;
1039        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
1040
1041        Ok(proto::GetPathMetadataResponse {
1042            exists: metadata.is_some(),
1043            is_dir,
1044            path: expanded.to_string_lossy().into_owned(),
1045        })
1046    }
1047
1048    async fn handle_shutdown_remote_server(
1049        _this: Entity<Self>,
1050        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
1051        cx: AsyncApp,
1052    ) -> Result<proto::Ack> {
1053        cx.spawn(async move |cx| {
1054            cx.update(|cx| {
1055                // TODO: This is a hack, because in a headless project, shutdown isn't executed
1056                // when calling quit, but it should be.
1057                cx.shutdown();
1058                cx.quit();
1059            })
1060        })
1061        .detach();
1062
1063        Ok(proto::Ack {})
1064    }
1065
1066    pub async fn handle_ping(
1067        _this: Entity<Self>,
1068        _envelope: TypedEnvelope<proto::Ping>,
1069        _cx: AsyncApp,
1070    ) -> Result<proto::Ack> {
1071        log::debug!("Received ping from client");
1072        Ok(proto::Ack {})
1073    }
1074
1075    async fn handle_get_processes(
1076        _this: Entity<Self>,
1077        _envelope: TypedEnvelope<proto::GetProcesses>,
1078        _cx: AsyncApp,
1079    ) -> Result<proto::GetProcessesResponse> {
1080        let mut processes = Vec::new();
1081        let refresh_kind = RefreshKind::nothing().with_processes(
1082            ProcessRefreshKind::nothing()
1083                .without_tasks()
1084                .with_cmd(UpdateKind::Always),
1085        );
1086
1087        for process in System::new_with_specifics(refresh_kind)
1088            .processes()
1089            .values()
1090        {
1091            let name = process.name().to_string_lossy().into_owned();
1092            let command = process
1093                .cmd()
1094                .iter()
1095                .map(|s| s.to_string_lossy().into_owned())
1096                .collect::<Vec<_>>();
1097
1098            processes.push(proto::ProcessInfo {
1099                pid: process.pid().as_u32(),
1100                name,
1101                command,
1102            });
1103        }
1104
1105        processes.sort_by_key(|p| p.name.clone());
1106
1107        Ok(proto::GetProcessesResponse { processes })
1108    }
1109
1110    async fn handle_get_remote_profiling_data(
1111        this: Entity<Self>,
1112        envelope: TypedEnvelope<proto::GetRemoteProfilingData>,
1113        cx: AsyncApp,
1114    ) -> Result<proto::GetRemoteProfilingDataResponse> {
1115        let foreground_only = envelope.payload.foreground_only;
1116
1117        let (deltas, now_nanos) = cx.update(|cx| {
1118            let dispatcher = cx.foreground_executor().dispatcher();
1119            let timings = if foreground_only {
1120                vec![dispatcher.get_current_thread_timings()]
1121            } else {
1122                dispatcher.get_all_timings()
1123            };
1124            this.update(cx, |this, _cx| {
1125                let deltas = this.profiling_collector.collect_unseen(timings);
1126                let now_nanos = Instant::now()
1127                    .duration_since(this.profiling_collector.startup_time())
1128                    .as_nanos() as u64;
1129                (deltas, now_nanos)
1130            })
1131        });
1132
1133        let threads = deltas
1134            .into_iter()
1135            .map(|delta| proto::RemoteProfilingThread {
1136                thread_name: delta.thread_name,
1137                thread_id: delta.thread_id,
1138                timings: delta
1139                    .new_timings
1140                    .into_iter()
1141                    .map(|t| proto::RemoteProfilingTiming {
1142                        location: Some(proto::RemoteProfilingLocation {
1143                            file: t.location.file.to_string(),
1144                            line: t.location.line,
1145                            column: t.location.column,
1146                        }),
1147                        start_nanos: t.start as u64,
1148                        duration_nanos: t.duration as u64,
1149                    })
1150                    .collect(),
1151            })
1152            .collect();
1153
1154        Ok(proto::GetRemoteProfilingDataResponse { threads, now_nanos })
1155    }
1156
1157    async fn handle_get_directory_environment(
1158        this: Entity<Self>,
1159        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1160        mut cx: AsyncApp,
1161    ) -> Result<proto::DirectoryEnvironment> {
1162        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1163        let directory = PathBuf::from(envelope.payload.directory);
1164        let environment = this
1165            .update(&mut cx, |this, cx| {
1166                this.environment.update(cx, |environment, cx| {
1167                    environment.local_directory_environment(&shell, directory.into(), cx)
1168                })
1169            })
1170            .await
1171            .context("failed to get directory environment")?
1172            .into_iter()
1173            .collect();
1174        Ok(proto::DirectoryEnvironment { environment })
1175    }
1176}
1177
1178fn prompt_to_proto(
1179    prompt: &project::LanguageServerPromptRequest,
1180) -> proto::language_server_prompt_request::Level {
1181    match prompt.level {
1182        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1183            proto::language_server_prompt_request::Info {},
1184        ),
1185        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1186            proto::language_server_prompt_request::Warning {},
1187        ),
1188        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1189            proto::language_server_prompt_request::Critical {},
1190        ),
1191    }
1192}