1use anyhow::{Context as _, Result, anyhow};
2use client::ProjectId;
3use collections::HashSet;
4use language::File;
5use lsp::LanguageServerId;
6
7use extension::ExtensionHostProxy;
8use extension_host::headless_host::HeadlessExtensionStore;
9use fs::Fs;
10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
11use http_client::HttpClient;
12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
13use node_runtime::NodeRuntime;
14use project::{
15 AgentRegistryStore, LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment,
16 ProjectPath, ToolchainStore, WorktreeId,
17 agent_server_store::AgentServerStore,
18 buffer_store::{BufferStore, BufferStoreEvent},
19 context_server_store::ContextServerStore,
20 debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
21 git_store::GitStore,
22 image_store::ImageId,
23 lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
24 project_settings::SettingsObserver,
25 search::SearchQuery,
26 task_store::TaskStore,
27 trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
28 worktree_store::{WorktreeIdCounter, WorktreeStore},
29};
30use rpc::{
31 AnyProtoClient, TypedEnvelope,
32 proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
33};
34
35use settings::initial_server_settings_content;
36use std::{
37 num::NonZeroU64,
38 path::{Path, PathBuf},
39 sync::{
40 Arc,
41 atomic::{AtomicU64, AtomicUsize, Ordering},
42 },
43};
44use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
45use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
46use worktree::Worktree;
47
48pub struct HeadlessProject {
49 pub fs: Arc<dyn Fs>,
50 pub session: AnyProtoClient,
51 pub worktree_store: Entity<WorktreeStore>,
52 pub buffer_store: Entity<BufferStore>,
53 pub lsp_store: Entity<LspStore>,
54 pub task_store: Entity<TaskStore>,
55 pub dap_store: Entity<DapStore>,
56 pub breakpoint_store: Entity<BreakpointStore>,
57 pub agent_server_store: Entity<AgentServerStore>,
58 pub context_server_store: Entity<ContextServerStore>,
59 pub settings_observer: Entity<SettingsObserver>,
60 pub next_entry_id: Arc<AtomicUsize>,
61 pub languages: Arc<LanguageRegistry>,
62 pub extensions: Entity<HeadlessExtensionStore>,
63 pub git_store: Entity<GitStore>,
64 pub environment: Entity<ProjectEnvironment>,
65 // Used mostly to keep alive the toolchain store for RPC handlers.
66 // Local variant is used within LSP store, but that's a separate entity.
67 pub _toolchain_store: Entity<ToolchainStore>,
68}
69
70pub struct HeadlessAppState {
71 pub session: AnyProtoClient,
72 pub fs: Arc<dyn Fs>,
73 pub http_client: Arc<dyn HttpClient>,
74 pub node_runtime: NodeRuntime,
75 pub languages: Arc<LanguageRegistry>,
76 pub extension_host_proxy: Arc<ExtensionHostProxy>,
77}
78
79impl HeadlessProject {
80 pub fn init(cx: &mut App) {
81 settings::init(cx);
82 log_store::init(true, cx);
83 }
84
85 pub fn new(
86 HeadlessAppState {
87 session,
88 fs,
89 http_client,
90 node_runtime,
91 languages,
92 extension_host_proxy: proxy,
93 }: HeadlessAppState,
94 init_worktree_trust: bool,
95 cx: &mut Context<Self>,
96 ) -> Self {
97 debug_adapter_extension::init(proxy.clone(), cx);
98 languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
99
100 let worktree_store = cx.new(|cx| {
101 let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
102 store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
103 store
104 });
105
106 if init_worktree_trust {
107 project::trusted_worktrees::track_worktree_trust(
108 worktree_store.clone(),
109 None::<RemoteHostLocation>,
110 Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
111 None,
112 cx,
113 );
114 }
115
116 let environment =
117 cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
118 let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
119 let toolchain_store = cx.new(|cx| {
120 ToolchainStore::local(
121 languages.clone(),
122 worktree_store.clone(),
123 environment.clone(),
124 manifest_tree.clone(),
125 fs.clone(),
126 cx,
127 )
128 });
129
130 let buffer_store = cx.new(|cx| {
131 let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
132 buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
133 buffer_store
134 });
135
136 let breakpoint_store = cx.new(|_| {
137 let mut breakpoint_store =
138 BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
139 breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
140
141 breakpoint_store
142 });
143
144 let dap_store = cx.new(|cx| {
145 let mut dap_store = DapStore::new_local(
146 http_client.clone(),
147 node_runtime.clone(),
148 fs.clone(),
149 environment.clone(),
150 toolchain_store.read(cx).as_language_toolchain_store(),
151 worktree_store.clone(),
152 breakpoint_store.clone(),
153 true,
154 cx,
155 );
156 dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
157 dap_store
158 });
159
160 let git_store = cx.new(|cx| {
161 let mut store = GitStore::local(
162 &worktree_store,
163 buffer_store.clone(),
164 environment.clone(),
165 fs.clone(),
166 cx,
167 );
168 store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
169 store
170 });
171
172 let prettier_store = cx.new(|cx| {
173 PrettierStore::new(
174 node_runtime.clone(),
175 fs.clone(),
176 languages.clone(),
177 worktree_store.clone(),
178 cx,
179 )
180 });
181
182 let task_store = cx.new(|cx| {
183 let mut task_store = TaskStore::local(
184 buffer_store.downgrade(),
185 worktree_store.clone(),
186 toolchain_store.read(cx).as_language_toolchain_store(),
187 environment.clone(),
188 cx,
189 );
190 task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
191 task_store
192 });
193 let settings_observer = cx.new(|cx| {
194 let mut observer = SettingsObserver::new_local(
195 fs.clone(),
196 worktree_store.clone(),
197 task_store.clone(),
198 true,
199 cx,
200 );
201 observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
202 observer
203 });
204
205 let lsp_store = cx.new(|cx| {
206 let mut lsp_store = LspStore::new_local(
207 buffer_store.clone(),
208 worktree_store.clone(),
209 prettier_store.clone(),
210 toolchain_store
211 .read(cx)
212 .as_local_store()
213 .expect("Toolchain store to be local")
214 .clone(),
215 environment.clone(),
216 manifest_tree,
217 languages.clone(),
218 http_client.clone(),
219 fs.clone(),
220 cx,
221 );
222 lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
223 lsp_store
224 });
225
226 AgentRegistryStore::init_global(cx, fs.clone(), http_client.clone());
227
228 let agent_server_store = cx.new(|cx| {
229 let mut agent_server_store = AgentServerStore::local(
230 node_runtime.clone(),
231 fs.clone(),
232 environment.clone(),
233 http_client.clone(),
234 cx,
235 );
236 agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
237 agent_server_store
238 });
239
240 let context_server_store = cx.new(|cx| {
241 let mut context_server_store =
242 ContextServerStore::local(worktree_store.clone(), None, true, cx);
243 context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
244 context_server_store
245 });
246
247 cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
248 language_extension::init(
249 language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
250 proxy.clone(),
251 languages.clone(),
252 );
253
254 cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
255 if let BufferStoreEvent::BufferAdded(buffer) = event {
256 cx.subscribe(buffer, Self::on_buffer_event).detach();
257 }
258 })
259 .detach();
260
261 let extensions = HeadlessExtensionStore::new(
262 fs.clone(),
263 http_client.clone(),
264 paths::remote_extensions_dir().to_path_buf(),
265 proxy,
266 node_runtime,
267 cx,
268 );
269
270 // local_machine -> ssh handlers
271 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
272 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
273 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
274 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
275 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
276 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
277 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
278 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
279 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
280 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
281 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
282 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);
283
284 session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
285 session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
286 session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
287 session.add_request_handler(cx.weak_entity(), Self::handle_ping);
288 session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
289
290 session.add_entity_request_handler(Self::handle_add_worktree);
291 session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
292
293 session.add_entity_request_handler(Self::handle_open_buffer_by_path);
294 session.add_entity_request_handler(Self::handle_open_new_buffer);
295 session.add_entity_request_handler(Self::handle_find_search_candidates);
296 session.add_entity_request_handler(Self::handle_open_server_settings);
297 session.add_entity_request_handler(Self::handle_get_directory_environment);
298 session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
299 session.add_entity_request_handler(Self::handle_open_image_by_path);
300 session.add_entity_request_handler(Self::handle_trust_worktrees);
301 session.add_entity_request_handler(Self::handle_restrict_worktrees);
302 session.add_entity_request_handler(Self::handle_download_file_by_path);
303
304 session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
305 session.add_entity_request_handler(BufferStore::handle_update_buffer);
306 session.add_entity_message_handler(BufferStore::handle_close_buffer);
307
308 session.add_request_handler(
309 extensions.downgrade(),
310 HeadlessExtensionStore::handle_sync_extensions,
311 );
312 session.add_request_handler(
313 extensions.downgrade(),
314 HeadlessExtensionStore::handle_install_extension,
315 );
316
317 BufferStore::init(&session);
318 WorktreeStore::init(&session);
319 SettingsObserver::init(&session);
320 LspStore::init(&session);
321 TaskStore::init(Some(&session));
322 ToolchainStore::init(&session);
323 DapStore::init(&session, cx);
324 // todo(debugger): Re init breakpoint store when we set it up for collab
325 BreakpointStore::init(&session);
326 GitStore::init(&session);
327 AgentServerStore::init_headless(&session);
328 ContextServerStore::init_headless(&session);
329
330 HeadlessProject {
331 next_entry_id: Default::default(),
332 session,
333 settings_observer,
334 fs,
335 worktree_store,
336 buffer_store,
337 lsp_store,
338 task_store,
339 dap_store,
340 breakpoint_store,
341 agent_server_store,
342 context_server_store,
343 languages,
344 extensions,
345 git_store,
346 environment,
347 _toolchain_store: toolchain_store,
348 }
349 }
350
351 fn on_buffer_event(
352 &mut self,
353 buffer: Entity<Buffer>,
354 event: &BufferEvent,
355 cx: &mut Context<Self>,
356 ) {
357 if let BufferEvent::Operation {
358 operation,
359 is_local: true,
360 } = event
361 {
362 cx.background_spawn(self.session.request(proto::UpdateBuffer {
363 project_id: REMOTE_SERVER_PROJECT_ID,
364 buffer_id: buffer.read(cx).remote_id().to_proto(),
365 operations: vec![serialize_operation(operation)],
366 }))
367 .detach()
368 }
369 }
370
371 fn on_lsp_store_event(
372 &mut self,
373 lsp_store: Entity<LspStore>,
374 event: &LspStoreEvent,
375 cx: &mut Context<Self>,
376 ) {
377 match event {
378 LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
379 let log_store = cx
380 .try_global::<GlobalLogStore>()
381 .map(|lsp_logs| lsp_logs.0.clone());
382 if let Some(log_store) = log_store {
383 log_store.update(cx, |log_store, cx| {
384 log_store.add_language_server(
385 LanguageServerKind::LocalSsh {
386 lsp_store: self.lsp_store.downgrade(),
387 },
388 *id,
389 Some(name.clone()),
390 *worktree_id,
391 lsp_store.read(cx).language_server_for_id(*id),
392 cx,
393 );
394 });
395 }
396 }
397 LspStoreEvent::LanguageServerRemoved(id) => {
398 let log_store = cx
399 .try_global::<GlobalLogStore>()
400 .map(|lsp_logs| lsp_logs.0.clone());
401 if let Some(log_store) = log_store {
402 log_store.update(cx, |log_store, cx| {
403 log_store.remove_language_server(*id, cx);
404 });
405 }
406 }
407 LspStoreEvent::LanguageServerUpdate {
408 language_server_id,
409 name,
410 message,
411 } => {
412 self.session
413 .send(proto::UpdateLanguageServer {
414 project_id: REMOTE_SERVER_PROJECT_ID,
415 server_name: name.as_ref().map(|name| name.to_string()),
416 language_server_id: language_server_id.to_proto(),
417 variant: Some(message.clone()),
418 })
419 .log_err();
420 }
421 LspStoreEvent::Notification(message) => {
422 self.session
423 .send(proto::Toast {
424 project_id: REMOTE_SERVER_PROJECT_ID,
425 notification_id: "lsp".to_string(),
426 message: message.clone(),
427 })
428 .log_err();
429 }
430 LspStoreEvent::LanguageServerPrompt(prompt) => {
431 let request = self.session.request(proto::LanguageServerPromptRequest {
432 project_id: REMOTE_SERVER_PROJECT_ID,
433 actions: prompt
434 .actions
435 .iter()
436 .map(|action| action.title.to_string())
437 .collect(),
438 level: Some(prompt_to_proto(prompt)),
439 lsp_name: prompt.lsp_name.clone(),
440 message: prompt.message.clone(),
441 });
442 let prompt = prompt.clone();
443 cx.background_spawn(async move {
444 let response = request.await?;
445 if let Some(action_response) = response.action_response {
446 prompt.respond(action_response as usize).await;
447 }
448 anyhow::Ok(())
449 })
450 .detach();
451 }
452 _ => {}
453 }
454 }
455
456 pub async fn handle_add_worktree(
457 this: Entity<Self>,
458 message: TypedEnvelope<proto::AddWorktree>,
459 mut cx: AsyncApp,
460 ) -> Result<proto::AddWorktreeResponse> {
461 use client::ErrorCodeExt;
462 let fs = this.read_with(&cx, |this, _| this.fs.clone());
463 let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
464
465 let canonicalized = match fs.canonicalize(&path).await {
466 Ok(path) => path,
467 Err(e) => {
468 let mut parent = path
469 .parent()
470 .ok_or(e)
471 .with_context(|| format!("{path:?} does not exist"))?;
472 if parent == Path::new("") {
473 parent = util::paths::home_dir();
474 }
475 let parent = fs.canonicalize(parent).await.map_err(|_| {
476 anyhow!(
477 proto::ErrorCode::DevServerProjectPathDoesNotExist
478 .with_tag("path", path.to_string_lossy().as_ref())
479 )
480 })?;
481 if let Some(file_name) = path.file_name() {
482 parent.join(file_name)
483 } else {
484 parent
485 }
486 }
487 };
488 let next_worktree_id = this
489 .update(&mut cx, |this, cx| {
490 this.worktree_store
491 .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
492 })
493 .await?;
494 let worktree = this
495 .read_with(&cx.clone(), |this, _| {
496 Worktree::local(
497 Arc::from(canonicalized.as_path()),
498 message.payload.visible,
499 this.fs.clone(),
500 this.next_entry_id.clone(),
501 true,
502 next_worktree_id,
503 &mut cx,
504 )
505 })
506 .await?;
507
508 let response = this.read_with(&cx, |_, cx| {
509 let worktree = worktree.read(cx);
510 proto::AddWorktreeResponse {
511 worktree_id: worktree.id().to_proto(),
512 canonicalized_path: canonicalized.to_string_lossy().into_owned(),
513 }
514 });
515
516 // We spawn this asynchronously, so that we can send the response back
517 // *before* `worktree_store.add()` can send out UpdateProject requests
518 // to the client about the new worktree.
519 //
520 // That lets the client manage the reference/handles of the newly-added
521 // worktree, before getting interrupted by an UpdateProject request.
522 //
523 // This fixes the problem of the client sending the AddWorktree request,
524 // headless project sending out a project update, client receiving it
525 // and immediately dropping the reference of the new client, causing it
526 // to be dropped on the headless project, and the client only then
527 // receiving a response to AddWorktree.
528 cx.spawn(async move |cx| {
529 this.update(cx, |this, cx| {
530 this.worktree_store.update(cx, |worktree_store, cx| {
531 worktree_store.add(&worktree, cx);
532 });
533 });
534 })
535 .detach();
536
537 Ok(response)
538 }
539
540 pub async fn handle_remove_worktree(
541 this: Entity<Self>,
542 envelope: TypedEnvelope<proto::RemoveWorktree>,
543 mut cx: AsyncApp,
544 ) -> Result<proto::Ack> {
545 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
546 this.update(&mut cx, |this, cx| {
547 this.worktree_store.update(cx, |worktree_store, cx| {
548 worktree_store.remove_worktree(worktree_id, cx);
549 });
550 });
551 Ok(proto::Ack {})
552 }
553
554 pub async fn handle_open_buffer_by_path(
555 this: Entity<Self>,
556 message: TypedEnvelope<proto::OpenBufferByPath>,
557 mut cx: AsyncApp,
558 ) -> Result<proto::OpenBufferResponse> {
559 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
560 let path = RelPath::from_proto(&message.payload.path)?;
561 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
562 let buffer_store = this.buffer_store.clone();
563 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
564 buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
565 });
566 (buffer_store, buffer)
567 });
568
569 let buffer = buffer.await?;
570 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
571 buffer_store.update(&mut cx, |buffer_store, cx| {
572 buffer_store
573 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
574 .detach_and_log_err(cx);
575 });
576
577 Ok(proto::OpenBufferResponse {
578 buffer_id: buffer_id.to_proto(),
579 })
580 }
581
582 pub async fn handle_open_image_by_path(
583 this: Entity<Self>,
584 message: TypedEnvelope<proto::OpenImageByPath>,
585 mut cx: AsyncApp,
586 ) -> Result<proto::OpenImageResponse> {
587 static NEXT_ID: AtomicU64 = AtomicU64::new(1);
588 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
589 let path = RelPath::from_proto(&message.payload.path)?;
590 let project_id = message.payload.project_id;
591 use proto::create_image_for_peer::Variant;
592
593 let (worktree_store, session) = this.read_with(&cx, |this, _| {
594 (this.worktree_store.clone(), this.session.clone())
595 });
596
597 let worktree = worktree_store
598 .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
599 .context("worktree not found")?;
600
601 let load_task = worktree.update(&mut cx, |worktree, cx| {
602 worktree.load_binary_file(path.as_ref(), cx)
603 });
604
605 let loaded_file = load_task.await?;
606 let content = loaded_file.content;
607 let file = loaded_file.file;
608
609 let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
610 let image_id =
611 ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
612
613 let format = image::guess_format(&content)
614 .map(|f| format!("{:?}", f).to_lowercase())
615 .unwrap_or_else(|_| "unknown".to_string());
616
617 let state = proto::ImageState {
618 id: image_id.to_proto(),
619 file: Some(proto_file),
620 content_size: content.len() as u64,
621 format,
622 };
623
624 session.send(proto::CreateImageForPeer {
625 project_id,
626 peer_id: Some(REMOTE_SERVER_PEER_ID),
627 variant: Some(Variant::State(state)),
628 })?;
629
630 const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
631 for chunk in content.chunks(CHUNK_SIZE) {
632 session.send(proto::CreateImageForPeer {
633 project_id,
634 peer_id: Some(REMOTE_SERVER_PEER_ID),
635 variant: Some(Variant::Chunk(proto::ImageChunk {
636 image_id: image_id.to_proto(),
637 data: chunk.to_vec(),
638 })),
639 })?;
640 }
641
642 Ok(proto::OpenImageResponse {
643 image_id: image_id.to_proto(),
644 })
645 }
646
647 pub async fn handle_trust_worktrees(
648 this: Entity<Self>,
649 envelope: TypedEnvelope<proto::TrustWorktrees>,
650 mut cx: AsyncApp,
651 ) -> Result<proto::Ack> {
652 let trusted_worktrees = cx
653 .update(|cx| TrustedWorktrees::try_get_global(cx))
654 .context("missing trusted worktrees")?;
655 let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
656 trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
657 trusted_worktrees.trust(
658 &worktree_store,
659 envelope
660 .payload
661 .trusted_paths
662 .into_iter()
663 .filter_map(PathTrust::from_proto)
664 .collect(),
665 cx,
666 );
667 });
668 Ok(proto::Ack {})
669 }
670
671 pub async fn handle_restrict_worktrees(
672 this: Entity<Self>,
673 envelope: TypedEnvelope<proto::RestrictWorktrees>,
674 mut cx: AsyncApp,
675 ) -> Result<proto::Ack> {
676 let trusted_worktrees = cx
677 .update(|cx| TrustedWorktrees::try_get_global(cx))
678 .context("missing trusted worktrees")?;
679 let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
680 trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
681 let restricted_paths = envelope
682 .payload
683 .worktree_ids
684 .into_iter()
685 .map(WorktreeId::from_proto)
686 .map(PathTrust::Worktree)
687 .collect::<HashSet<_>>();
688 trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
689 });
690 Ok(proto::Ack {})
691 }
692
693 pub async fn handle_download_file_by_path(
694 this: Entity<Self>,
695 message: TypedEnvelope<proto::DownloadFileByPath>,
696 mut cx: AsyncApp,
697 ) -> Result<proto::DownloadFileResponse> {
698 log::debug!(
699 "handle_download_file_by_path: received request: {:?}",
700 message.payload
701 );
702
703 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
704 let path = RelPath::from_proto(&message.payload.path)?;
705 let project_id = message.payload.project_id;
706 let file_id = message.payload.file_id;
707 log::debug!(
708 "handle_download_file_by_path: worktree_id={:?}, path={:?}, file_id={}",
709 worktree_id,
710 path,
711 file_id
712 );
713 use proto::create_file_for_peer::Variant;
714
715 let (worktree_store, session): (Entity<WorktreeStore>, AnyProtoClient) = this
716 .read_with(&cx, |this, _| {
717 (this.worktree_store.clone(), this.session.clone())
718 });
719
720 let worktree = worktree_store
721 .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
722 .context("worktree not found")?;
723
724 let download_task = worktree.update(&mut cx, |worktree: &mut Worktree, cx| {
725 worktree.load_binary_file(path.as_ref(), cx)
726 });
727
728 let downloaded_file = download_task.await?;
729 let content = downloaded_file.content;
730 let file = downloaded_file.file;
731 log::debug!(
732 "handle_download_file_by_path: file loaded, content_size={}",
733 content.len()
734 );
735
736 let proto_file = worktree.read_with(&cx, |_worktree: &Worktree, cx| file.to_proto(cx));
737 log::debug!(
738 "handle_download_file_by_path: using client-provided file_id={}",
739 file_id
740 );
741
742 let state = proto::FileState {
743 id: file_id,
744 file: Some(proto_file),
745 content_size: content.len() as u64,
746 };
747
748 log::debug!("handle_download_file_by_path: sending State message");
749 session.send(proto::CreateFileForPeer {
750 project_id,
751 peer_id: Some(REMOTE_SERVER_PEER_ID),
752 variant: Some(Variant::State(state)),
753 })?;
754
755 const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
756 let num_chunks = content.len().div_ceil(CHUNK_SIZE);
757 log::debug!(
758 "handle_download_file_by_path: sending {} chunks",
759 num_chunks
760 );
761 for (i, chunk) in content.chunks(CHUNK_SIZE).enumerate() {
762 log::trace!(
763 "handle_download_file_by_path: sending chunk {}/{}, size={}",
764 i + 1,
765 num_chunks,
766 chunk.len()
767 );
768 session.send(proto::CreateFileForPeer {
769 project_id,
770 peer_id: Some(REMOTE_SERVER_PEER_ID),
771 variant: Some(Variant::Chunk(proto::FileChunk {
772 file_id,
773 data: chunk.to_vec(),
774 })),
775 })?;
776 }
777
778 log::debug!(
779 "handle_download_file_by_path: returning file_id={}",
780 file_id
781 );
782 Ok(proto::DownloadFileResponse { file_id })
783 }
784
785 pub async fn handle_open_new_buffer(
786 this: Entity<Self>,
787 _message: TypedEnvelope<proto::OpenNewBuffer>,
788 mut cx: AsyncApp,
789 ) -> Result<proto::OpenBufferResponse> {
790 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
791 let buffer_store = this.buffer_store.clone();
792 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
793 buffer_store.create_buffer(None, true, cx)
794 });
795 (buffer_store, buffer)
796 });
797
798 let buffer = buffer.await?;
799 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
800 buffer_store.update(&mut cx, |buffer_store, cx| {
801 buffer_store
802 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
803 .detach_and_log_err(cx);
804 });
805
806 Ok(proto::OpenBufferResponse {
807 buffer_id: buffer_id.to_proto(),
808 })
809 }
810
811 async fn handle_toggle_lsp_logs(
812 _: Entity<Self>,
813 envelope: TypedEnvelope<proto::ToggleLspLogs>,
814 cx: AsyncApp,
815 ) -> Result<()> {
816 let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
817 cx.update(|cx| {
818 let log_store = cx
819 .try_global::<GlobalLogStore>()
820 .map(|global_log_store| global_log_store.0.clone())
821 .context("lsp logs store is missing")?;
822 let toggled_log_kind =
823 match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
824 .context("invalid log type")?
825 {
826 proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
827 proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
828 proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
829 };
830 log_store.update(cx, |log_store, _| {
831 log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
832 });
833 anyhow::Ok(())
834 })?;
835
836 Ok(())
837 }
838
839 async fn handle_open_server_settings(
840 this: Entity<Self>,
841 _: TypedEnvelope<proto::OpenServerSettings>,
842 mut cx: AsyncApp,
843 ) -> Result<proto::OpenBufferResponse> {
844 let settings_path = paths::settings_file();
845 let (worktree, path) = this
846 .update(&mut cx, |this, cx| {
847 this.worktree_store.update(cx, |worktree_store, cx| {
848 worktree_store.find_or_create_worktree(settings_path, false, cx)
849 })
850 })
851 .await?;
852
853 let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
854 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
855 buffer_store.open_buffer(
856 ProjectPath {
857 worktree_id: worktree.read(cx).id(),
858 path,
859 },
860 cx,
861 )
862 });
863
864 (buffer, this.buffer_store.clone())
865 });
866
867 let buffer = buffer.await?;
868
869 let buffer_id = cx.update(|cx| {
870 if buffer.read(cx).is_empty() {
871 buffer.update(cx, |buffer, cx| {
872 buffer.edit([(0..0, initial_server_settings_content())], None, cx)
873 });
874 }
875
876 let buffer_id = buffer.read(cx).remote_id();
877
878 buffer_store.update(cx, |buffer_store, cx| {
879 buffer_store
880 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
881 .detach_and_log_err(cx);
882 });
883
884 buffer_id
885 });
886
887 Ok(proto::OpenBufferResponse {
888 buffer_id: buffer_id.to_proto(),
889 })
890 }
891
892 async fn handle_find_search_candidates(
893 this: Entity<Self>,
894 envelope: TypedEnvelope<proto::FindSearchCandidates>,
895 mut cx: AsyncApp,
896 ) -> Result<proto::Ack> {
897 use futures::stream::StreamExt as _;
898
899 let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
900 let message = envelope.payload;
901 let query = SearchQuery::from_proto(
902 message.query.context("missing query field")?,
903 PathStyle::local(),
904 )?;
905
906 let project_id = message.project_id;
907 let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
908 let handle = message.handle;
909 let _buffer_store = buffer_store.clone();
910 let client = this.read_with(&cx, |this, _| this.session.clone());
911 let task = cx.spawn(async move |cx| {
912 let results = this.update(cx, |this, cx| {
913 project::Search::local(
914 this.fs.clone(),
915 this.buffer_store.clone(),
916 this.worktree_store.clone(),
917 message.limit as _,
918 cx,
919 )
920 .into_handle(query, cx)
921 .matching_buffers(cx)
922 });
923 let (batcher, batches) =
924 project::project_search::AdaptiveBatcher::new(cx.background_executor());
925 let mut new_matches = Box::pin(results.rx);
926
927 let sender_task = cx.background_executor().spawn({
928 let client = client.clone();
929 async move {
930 let mut batches = std::pin::pin!(batches);
931 while let Some(buffer_ids) = batches.next().await {
932 client
933 .request(proto::FindSearchCandidatesChunk {
934 handle,
935 peer_id: Some(peer_id),
936 project_id,
937 variant: Some(
938 proto::find_search_candidates_chunk::Variant::Matches(
939 proto::FindSearchCandidatesMatches { buffer_ids },
940 ),
941 ),
942 })
943 .await?;
944 }
945 anyhow::Ok(())
946 }
947 });
948
949 while let Some(buffer) = new_matches.next().await {
950 let _ = buffer_store
951 .update(cx, |this, cx| {
952 this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
953 })
954 .await;
955 let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
956 batcher.push(buffer_id).await;
957 }
958 batcher.flush().await;
959
960 sender_task.await?;
961
962 client
963 .request(proto::FindSearchCandidatesChunk {
964 handle,
965 peer_id: Some(peer_id),
966 project_id,
967 variant: Some(proto::find_search_candidates_chunk::Variant::Done(
968 proto::FindSearchCandidatesDone {},
969 )),
970 })
971 .await?;
972 anyhow::Ok(())
973 });
974 _buffer_store.update(&mut cx, |this, _| {
975 this.register_ongoing_project_search((peer_id, handle), task);
976 });
977
978 Ok(proto::Ack {})
979 }
980
981 // Goes from client to host.
982 async fn handle_find_search_candidates_cancel(
983 this: Entity<Self>,
984 envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
985 mut cx: AsyncApp,
986 ) -> Result<()> {
987 let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
988 BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
989 }
990
991 async fn handle_list_remote_directory(
992 this: Entity<Self>,
993 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
994 cx: AsyncApp,
995 ) -> Result<proto::ListRemoteDirectoryResponse> {
996 use smol::stream::StreamExt;
997 let fs = cx.read_entity(&this, |this, _| this.fs.clone());
998 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
999 let check_info = envelope
1000 .payload
1001 .config
1002 .as_ref()
1003 .is_some_and(|config| config.is_dir);
1004
1005 let mut entries = Vec::new();
1006 let mut entry_info = Vec::new();
1007 let mut response = fs.read_dir(&expanded).await?;
1008 while let Some(path) = response.next().await {
1009 let path = path?;
1010 if let Some(file_name) = path.file_name() {
1011 entries.push(file_name.to_string_lossy().into_owned());
1012 if check_info {
1013 let is_dir = fs.is_dir(&path).await;
1014 entry_info.push(proto::EntryInfo { is_dir });
1015 }
1016 }
1017 }
1018 Ok(proto::ListRemoteDirectoryResponse {
1019 entries,
1020 entry_info,
1021 })
1022 }
1023
1024 async fn handle_get_path_metadata(
1025 this: Entity<Self>,
1026 envelope: TypedEnvelope<proto::GetPathMetadata>,
1027 cx: AsyncApp,
1028 ) -> Result<proto::GetPathMetadataResponse> {
1029 let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1030 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1031
1032 let metadata = fs.metadata(&expanded).await?;
1033 let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
1034
1035 Ok(proto::GetPathMetadataResponse {
1036 exists: metadata.is_some(),
1037 is_dir,
1038 path: expanded.to_string_lossy().into_owned(),
1039 })
1040 }
1041
1042 async fn handle_shutdown_remote_server(
1043 _this: Entity<Self>,
1044 _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
1045 cx: AsyncApp,
1046 ) -> Result<proto::Ack> {
1047 cx.spawn(async move |cx| {
1048 cx.update(|cx| {
1049 // TODO: This is a hack, because in a headless project, shutdown isn't executed
1050 // when calling quit, but it should be.
1051 cx.shutdown();
1052 cx.quit();
1053 })
1054 })
1055 .detach();
1056
1057 Ok(proto::Ack {})
1058 }
1059
1060 pub async fn handle_ping(
1061 _this: Entity<Self>,
1062 _envelope: TypedEnvelope<proto::Ping>,
1063 _cx: AsyncApp,
1064 ) -> Result<proto::Ack> {
1065 log::debug!("Received ping from client");
1066 Ok(proto::Ack {})
1067 }
1068
1069 async fn handle_get_processes(
1070 _this: Entity<Self>,
1071 _envelope: TypedEnvelope<proto::GetProcesses>,
1072 _cx: AsyncApp,
1073 ) -> Result<proto::GetProcessesResponse> {
1074 let mut processes = Vec::new();
1075 let refresh_kind = RefreshKind::nothing().with_processes(
1076 ProcessRefreshKind::nothing()
1077 .without_tasks()
1078 .with_cmd(UpdateKind::Always),
1079 );
1080
1081 for process in System::new_with_specifics(refresh_kind)
1082 .processes()
1083 .values()
1084 {
1085 let name = process.name().to_string_lossy().into_owned();
1086 let command = process
1087 .cmd()
1088 .iter()
1089 .map(|s| s.to_string_lossy().into_owned())
1090 .collect::<Vec<_>>();
1091
1092 processes.push(proto::ProcessInfo {
1093 pid: process.pid().as_u32(),
1094 name,
1095 command,
1096 });
1097 }
1098
1099 processes.sort_by_key(|p| p.name.clone());
1100
1101 Ok(proto::GetProcessesResponse { processes })
1102 }
1103
1104 async fn handle_get_directory_environment(
1105 this: Entity<Self>,
1106 envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1107 mut cx: AsyncApp,
1108 ) -> Result<proto::DirectoryEnvironment> {
1109 let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1110 let directory = PathBuf::from(envelope.payload.directory);
1111 let environment = this
1112 .update(&mut cx, |this, cx| {
1113 this.environment.update(cx, |environment, cx| {
1114 environment.local_directory_environment(&shell, directory.into(), cx)
1115 })
1116 })
1117 .await
1118 .context("failed to get directory environment")?
1119 .into_iter()
1120 .collect();
1121 Ok(proto::DirectoryEnvironment { environment })
1122 }
1123}
1124
1125fn prompt_to_proto(
1126 prompt: &project::LanguageServerPromptRequest,
1127) -> proto::language_server_prompt_request::Level {
1128 match prompt.level {
1129 PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1130 proto::language_server_prompt_request::Info {},
1131 ),
1132 PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1133 proto::language_server_prompt_request::Warning {},
1134 ),
1135 PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1136 proto::language_server_prompt_request::Critical {},
1137 ),
1138 }
1139}