1use anyhow::{Context as _, Result, anyhow};
2use client::ProjectId;
3use collections::HashSet;
4use language::File;
5use lsp::LanguageServerId;
6
7use extension::ExtensionHostProxy;
8use extension_host::headless_host::HeadlessExtensionStore;
9use fs::Fs;
10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
11use http_client::HttpClient;
12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
13use node_runtime::NodeRuntime;
14use project::{
15 AgentRegistryStore, LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment,
16 ProjectPath, ToolchainStore, WorktreeId,
17 agent_server_store::AgentServerStore,
18 buffer_store::{BufferStore, BufferStoreEvent},
19 context_server_store::ContextServerStore,
20 debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
21 git_store::GitStore,
22 image_store::ImageId,
23 lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
24 project_settings::SettingsObserver,
25 search::SearchQuery,
26 task_store::TaskStore,
27 trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
28 worktree_store::{WorktreeIdCounter, WorktreeStore},
29};
30use rpc::{
31 AnyProtoClient, TypedEnvelope,
32 proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
33};
34
35use settings::initial_server_settings_content;
36use std::{
37 num::NonZeroU64,
38 path::{Path, PathBuf},
39 sync::{
40 Arc,
41 atomic::{AtomicU64, AtomicUsize, Ordering},
42 },
43 time::Instant,
44};
45use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
46use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
47use worktree::Worktree;
48
/// Project state held by the headless (remote server) side of a connection.
///
/// [`HeadlessProject::new`] creates each store below and shares it over
/// `session` under `REMOTE_SERVER_PROJECT_ID`, so that requests arriving from
/// the client can be routed to the matching entity.
pub struct HeadlessProject {
    /// Filesystem handle used by the path-based request handlers and passed to the stores.
    pub fs: Arc<dyn Fs>,
    /// RPC client used to send messages and requests to the connected peer.
    pub session: AnyProtoClient,
    /// Store of the worktrees opened in this project.
    pub worktree_store: Entity<WorktreeStore>,
    /// Store of the buffers opened in this project.
    pub buffer_store: Entity<BufferStore>,
    /// Language-server store; its events are handled by `on_lsp_store_event`.
    pub lsp_store: Entity<LspStore>,
    /// Task store, also handed to the settings observer.
    pub task_store: Entity<TaskStore>,
    /// Debug adapter store.
    pub dap_store: Entity<DapStore>,
    /// Breakpoint store, also handed to the debug adapter store.
    pub breakpoint_store: Entity<BreakpointStore>,
    /// Store of agent servers.
    pub agent_server_store: Entity<AgentServerStore>,
    /// Store of context servers.
    pub context_server_store: Entity<ContextServerStore>,
    /// Observer of settings, constructed over the worktree and task stores.
    pub settings_observer: Entity<SettingsObserver>,
    /// Shared counter handed to `Worktree::local` whenever a worktree is added.
    pub next_entry_id: Arc<AtomicUsize>,
    /// Language registry shared with the toolchain, prettier and LSP stores.
    pub languages: Arc<LanguageRegistry>,
    /// Extension store that serves the sync/install extension requests.
    pub extensions: Entity<HeadlessExtensionStore>,
    /// Git store built on top of the worktree and buffer stores.
    pub git_store: Entity<GitStore>,
    /// Project environment shared by the toolchain, DAP, git, task, LSP and agent stores.
    pub environment: Entity<ProjectEnvironment>,
    /// Profiling collector created from the application's startup time.
    pub profiling_collector: gpui::ProfilingCollector,
    // Used mostly to keep alive the toolchain store for RPC handlers.
    // Local variant is used within LSP store, but that's a separate entity.
    pub _toolchain_store: Entity<ToolchainStore>,
}
71
/// Application-level dependencies consumed by [`HeadlessProject::new`].
pub struct HeadlessAppState {
    /// RPC client over which every store is shared and handlers are registered.
    pub session: AnyProtoClient,
    /// Filesystem implementation handed to the stores.
    pub fs: Arc<dyn Fs>,
    /// HTTP client handed to the DAP, LSP, agent and extension stores.
    pub http_client: Arc<dyn HttpClient>,
    /// Node runtime handed to language, DAP, prettier, agent and extension setup.
    pub node_runtime: NodeRuntime,
    /// Language registry shared by the language-aware stores.
    pub languages: Arc<LanguageRegistry>,
    /// Extension host proxy used to initialize debug adapter and language extensions.
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
    /// Startup instant used to construct the project's profiling collector.
    pub startup_time: Instant,
}
81
82impl HeadlessProject {
/// Runs the global initialization this project type needs: settings first,
/// then the LSP log store.
pub fn init(cx: &mut App) {
    settings::init(cx);
    log_store::init(true, cx);
}
87
/// Builds a `HeadlessProject` from the application state.
///
/// Creates every store, shares each one over `session` under
/// `REMOTE_SERVER_PROJECT_ID`, subscribes to LSP and buffer events, and
/// registers all RPC handlers before returning the assembled project.
///
/// When `init_worktree_trust` is `true`, worktree trust tracking is set up for
/// the newly created worktree store.
pub fn new(
    HeadlessAppState {
        session,
        fs,
        http_client,
        node_runtime,
        languages,
        extension_host_proxy: proxy,
        startup_time,
    }: HeadlessAppState,
    init_worktree_trust: bool,
    cx: &mut Context<Self>,
) -> Self {
    debug_adapter_extension::init(proxy.clone(), cx);
    languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);

    // The worktree store comes first: nearly every other store is built on top of it.
    let worktree_store = cx.new(|cx| {
        let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
        store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        store
    });

    if init_worktree_trust {
        project::trusted_worktrees::track_worktree_trust(
            worktree_store.clone(),
            None::<RemoteHostLocation>,
            Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
            None,
            cx,
        );
    }

    let environment =
        cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
    let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
    let toolchain_store = cx.new(|cx| {
        ToolchainStore::local(
            languages.clone(),
            worktree_store.clone(),
            environment.clone(),
            manifest_tree.clone(),
            fs.clone(),
            cx,
        )
    });

    let buffer_store = cx.new(|cx| {
        let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
        buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        buffer_store
    });

    let breakpoint_store = cx.new(|_| {
        let mut breakpoint_store =
            BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
        breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());

        breakpoint_store
    });

    let dap_store = cx.new(|cx| {
        let mut dap_store = DapStore::new_local(
            http_client.clone(),
            node_runtime.clone(),
            fs.clone(),
            environment.clone(),
            toolchain_store.read(cx).as_language_toolchain_store(),
            worktree_store.clone(),
            breakpoint_store.clone(),
            true,
            cx,
        );
        dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        dap_store
    });

    let git_store = cx.new(|cx| {
        let mut store = GitStore::local(
            &worktree_store,
            buffer_store.clone(),
            environment.clone(),
            fs.clone(),
            cx,
        );
        store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        store
    });

    // Not stored on the project; it is only handed to the LSP store below.
    let prettier_store = cx.new(|cx| {
        PrettierStore::new(
            node_runtime.clone(),
            fs.clone(),
            languages.clone(),
            worktree_store.clone(),
            cx,
        )
    });

    let task_store = cx.new(|cx| {
        let mut task_store = TaskStore::local(
            buffer_store.downgrade(),
            worktree_store.clone(),
            toolchain_store.read(cx).as_language_toolchain_store(),
            environment.clone(),
            cx,
        );
        task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        task_store
    });
    let settings_observer = cx.new(|cx| {
        let mut observer = SettingsObserver::new_local(
            fs.clone(),
            worktree_store.clone(),
            task_store.clone(),
            true,
            cx,
        );
        observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        observer
    });

    let lsp_store = cx.new(|cx| {
        let mut lsp_store = LspStore::new_local(
            buffer_store.clone(),
            worktree_store.clone(),
            prettier_store.clone(),
            toolchain_store
                .read(cx)
                .as_local_store()
                .expect("Toolchain store to be local")
                .clone(),
            environment.clone(),
            manifest_tree,
            languages.clone(),
            http_client.clone(),
            fs.clone(),
            cx,
        );
        lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        lsp_store
    });

    AgentRegistryStore::init_global(cx, fs.clone(), http_client.clone());

    let agent_server_store = cx.new(|cx| {
        let mut agent_server_store = AgentServerStore::local(
            node_runtime.clone(),
            fs.clone(),
            environment.clone(),
            http_client.clone(),
            cx,
        );
        agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
        agent_server_store
    });

    let context_server_store = cx.new(|cx| {
        let mut context_server_store =
            ContextServerStore::local(worktree_store.clone(), None, true, cx);
        context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
        context_server_store
    });

    cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
    language_extension::init(
        language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
        proxy.clone(),
        languages.clone(),
    );

    // Subscribe to every buffer as it is added so that its events reach `on_buffer_event`.
    cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
        if let BufferStoreEvent::BufferAdded(buffer) = event {
            cx.subscribe(buffer, Self::on_buffer_event).detach();
        }
    })
    .detach();

    let extensions = HeadlessExtensionStore::new(
        fs.clone(),
        http_client.clone(),
        paths::remote_extensions_dir().to_path_buf(),
        proxy,
        node_runtime,
        cx,
    );

    // local_machine -> ssh handlers
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
    session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);

    // Handlers registered against a weak handle to this project.
    session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
    session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
    session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
    session.add_request_handler(cx.weak_entity(), Self::handle_ping);
    session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
    session.add_request_handler(cx.weak_entity(), Self::handle_get_remote_profiling_data);

    session.add_entity_request_handler(Self::handle_add_worktree);
    session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

    // Handlers registered per entity type.
    session.add_entity_request_handler(Self::handle_open_buffer_by_path);
    session.add_entity_request_handler(Self::handle_open_new_buffer);
    session.add_entity_request_handler(Self::handle_find_search_candidates);
    session.add_entity_request_handler(Self::handle_open_server_settings);
    session.add_entity_request_handler(Self::handle_get_directory_environment);
    session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
    session.add_entity_request_handler(Self::handle_open_image_by_path);
    session.add_entity_request_handler(Self::handle_trust_worktrees);
    session.add_entity_request_handler(Self::handle_restrict_worktrees);
    session.add_entity_request_handler(Self::handle_download_file_by_path);

    session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
    session.add_entity_request_handler(BufferStore::handle_update_buffer);
    session.add_entity_message_handler(BufferStore::handle_close_buffer);

    session.add_request_handler(
        extensions.downgrade(),
        HeadlessExtensionStore::handle_sync_extensions,
    );
    session.add_request_handler(
        extensions.downgrade(),
        HeadlessExtensionStore::handle_install_extension,
    );

    // Let each store perform its own registration on the session.
    BufferStore::init(&session);
    WorktreeStore::init(&session);
    SettingsObserver::init(&session);
    LspStore::init(&session);
    TaskStore::init(Some(&session));
    ToolchainStore::init(&session);
    DapStore::init(&session, cx);
    // todo(debugger): Re init breakpoint store when we set it up for collab
    BreakpointStore::init(&session);
    GitStore::init(&session);
    AgentServerStore::init_headless(&session);
    ContextServerStore::init_headless(&session);

    HeadlessProject {
        next_entry_id: Default::default(),
        session,
        settings_observer,
        fs,
        worktree_store,
        buffer_store,
        lsp_store,
        task_store,
        dap_store,
        breakpoint_store,
        agent_server_store,
        context_server_store,
        languages,
        extensions,
        git_store,
        environment,
        profiling_collector: gpui::ProfilingCollector::new(startup_time),
        _toolchain_store: toolchain_store,
    }
}
356
357 fn on_buffer_event(
358 &mut self,
359 buffer: Entity<Buffer>,
360 event: &BufferEvent,
361 cx: &mut Context<Self>,
362 ) {
363 if let BufferEvent::Operation {
364 operation,
365 is_local: true,
366 } = event
367 {
368 cx.background_spawn(self.session.request(proto::UpdateBuffer {
369 project_id: REMOTE_SERVER_PROJECT_ID,
370 buffer_id: buffer.read(cx).remote_id().to_proto(),
371 operations: vec![serialize_operation(operation)],
372 }))
373 .detach()
374 }
375 }
376
377 fn on_lsp_store_event(
378 &mut self,
379 lsp_store: Entity<LspStore>,
380 event: &LspStoreEvent,
381 cx: &mut Context<Self>,
382 ) {
383 match event {
384 LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
385 let log_store = cx
386 .try_global::<GlobalLogStore>()
387 .map(|lsp_logs| lsp_logs.0.clone());
388 if let Some(log_store) = log_store {
389 log_store.update(cx, |log_store, cx| {
390 log_store.add_language_server(
391 LanguageServerKind::LocalSsh {
392 lsp_store: self.lsp_store.downgrade(),
393 },
394 *id,
395 Some(name.clone()),
396 *worktree_id,
397 lsp_store.read(cx).language_server_for_id(*id),
398 cx,
399 );
400 });
401 }
402 }
403 LspStoreEvent::LanguageServerRemoved(id) => {
404 let log_store = cx
405 .try_global::<GlobalLogStore>()
406 .map(|lsp_logs| lsp_logs.0.clone());
407 if let Some(log_store) = log_store {
408 log_store.update(cx, |log_store, cx| {
409 log_store.remove_language_server(*id, cx);
410 });
411 }
412 }
413 LspStoreEvent::LanguageServerUpdate {
414 language_server_id,
415 name,
416 message,
417 } => {
418 self.session
419 .send(proto::UpdateLanguageServer {
420 project_id: REMOTE_SERVER_PROJECT_ID,
421 server_name: name.as_ref().map(|name| name.to_string()),
422 language_server_id: language_server_id.to_proto(),
423 variant: Some(message.clone()),
424 })
425 .log_err();
426 }
427 LspStoreEvent::Notification(message) => {
428 self.session
429 .send(proto::Toast {
430 project_id: REMOTE_SERVER_PROJECT_ID,
431 notification_id: "lsp".to_string(),
432 message: message.clone(),
433 })
434 .log_err();
435 }
436 LspStoreEvent::LanguageServerPrompt(prompt) => {
437 let request = self.session.request(proto::LanguageServerPromptRequest {
438 project_id: REMOTE_SERVER_PROJECT_ID,
439 actions: prompt
440 .actions
441 .iter()
442 .map(|action| action.title.to_string())
443 .collect(),
444 level: Some(prompt_to_proto(prompt)),
445 lsp_name: prompt.lsp_name.clone(),
446 message: prompt.message.clone(),
447 });
448 let prompt = prompt.clone();
449 cx.background_spawn(async move {
450 let response = request.await?;
451 if let Some(action_response) = response.action_response {
452 prompt.respond(action_response as usize).await;
453 }
454 anyhow::Ok(())
455 })
456 .detach();
457 }
458 _ => {}
459 }
460 }
461
462 pub async fn handle_add_worktree(
463 this: Entity<Self>,
464 message: TypedEnvelope<proto::AddWorktree>,
465 mut cx: AsyncApp,
466 ) -> Result<proto::AddWorktreeResponse> {
467 use client::ErrorCodeExt;
468 let fs = this.read_with(&cx, |this, _| this.fs.clone());
469 let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
470
471 let canonicalized = match fs.canonicalize(&path).await {
472 Ok(path) => path,
473 Err(e) => {
474 let mut parent = path
475 .parent()
476 .ok_or(e)
477 .with_context(|| format!("{path:?} does not exist"))?;
478 if parent == Path::new("") {
479 parent = util::paths::home_dir();
480 }
481 let parent = fs.canonicalize(parent).await.map_err(|_| {
482 anyhow!(
483 proto::ErrorCode::DevServerProjectPathDoesNotExist
484 .with_tag("path", path.to_string_lossy().as_ref())
485 )
486 })?;
487 if let Some(file_name) = path.file_name() {
488 parent.join(file_name)
489 } else {
490 parent
491 }
492 }
493 };
494 let next_worktree_id = this
495 .update(&mut cx, |this, cx| {
496 this.worktree_store
497 .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
498 })
499 .await?;
500 let worktree = this
501 .read_with(&cx.clone(), |this, _| {
502 Worktree::local(
503 Arc::from(canonicalized.as_path()),
504 message.payload.visible,
505 this.fs.clone(),
506 this.next_entry_id.clone(),
507 true,
508 next_worktree_id,
509 &mut cx,
510 )
511 })
512 .await?;
513
514 let response = this.read_with(&cx, |_, cx| {
515 let worktree = worktree.read(cx);
516 proto::AddWorktreeResponse {
517 worktree_id: worktree.id().to_proto(),
518 canonicalized_path: canonicalized.to_string_lossy().into_owned(),
519 }
520 });
521
522 // We spawn this asynchronously, so that we can send the response back
523 // *before* `worktree_store.add()` can send out UpdateProject requests
524 // to the client about the new worktree.
525 //
526 // That lets the client manage the reference/handles of the newly-added
527 // worktree, before getting interrupted by an UpdateProject request.
528 //
529 // This fixes the problem of the client sending the AddWorktree request,
530 // headless project sending out a project update, client receiving it
531 // and immediately dropping the reference of the new client, causing it
532 // to be dropped on the headless project, and the client only then
533 // receiving a response to AddWorktree.
534 cx.spawn(async move |cx| {
535 this.update(cx, |this, cx| {
536 this.worktree_store.update(cx, |worktree_store, cx| {
537 worktree_store.add(&worktree, cx);
538 });
539 });
540 })
541 .detach();
542
543 Ok(response)
544 }
545
546 pub async fn handle_remove_worktree(
547 this: Entity<Self>,
548 envelope: TypedEnvelope<proto::RemoveWorktree>,
549 mut cx: AsyncApp,
550 ) -> Result<proto::Ack> {
551 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
552 this.update(&mut cx, |this, cx| {
553 this.worktree_store.update(cx, |worktree_store, cx| {
554 worktree_store.remove_worktree(worktree_id, cx);
555 });
556 });
557 Ok(proto::Ack {})
558 }
559
560 pub async fn handle_open_buffer_by_path(
561 this: Entity<Self>,
562 message: TypedEnvelope<proto::OpenBufferByPath>,
563 mut cx: AsyncApp,
564 ) -> Result<proto::OpenBufferResponse> {
565 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
566 let path = RelPath::from_proto(&message.payload.path)?;
567 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
568 let buffer_store = this.buffer_store.clone();
569 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
570 buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
571 });
572 (buffer_store, buffer)
573 });
574
575 let buffer = buffer.await?;
576 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
577 buffer_store.update(&mut cx, |buffer_store, cx| {
578 buffer_store
579 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
580 .detach_and_log_err(cx);
581 });
582
583 Ok(proto::OpenBufferResponse {
584 buffer_id: buffer_id.to_proto(),
585 })
586 }
587
588 pub async fn handle_open_image_by_path(
589 this: Entity<Self>,
590 message: TypedEnvelope<proto::OpenImageByPath>,
591 mut cx: AsyncApp,
592 ) -> Result<proto::OpenImageResponse> {
593 static NEXT_ID: AtomicU64 = AtomicU64::new(1);
594 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
595 let path = RelPath::from_proto(&message.payload.path)?;
596 let project_id = message.payload.project_id;
597 use proto::create_image_for_peer::Variant;
598
599 let (worktree_store, session) = this.read_with(&cx, |this, _| {
600 (this.worktree_store.clone(), this.session.clone())
601 });
602
603 let worktree = worktree_store
604 .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
605 .context("worktree not found")?;
606
607 let load_task = worktree.update(&mut cx, |worktree, cx| {
608 worktree.load_binary_file(path.as_ref(), cx)
609 });
610
611 let loaded_file = load_task.await?;
612 let content = loaded_file.content;
613 let file = loaded_file.file;
614
615 let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
616 let image_id =
617 ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
618
619 let format = image::guess_format(&content)
620 .map(|f| format!("{:?}", f).to_lowercase())
621 .unwrap_or_else(|_| "unknown".to_string());
622
623 let state = proto::ImageState {
624 id: image_id.to_proto(),
625 file: Some(proto_file),
626 content_size: content.len() as u64,
627 format,
628 };
629
630 session.send(proto::CreateImageForPeer {
631 project_id,
632 peer_id: Some(REMOTE_SERVER_PEER_ID),
633 variant: Some(Variant::State(state)),
634 })?;
635
636 const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
637 for chunk in content.chunks(CHUNK_SIZE) {
638 session.send(proto::CreateImageForPeer {
639 project_id,
640 peer_id: Some(REMOTE_SERVER_PEER_ID),
641 variant: Some(Variant::Chunk(proto::ImageChunk {
642 image_id: image_id.to_proto(),
643 data: chunk.to_vec(),
644 })),
645 })?;
646 }
647
648 Ok(proto::OpenImageResponse {
649 image_id: image_id.to_proto(),
650 })
651 }
652
653 pub async fn handle_trust_worktrees(
654 this: Entity<Self>,
655 envelope: TypedEnvelope<proto::TrustWorktrees>,
656 mut cx: AsyncApp,
657 ) -> Result<proto::Ack> {
658 let trusted_worktrees = cx
659 .update(|cx| TrustedWorktrees::try_get_global(cx))
660 .context("missing trusted worktrees")?;
661 let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
662 trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
663 trusted_worktrees.trust(
664 &worktree_store,
665 envelope
666 .payload
667 .trusted_paths
668 .into_iter()
669 .filter_map(PathTrust::from_proto)
670 .collect(),
671 cx,
672 );
673 });
674 Ok(proto::Ack {})
675 }
676
677 pub async fn handle_restrict_worktrees(
678 this: Entity<Self>,
679 envelope: TypedEnvelope<proto::RestrictWorktrees>,
680 mut cx: AsyncApp,
681 ) -> Result<proto::Ack> {
682 let trusted_worktrees = cx
683 .update(|cx| TrustedWorktrees::try_get_global(cx))
684 .context("missing trusted worktrees")?;
685 let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
686 trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
687 let restricted_paths = envelope
688 .payload
689 .worktree_ids
690 .into_iter()
691 .map(WorktreeId::from_proto)
692 .map(PathTrust::Worktree)
693 .collect::<HashSet<_>>();
694 trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
695 });
696 Ok(proto::Ack {})
697 }
698
699 pub async fn handle_download_file_by_path(
700 this: Entity<Self>,
701 message: TypedEnvelope<proto::DownloadFileByPath>,
702 mut cx: AsyncApp,
703 ) -> Result<proto::DownloadFileResponse> {
704 log::debug!(
705 "handle_download_file_by_path: received request: {:?}",
706 message.payload
707 );
708
709 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
710 let path = RelPath::from_proto(&message.payload.path)?;
711 let project_id = message.payload.project_id;
712 let file_id = message.payload.file_id;
713 log::debug!(
714 "handle_download_file_by_path: worktree_id={:?}, path={:?}, file_id={}",
715 worktree_id,
716 path,
717 file_id
718 );
719 use proto::create_file_for_peer::Variant;
720
721 let (worktree_store, session): (Entity<WorktreeStore>, AnyProtoClient) = this
722 .read_with(&cx, |this, _| {
723 (this.worktree_store.clone(), this.session.clone())
724 });
725
726 let worktree = worktree_store
727 .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
728 .context("worktree not found")?;
729
730 let download_task = worktree.update(&mut cx, |worktree: &mut Worktree, cx| {
731 worktree.load_binary_file(path.as_ref(), cx)
732 });
733
734 let downloaded_file = download_task.await?;
735 let content = downloaded_file.content;
736 let file = downloaded_file.file;
737 log::debug!(
738 "handle_download_file_by_path: file loaded, content_size={}",
739 content.len()
740 );
741
742 let proto_file = worktree.read_with(&cx, |_worktree: &Worktree, cx| file.to_proto(cx));
743 log::debug!(
744 "handle_download_file_by_path: using client-provided file_id={}",
745 file_id
746 );
747
748 let state = proto::FileState {
749 id: file_id,
750 file: Some(proto_file),
751 content_size: content.len() as u64,
752 };
753
754 log::debug!("handle_download_file_by_path: sending State message");
755 session.send(proto::CreateFileForPeer {
756 project_id,
757 peer_id: Some(REMOTE_SERVER_PEER_ID),
758 variant: Some(Variant::State(state)),
759 })?;
760
761 const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
762 let num_chunks = content.len().div_ceil(CHUNK_SIZE);
763 log::debug!(
764 "handle_download_file_by_path: sending {} chunks",
765 num_chunks
766 );
767 for (i, chunk) in content.chunks(CHUNK_SIZE).enumerate() {
768 log::trace!(
769 "handle_download_file_by_path: sending chunk {}/{}, size={}",
770 i + 1,
771 num_chunks,
772 chunk.len()
773 );
774 session.send(proto::CreateFileForPeer {
775 project_id,
776 peer_id: Some(REMOTE_SERVER_PEER_ID),
777 variant: Some(Variant::Chunk(proto::FileChunk {
778 file_id,
779 data: chunk.to_vec(),
780 })),
781 })?;
782 }
783
784 log::debug!(
785 "handle_download_file_by_path: returning file_id={}",
786 file_id
787 );
788 Ok(proto::DownloadFileResponse { file_id })
789 }
790
791 pub async fn handle_open_new_buffer(
792 this: Entity<Self>,
793 _message: TypedEnvelope<proto::OpenNewBuffer>,
794 mut cx: AsyncApp,
795 ) -> Result<proto::OpenBufferResponse> {
796 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
797 let buffer_store = this.buffer_store.clone();
798 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
799 buffer_store.create_buffer(None, true, cx)
800 });
801 (buffer_store, buffer)
802 });
803
804 let buffer = buffer.await?;
805 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
806 buffer_store.update(&mut cx, |buffer_store, cx| {
807 buffer_store
808 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
809 .detach_and_log_err(cx);
810 });
811
812 Ok(proto::OpenBufferResponse {
813 buffer_id: buffer_id.to_proto(),
814 })
815 }
816
817 async fn handle_toggle_lsp_logs(
818 _: Entity<Self>,
819 envelope: TypedEnvelope<proto::ToggleLspLogs>,
820 cx: AsyncApp,
821 ) -> Result<()> {
822 let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
823 cx.update(|cx| {
824 let log_store = cx
825 .try_global::<GlobalLogStore>()
826 .map(|global_log_store| global_log_store.0.clone())
827 .context("lsp logs store is missing")?;
828 let toggled_log_kind =
829 match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
830 .context("invalid log type")?
831 {
832 proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
833 proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
834 proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
835 };
836 log_store.update(cx, |log_store, _| {
837 log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
838 });
839 anyhow::Ok(())
840 })?;
841
842 Ok(())
843 }
844
845 async fn handle_open_server_settings(
846 this: Entity<Self>,
847 _: TypedEnvelope<proto::OpenServerSettings>,
848 mut cx: AsyncApp,
849 ) -> Result<proto::OpenBufferResponse> {
850 let settings_path = paths::settings_file();
851 let (worktree, path) = this
852 .update(&mut cx, |this, cx| {
853 this.worktree_store.update(cx, |worktree_store, cx| {
854 worktree_store.find_or_create_worktree(settings_path, false, cx)
855 })
856 })
857 .await?;
858
859 let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
860 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
861 buffer_store.open_buffer(
862 ProjectPath {
863 worktree_id: worktree.read(cx).id(),
864 path,
865 },
866 cx,
867 )
868 });
869
870 (buffer, this.buffer_store.clone())
871 });
872
873 let buffer = buffer.await?;
874
875 let buffer_id = cx.update(|cx| {
876 if buffer.read(cx).is_empty() {
877 buffer.update(cx, |buffer, cx| {
878 buffer.edit([(0..0, initial_server_settings_content())], None, cx)
879 });
880 }
881
882 let buffer_id = buffer.read(cx).remote_id();
883
884 buffer_store.update(cx, |buffer_store, cx| {
885 buffer_store
886 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
887 .detach_and_log_err(cx);
888 });
889
890 buffer_id
891 });
892
893 Ok(proto::OpenBufferResponse {
894 buffer_id: buffer_id.to_proto(),
895 })
896 }
897
898 async fn handle_find_search_candidates(
899 this: Entity<Self>,
900 envelope: TypedEnvelope<proto::FindSearchCandidates>,
901 mut cx: AsyncApp,
902 ) -> Result<proto::Ack> {
903 use futures::stream::StreamExt as _;
904
905 let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
906 let message = envelope.payload;
907 let query = SearchQuery::from_proto(
908 message.query.context("missing query field")?,
909 PathStyle::local(),
910 )?;
911
912 let project_id = message.project_id;
913 let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
914 let handle = message.handle;
915 let _buffer_store = buffer_store.clone();
916 let client = this.read_with(&cx, |this, _| this.session.clone());
917 let task = cx.spawn(async move |cx| {
918 let results = this.update(cx, |this, cx| {
919 project::Search::local(
920 this.fs.clone(),
921 this.buffer_store.clone(),
922 this.worktree_store.clone(),
923 message.limit as _,
924 cx,
925 )
926 .into_handle(query, cx)
927 .matching_buffers(cx)
928 });
929 let (batcher, batches) =
930 project::project_search::AdaptiveBatcher::new(cx.background_executor());
931 let mut new_matches = Box::pin(results.rx);
932
933 let sender_task = cx.background_executor().spawn({
934 let client = client.clone();
935 async move {
936 let mut batches = std::pin::pin!(batches);
937 while let Some(buffer_ids) = batches.next().await {
938 client
939 .request(proto::FindSearchCandidatesChunk {
940 handle,
941 peer_id: Some(peer_id),
942 project_id,
943 variant: Some(
944 proto::find_search_candidates_chunk::Variant::Matches(
945 proto::FindSearchCandidatesMatches { buffer_ids },
946 ),
947 ),
948 })
949 .await?;
950 }
951 anyhow::Ok(())
952 }
953 });
954
955 while let Some(buffer) = new_matches.next().await {
956 let _ = buffer_store
957 .update(cx, |this, cx| {
958 this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
959 })
960 .await;
961 let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
962 batcher.push(buffer_id).await;
963 }
964 batcher.flush().await;
965
966 sender_task.await?;
967
968 client
969 .request(proto::FindSearchCandidatesChunk {
970 handle,
971 peer_id: Some(peer_id),
972 project_id,
973 variant: Some(proto::find_search_candidates_chunk::Variant::Done(
974 proto::FindSearchCandidatesDone {},
975 )),
976 })
977 .await?;
978 anyhow::Ok(())
979 });
980 _buffer_store.update(&mut cx, |this, _| {
981 this.register_ongoing_project_search((peer_id, handle), task);
982 });
983
984 Ok(proto::Ack {})
985 }
986
987 // Goes from client to host.
988 async fn handle_find_search_candidates_cancel(
989 this: Entity<Self>,
990 envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
991 mut cx: AsyncApp,
992 ) -> Result<()> {
993 let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
994 BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
995 }
996
997 async fn handle_list_remote_directory(
998 this: Entity<Self>,
999 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
1000 cx: AsyncApp,
1001 ) -> Result<proto::ListRemoteDirectoryResponse> {
1002 use smol::stream::StreamExt;
1003 let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1004 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1005 let check_info = envelope
1006 .payload
1007 .config
1008 .as_ref()
1009 .is_some_and(|config| config.is_dir);
1010
1011 let mut entries = Vec::new();
1012 let mut entry_info = Vec::new();
1013 let mut response = fs.read_dir(&expanded).await?;
1014 while let Some(path) = response.next().await {
1015 let path = path?;
1016 if let Some(file_name) = path.file_name() {
1017 entries.push(file_name.to_string_lossy().into_owned());
1018 if check_info {
1019 let is_dir = fs.is_dir(&path).await;
1020 entry_info.push(proto::EntryInfo { is_dir });
1021 }
1022 }
1023 }
1024 Ok(proto::ListRemoteDirectoryResponse {
1025 entries,
1026 entry_info,
1027 })
1028 }
1029
1030 async fn handle_get_path_metadata(
1031 this: Entity<Self>,
1032 envelope: TypedEnvelope<proto::GetPathMetadata>,
1033 cx: AsyncApp,
1034 ) -> Result<proto::GetPathMetadataResponse> {
1035 let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1036 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1037
1038 let metadata = fs.metadata(&expanded).await?;
1039 let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
1040
1041 Ok(proto::GetPathMetadataResponse {
1042 exists: metadata.is_some(),
1043 is_dir,
1044 path: expanded.to_string_lossy().into_owned(),
1045 })
1046 }
1047
1048 async fn handle_shutdown_remote_server(
1049 _this: Entity<Self>,
1050 _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
1051 cx: AsyncApp,
1052 ) -> Result<proto::Ack> {
1053 cx.spawn(async move |cx| {
1054 cx.update(|cx| {
1055 // TODO: This is a hack, because in a headless project, shutdown isn't executed
1056 // when calling quit, but it should be.
1057 cx.shutdown();
1058 cx.quit();
1059 })
1060 })
1061 .detach();
1062
1063 Ok(proto::Ack {})
1064 }
1065
1066 pub async fn handle_ping(
1067 _this: Entity<Self>,
1068 _envelope: TypedEnvelope<proto::Ping>,
1069 _cx: AsyncApp,
1070 ) -> Result<proto::Ack> {
1071 log::debug!("Received ping from client");
1072 Ok(proto::Ack {})
1073 }
1074
1075 async fn handle_get_processes(
1076 _this: Entity<Self>,
1077 _envelope: TypedEnvelope<proto::GetProcesses>,
1078 _cx: AsyncApp,
1079 ) -> Result<proto::GetProcessesResponse> {
1080 let mut processes = Vec::new();
1081 let refresh_kind = RefreshKind::nothing().with_processes(
1082 ProcessRefreshKind::nothing()
1083 .without_tasks()
1084 .with_cmd(UpdateKind::Always),
1085 );
1086
1087 for process in System::new_with_specifics(refresh_kind)
1088 .processes()
1089 .values()
1090 {
1091 let name = process.name().to_string_lossy().into_owned();
1092 let command = process
1093 .cmd()
1094 .iter()
1095 .map(|s| s.to_string_lossy().into_owned())
1096 .collect::<Vec<_>>();
1097
1098 processes.push(proto::ProcessInfo {
1099 pid: process.pid().as_u32(),
1100 name,
1101 command,
1102 });
1103 }
1104
1105 processes.sort_by_key(|p| p.name.clone());
1106
1107 Ok(proto::GetProcessesResponse { processes })
1108 }
1109
1110 async fn handle_get_remote_profiling_data(
1111 this: Entity<Self>,
1112 envelope: TypedEnvelope<proto::GetRemoteProfilingData>,
1113 cx: AsyncApp,
1114 ) -> Result<proto::GetRemoteProfilingDataResponse> {
1115 let foreground_only = envelope.payload.foreground_only;
1116
1117 let (deltas, now_nanos) = cx.update(|cx| {
1118 let dispatcher = cx.foreground_executor().dispatcher();
1119 let timings = if foreground_only {
1120 vec![dispatcher.get_current_thread_timings()]
1121 } else {
1122 dispatcher.get_all_timings()
1123 };
1124 this.update(cx, |this, _cx| {
1125 let deltas = this.profiling_collector.collect_unseen(timings);
1126 let now_nanos = Instant::now()
1127 .duration_since(this.profiling_collector.startup_time())
1128 .as_nanos() as u64;
1129 (deltas, now_nanos)
1130 })
1131 });
1132
1133 let threads = deltas
1134 .into_iter()
1135 .map(|delta| proto::RemoteProfilingThread {
1136 thread_name: delta.thread_name,
1137 thread_id: delta.thread_id,
1138 timings: delta
1139 .new_timings
1140 .into_iter()
1141 .map(|t| proto::RemoteProfilingTiming {
1142 location: Some(proto::RemoteProfilingLocation {
1143 file: t.location.file.to_string(),
1144 line: t.location.line,
1145 column: t.location.column,
1146 }),
1147 start_nanos: t.start as u64,
1148 duration_nanos: t.duration as u64,
1149 })
1150 .collect(),
1151 })
1152 .collect();
1153
1154 Ok(proto::GetRemoteProfilingDataResponse { threads, now_nanos })
1155 }
1156
1157 async fn handle_get_directory_environment(
1158 this: Entity<Self>,
1159 envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1160 mut cx: AsyncApp,
1161 ) -> Result<proto::DirectoryEnvironment> {
1162 let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1163 let directory = PathBuf::from(envelope.payload.directory);
1164 let environment = this
1165 .update(&mut cx, |this, cx| {
1166 this.environment.update(cx, |environment, cx| {
1167 environment.local_directory_environment(&shell, directory.into(), cx)
1168 })
1169 })
1170 .await
1171 .context("failed to get directory environment")?
1172 .into_iter()
1173 .collect();
1174 Ok(proto::DirectoryEnvironment { environment })
1175 }
1176}
1177
1178fn prompt_to_proto(
1179 prompt: &project::LanguageServerPromptRequest,
1180) -> proto::language_server_prompt_request::Level {
1181 match prompt.level {
1182 PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1183 proto::language_server_prompt_request::Info {},
1184 ),
1185 PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1186 proto::language_server_prompt_request::Warning {},
1187 ),
1188 PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1189 proto::language_server_prompt_request::Critical {},
1190 ),
1191 }
1192}