1use anyhow::{Context as _, Result, anyhow};
2use client::ProjectId;
3use collections::HashSet;
4use language::File;
5use lsp::LanguageServerId;
6
7use extension::ExtensionHostProxy;
8use extension_host::headless_host::HeadlessExtensionStore;
9use fs::Fs;
10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
11use http_client::HttpClient;
12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
13use node_runtime::NodeRuntime;
14use project::{
15 LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
16 ToolchainStore, WorktreeId,
17 agent_server_store::AgentServerStore,
18 buffer_store::{BufferStore, BufferStoreEvent},
19 context_server_store::ContextServerStore,
20 debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
21 git_store::GitStore,
22 image_store::ImageId,
23 lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
24 project_settings::SettingsObserver,
25 search::SearchQuery,
26 task_store::TaskStore,
27 trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
28 worktree_store::{WorktreeIdCounter, WorktreeStore},
29};
30use rpc::{
31 AnyProtoClient, TypedEnvelope,
32 proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
33};
34
35use settings::initial_server_settings_content;
36use std::{
37 num::NonZeroU64,
38 path::{Path, PathBuf},
39 sync::{
40 Arc,
41 atomic::{AtomicU64, AtomicUsize, Ordering},
42 },
43};
44use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
45use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
46use worktree::Worktree;
47
48pub struct HeadlessProject {
49 pub fs: Arc<dyn Fs>,
50 pub session: AnyProtoClient,
51 pub worktree_store: Entity<WorktreeStore>,
52 pub buffer_store: Entity<BufferStore>,
53 pub lsp_store: Entity<LspStore>,
54 pub task_store: Entity<TaskStore>,
55 pub dap_store: Entity<DapStore>,
56 pub breakpoint_store: Entity<BreakpointStore>,
57 pub agent_server_store: Entity<AgentServerStore>,
58 pub context_server_store: Entity<ContextServerStore>,
59 pub settings_observer: Entity<SettingsObserver>,
60 pub next_entry_id: Arc<AtomicUsize>,
61 pub languages: Arc<LanguageRegistry>,
62 pub extensions: Entity<HeadlessExtensionStore>,
63 pub git_store: Entity<GitStore>,
64 pub environment: Entity<ProjectEnvironment>,
65 // Used mostly to keep alive the toolchain store for RPC handlers.
66 // Local variant is used within LSP store, but that's a separate entity.
67 pub _toolchain_store: Entity<ToolchainStore>,
68}
69
70pub struct HeadlessAppState {
71 pub session: AnyProtoClient,
72 pub fs: Arc<dyn Fs>,
73 pub http_client: Arc<dyn HttpClient>,
74 pub node_runtime: NodeRuntime,
75 pub languages: Arc<LanguageRegistry>,
76 pub extension_host_proxy: Arc<ExtensionHostProxy>,
77}
78
79impl HeadlessProject {
80 pub fn init(cx: &mut App) {
81 settings::init(cx);
82 log_store::init(true, cx);
83 }
84
85 pub fn new(
86 HeadlessAppState {
87 session,
88 fs,
89 http_client,
90 node_runtime,
91 languages,
92 extension_host_proxy: proxy,
93 }: HeadlessAppState,
94 init_worktree_trust: bool,
95 cx: &mut Context<Self>,
96 ) -> Self {
97 debug_adapter_extension::init(proxy.clone(), cx);
98 languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
99
100 let worktree_store = cx.new(|cx| {
101 let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
102 store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
103 store
104 });
105
106 if init_worktree_trust {
107 project::trusted_worktrees::track_worktree_trust(
108 worktree_store.clone(),
109 None::<RemoteHostLocation>,
110 Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
111 None,
112 cx,
113 );
114 }
115
116 let environment =
117 cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
118 let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
119 let toolchain_store = cx.new(|cx| {
120 ToolchainStore::local(
121 languages.clone(),
122 worktree_store.clone(),
123 environment.clone(),
124 manifest_tree.clone(),
125 fs.clone(),
126 cx,
127 )
128 });
129
130 let buffer_store = cx.new(|cx| {
131 let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
132 buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
133 buffer_store
134 });
135
136 let breakpoint_store = cx.new(|_| {
137 let mut breakpoint_store =
138 BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
139 breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
140
141 breakpoint_store
142 });
143
144 let dap_store = cx.new(|cx| {
145 let mut dap_store = DapStore::new_local(
146 http_client.clone(),
147 node_runtime.clone(),
148 fs.clone(),
149 environment.clone(),
150 toolchain_store.read(cx).as_language_toolchain_store(),
151 worktree_store.clone(),
152 breakpoint_store.clone(),
153 true,
154 cx,
155 );
156 dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
157 dap_store
158 });
159
160 let git_store = cx.new(|cx| {
161 let mut store = GitStore::local(
162 &worktree_store,
163 buffer_store.clone(),
164 environment.clone(),
165 fs.clone(),
166 cx,
167 );
168 store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
169 store
170 });
171
172 let prettier_store = cx.new(|cx| {
173 PrettierStore::new(
174 node_runtime.clone(),
175 fs.clone(),
176 languages.clone(),
177 worktree_store.clone(),
178 cx,
179 )
180 });
181
182 let task_store = cx.new(|cx| {
183 let mut task_store = TaskStore::local(
184 buffer_store.downgrade(),
185 worktree_store.clone(),
186 toolchain_store.read(cx).as_language_toolchain_store(),
187 environment.clone(),
188 cx,
189 );
190 task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
191 task_store
192 });
193 let settings_observer = cx.new(|cx| {
194 let mut observer = SettingsObserver::new_local(
195 fs.clone(),
196 worktree_store.clone(),
197 task_store.clone(),
198 true,
199 cx,
200 );
201 observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
202 observer
203 });
204
205 let lsp_store = cx.new(|cx| {
206 let mut lsp_store = LspStore::new_local(
207 buffer_store.clone(),
208 worktree_store.clone(),
209 prettier_store.clone(),
210 toolchain_store
211 .read(cx)
212 .as_local_store()
213 .expect("Toolchain store to be local")
214 .clone(),
215 environment.clone(),
216 manifest_tree,
217 languages.clone(),
218 http_client.clone(),
219 fs.clone(),
220 cx,
221 );
222 lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
223 lsp_store
224 });
225
226 let agent_server_store = cx.new(|cx| {
227 let mut agent_server_store = AgentServerStore::local(
228 node_runtime.clone(),
229 fs.clone(),
230 environment.clone(),
231 http_client.clone(),
232 cx,
233 );
234 agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
235 agent_server_store
236 });
237
238 let context_server_store = cx.new(|cx| {
239 let mut context_server_store =
240 ContextServerStore::local(worktree_store.clone(), None, true, cx);
241 context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
242 context_server_store
243 });
244
245 cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
246 language_extension::init(
247 language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
248 proxy.clone(),
249 languages.clone(),
250 );
251
252 cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
253 if let BufferStoreEvent::BufferAdded(buffer) = event {
254 cx.subscribe(buffer, Self::on_buffer_event).detach();
255 }
256 })
257 .detach();
258
259 let extensions = HeadlessExtensionStore::new(
260 fs.clone(),
261 http_client.clone(),
262 paths::remote_extensions_dir().to_path_buf(),
263 proxy,
264 node_runtime,
265 cx,
266 );
267
268 // local_machine -> ssh handlers
269 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
270 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
271 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
272 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
273 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
274 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
275 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
276 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
277 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
278 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
279 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
280 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);
281
282 session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
283 session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
284 session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
285 session.add_request_handler(cx.weak_entity(), Self::handle_ping);
286 session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
287
288 session.add_entity_request_handler(Self::handle_add_worktree);
289 session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
290
291 session.add_entity_request_handler(Self::handle_open_buffer_by_path);
292 session.add_entity_request_handler(Self::handle_open_new_buffer);
293 session.add_entity_request_handler(Self::handle_find_search_candidates);
294 session.add_entity_request_handler(Self::handle_open_server_settings);
295 session.add_entity_request_handler(Self::handle_get_directory_environment);
296 session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
297 session.add_entity_request_handler(Self::handle_open_image_by_path);
298 session.add_entity_request_handler(Self::handle_trust_worktrees);
299 session.add_entity_request_handler(Self::handle_restrict_worktrees);
300 session.add_entity_request_handler(Self::handle_download_file_by_path);
301
302 session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
303 session.add_entity_request_handler(BufferStore::handle_update_buffer);
304 session.add_entity_message_handler(BufferStore::handle_close_buffer);
305
306 session.add_request_handler(
307 extensions.downgrade(),
308 HeadlessExtensionStore::handle_sync_extensions,
309 );
310 session.add_request_handler(
311 extensions.downgrade(),
312 HeadlessExtensionStore::handle_install_extension,
313 );
314
315 BufferStore::init(&session);
316 WorktreeStore::init(&session);
317 SettingsObserver::init(&session);
318 LspStore::init(&session);
319 TaskStore::init(Some(&session));
320 ToolchainStore::init(&session);
321 DapStore::init(&session, cx);
322 // todo(debugger): Re init breakpoint store when we set it up for collab
323 BreakpointStore::init(&session);
324 GitStore::init(&session);
325 AgentServerStore::init_headless(&session);
326 ContextServerStore::init_headless(&session);
327
328 HeadlessProject {
329 next_entry_id: Default::default(),
330 session,
331 settings_observer,
332 fs,
333 worktree_store,
334 buffer_store,
335 lsp_store,
336 task_store,
337 dap_store,
338 breakpoint_store,
339 agent_server_store,
340 context_server_store,
341 languages,
342 extensions,
343 git_store,
344 environment,
345 _toolchain_store: toolchain_store,
346 }
347 }
348
349 fn on_buffer_event(
350 &mut self,
351 buffer: Entity<Buffer>,
352 event: &BufferEvent,
353 cx: &mut Context<Self>,
354 ) {
355 if let BufferEvent::Operation {
356 operation,
357 is_local: true,
358 } = event
359 {
360 cx.background_spawn(self.session.request(proto::UpdateBuffer {
361 project_id: REMOTE_SERVER_PROJECT_ID,
362 buffer_id: buffer.read(cx).remote_id().to_proto(),
363 operations: vec![serialize_operation(operation)],
364 }))
365 .detach()
366 }
367 }
368
369 fn on_lsp_store_event(
370 &mut self,
371 lsp_store: Entity<LspStore>,
372 event: &LspStoreEvent,
373 cx: &mut Context<Self>,
374 ) {
375 match event {
376 LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
377 let log_store = cx
378 .try_global::<GlobalLogStore>()
379 .map(|lsp_logs| lsp_logs.0.clone());
380 if let Some(log_store) = log_store {
381 log_store.update(cx, |log_store, cx| {
382 log_store.add_language_server(
383 LanguageServerKind::LocalSsh {
384 lsp_store: self.lsp_store.downgrade(),
385 },
386 *id,
387 Some(name.clone()),
388 *worktree_id,
389 lsp_store.read(cx).language_server_for_id(*id),
390 cx,
391 );
392 });
393 }
394 }
395 LspStoreEvent::LanguageServerRemoved(id) => {
396 let log_store = cx
397 .try_global::<GlobalLogStore>()
398 .map(|lsp_logs| lsp_logs.0.clone());
399 if let Some(log_store) = log_store {
400 log_store.update(cx, |log_store, cx| {
401 log_store.remove_language_server(*id, cx);
402 });
403 }
404 }
405 LspStoreEvent::LanguageServerUpdate {
406 language_server_id,
407 name,
408 message,
409 } => {
410 self.session
411 .send(proto::UpdateLanguageServer {
412 project_id: REMOTE_SERVER_PROJECT_ID,
413 server_name: name.as_ref().map(|name| name.to_string()),
414 language_server_id: language_server_id.to_proto(),
415 variant: Some(message.clone()),
416 })
417 .log_err();
418 }
419 LspStoreEvent::Notification(message) => {
420 self.session
421 .send(proto::Toast {
422 project_id: REMOTE_SERVER_PROJECT_ID,
423 notification_id: "lsp".to_string(),
424 message: message.clone(),
425 })
426 .log_err();
427 }
428 LspStoreEvent::LanguageServerPrompt(prompt) => {
429 let request = self.session.request(proto::LanguageServerPromptRequest {
430 project_id: REMOTE_SERVER_PROJECT_ID,
431 actions: prompt
432 .actions
433 .iter()
434 .map(|action| action.title.to_string())
435 .collect(),
436 level: Some(prompt_to_proto(prompt)),
437 lsp_name: prompt.lsp_name.clone(),
438 message: prompt.message.clone(),
439 });
440 let prompt = prompt.clone();
441 cx.background_spawn(async move {
442 let response = request.await?;
443 if let Some(action_response) = response.action_response {
444 prompt.respond(action_response as usize).await;
445 }
446 anyhow::Ok(())
447 })
448 .detach();
449 }
450 _ => {}
451 }
452 }
453
454 pub async fn handle_add_worktree(
455 this: Entity<Self>,
456 message: TypedEnvelope<proto::AddWorktree>,
457 mut cx: AsyncApp,
458 ) -> Result<proto::AddWorktreeResponse> {
459 use client::ErrorCodeExt;
460 let fs = this.read_with(&cx, |this, _| this.fs.clone());
461 let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
462
463 let canonicalized = match fs.canonicalize(&path).await {
464 Ok(path) => path,
465 Err(e) => {
466 let mut parent = path
467 .parent()
468 .ok_or(e)
469 .with_context(|| format!("{path:?} does not exist"))?;
470 if parent == Path::new("") {
471 parent = util::paths::home_dir();
472 }
473 let parent = fs.canonicalize(parent).await.map_err(|_| {
474 anyhow!(
475 proto::ErrorCode::DevServerProjectPathDoesNotExist
476 .with_tag("path", path.to_string_lossy().as_ref())
477 )
478 })?;
479 if let Some(file_name) = path.file_name() {
480 parent.join(file_name)
481 } else {
482 parent
483 }
484 }
485 };
486 let next_worktree_id = this
487 .update(&mut cx, |this, cx| {
488 this.worktree_store
489 .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
490 })
491 .await?;
492 let worktree = this
493 .read_with(&cx.clone(), |this, _| {
494 Worktree::local(
495 Arc::from(canonicalized.as_path()),
496 message.payload.visible,
497 this.fs.clone(),
498 this.next_entry_id.clone(),
499 true,
500 next_worktree_id,
501 &mut cx,
502 )
503 })
504 .await?;
505
506 let response = this.read_with(&cx, |_, cx| {
507 let worktree = worktree.read(cx);
508 proto::AddWorktreeResponse {
509 worktree_id: worktree.id().to_proto(),
510 canonicalized_path: canonicalized.to_string_lossy().into_owned(),
511 }
512 });
513
514 // We spawn this asynchronously, so that we can send the response back
515 // *before* `worktree_store.add()` can send out UpdateProject requests
516 // to the client about the new worktree.
517 //
518 // That lets the client manage the reference/handles of the newly-added
519 // worktree, before getting interrupted by an UpdateProject request.
520 //
521 // This fixes the problem of the client sending the AddWorktree request,
522 // headless project sending out a project update, client receiving it
523 // and immediately dropping the reference of the new client, causing it
524 // to be dropped on the headless project, and the client only then
525 // receiving a response to AddWorktree.
526 cx.spawn(async move |cx| {
527 this.update(cx, |this, cx| {
528 this.worktree_store.update(cx, |worktree_store, cx| {
529 worktree_store.add(&worktree, cx);
530 });
531 });
532 })
533 .detach();
534
535 Ok(response)
536 }
537
538 pub async fn handle_remove_worktree(
539 this: Entity<Self>,
540 envelope: TypedEnvelope<proto::RemoveWorktree>,
541 mut cx: AsyncApp,
542 ) -> Result<proto::Ack> {
543 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
544 this.update(&mut cx, |this, cx| {
545 this.worktree_store.update(cx, |worktree_store, cx| {
546 worktree_store.remove_worktree(worktree_id, cx);
547 });
548 });
549 Ok(proto::Ack {})
550 }
551
552 pub async fn handle_open_buffer_by_path(
553 this: Entity<Self>,
554 message: TypedEnvelope<proto::OpenBufferByPath>,
555 mut cx: AsyncApp,
556 ) -> Result<proto::OpenBufferResponse> {
557 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
558 let path = RelPath::from_proto(&message.payload.path)?;
559 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
560 let buffer_store = this.buffer_store.clone();
561 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
562 buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
563 });
564 (buffer_store, buffer)
565 });
566
567 let buffer = buffer.await?;
568 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
569 buffer_store.update(&mut cx, |buffer_store, cx| {
570 buffer_store
571 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
572 .detach_and_log_err(cx);
573 });
574
575 Ok(proto::OpenBufferResponse {
576 buffer_id: buffer_id.to_proto(),
577 })
578 }
579
580 pub async fn handle_open_image_by_path(
581 this: Entity<Self>,
582 message: TypedEnvelope<proto::OpenImageByPath>,
583 mut cx: AsyncApp,
584 ) -> Result<proto::OpenImageResponse> {
585 static NEXT_ID: AtomicU64 = AtomicU64::new(1);
586 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
587 let path = RelPath::from_proto(&message.payload.path)?;
588 let project_id = message.payload.project_id;
589 use proto::create_image_for_peer::Variant;
590
591 let (worktree_store, session) = this.read_with(&cx, |this, _| {
592 (this.worktree_store.clone(), this.session.clone())
593 });
594
595 let worktree = worktree_store
596 .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
597 .context("worktree not found")?;
598
599 let load_task = worktree.update(&mut cx, |worktree, cx| {
600 worktree.load_binary_file(path.as_ref(), cx)
601 });
602
603 let loaded_file = load_task.await?;
604 let content = loaded_file.content;
605 let file = loaded_file.file;
606
607 let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
608 let image_id =
609 ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
610
611 let format = image::guess_format(&content)
612 .map(|f| format!("{:?}", f).to_lowercase())
613 .unwrap_or_else(|_| "unknown".to_string());
614
615 let state = proto::ImageState {
616 id: image_id.to_proto(),
617 file: Some(proto_file),
618 content_size: content.len() as u64,
619 format,
620 };
621
622 session.send(proto::CreateImageForPeer {
623 project_id,
624 peer_id: Some(REMOTE_SERVER_PEER_ID),
625 variant: Some(Variant::State(state)),
626 })?;
627
628 const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
629 for chunk in content.chunks(CHUNK_SIZE) {
630 session.send(proto::CreateImageForPeer {
631 project_id,
632 peer_id: Some(REMOTE_SERVER_PEER_ID),
633 variant: Some(Variant::Chunk(proto::ImageChunk {
634 image_id: image_id.to_proto(),
635 data: chunk.to_vec(),
636 })),
637 })?;
638 }
639
640 Ok(proto::OpenImageResponse {
641 image_id: image_id.to_proto(),
642 })
643 }
644
645 pub async fn handle_trust_worktrees(
646 this: Entity<Self>,
647 envelope: TypedEnvelope<proto::TrustWorktrees>,
648 mut cx: AsyncApp,
649 ) -> Result<proto::Ack> {
650 let trusted_worktrees = cx
651 .update(|cx| TrustedWorktrees::try_get_global(cx))
652 .context("missing trusted worktrees")?;
653 let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
654 trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
655 trusted_worktrees.trust(
656 &worktree_store,
657 envelope
658 .payload
659 .trusted_paths
660 .into_iter()
661 .filter_map(PathTrust::from_proto)
662 .collect(),
663 cx,
664 );
665 });
666 Ok(proto::Ack {})
667 }
668
669 pub async fn handle_restrict_worktrees(
670 this: Entity<Self>,
671 envelope: TypedEnvelope<proto::RestrictWorktrees>,
672 mut cx: AsyncApp,
673 ) -> Result<proto::Ack> {
674 let trusted_worktrees = cx
675 .update(|cx| TrustedWorktrees::try_get_global(cx))
676 .context("missing trusted worktrees")?;
677 let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
678 trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
679 let restricted_paths = envelope
680 .payload
681 .worktree_ids
682 .into_iter()
683 .map(WorktreeId::from_proto)
684 .map(PathTrust::Worktree)
685 .collect::<HashSet<_>>();
686 trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
687 });
688 Ok(proto::Ack {})
689 }
690
691 pub async fn handle_download_file_by_path(
692 this: Entity<Self>,
693 message: TypedEnvelope<proto::DownloadFileByPath>,
694 mut cx: AsyncApp,
695 ) -> Result<proto::DownloadFileResponse> {
696 log::debug!(
697 "handle_download_file_by_path: received request: {:?}",
698 message.payload
699 );
700
701 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
702 let path = RelPath::from_proto(&message.payload.path)?;
703 let project_id = message.payload.project_id;
704 let file_id = message.payload.file_id;
705 log::debug!(
706 "handle_download_file_by_path: worktree_id={:?}, path={:?}, file_id={}",
707 worktree_id,
708 path,
709 file_id
710 );
711 use proto::create_file_for_peer::Variant;
712
713 let (worktree_store, session): (Entity<WorktreeStore>, AnyProtoClient) = this
714 .read_with(&cx, |this, _| {
715 (this.worktree_store.clone(), this.session.clone())
716 });
717
718 let worktree = worktree_store
719 .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
720 .context("worktree not found")?;
721
722 let download_task = worktree.update(&mut cx, |worktree: &mut Worktree, cx| {
723 worktree.load_binary_file(path.as_ref(), cx)
724 });
725
726 let downloaded_file = download_task.await?;
727 let content = downloaded_file.content;
728 let file = downloaded_file.file;
729 log::debug!(
730 "handle_download_file_by_path: file loaded, content_size={}",
731 content.len()
732 );
733
734 let proto_file = worktree.read_with(&cx, |_worktree: &Worktree, cx| file.to_proto(cx));
735 log::debug!(
736 "handle_download_file_by_path: using client-provided file_id={}",
737 file_id
738 );
739
740 let state = proto::FileState {
741 id: file_id,
742 file: Some(proto_file),
743 content_size: content.len() as u64,
744 };
745
746 log::debug!("handle_download_file_by_path: sending State message");
747 session.send(proto::CreateFileForPeer {
748 project_id,
749 peer_id: Some(REMOTE_SERVER_PEER_ID),
750 variant: Some(Variant::State(state)),
751 })?;
752
753 const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
754 let num_chunks = content.len().div_ceil(CHUNK_SIZE);
755 log::debug!(
756 "handle_download_file_by_path: sending {} chunks",
757 num_chunks
758 );
759 for (i, chunk) in content.chunks(CHUNK_SIZE).enumerate() {
760 log::trace!(
761 "handle_download_file_by_path: sending chunk {}/{}, size={}",
762 i + 1,
763 num_chunks,
764 chunk.len()
765 );
766 session.send(proto::CreateFileForPeer {
767 project_id,
768 peer_id: Some(REMOTE_SERVER_PEER_ID),
769 variant: Some(Variant::Chunk(proto::FileChunk {
770 file_id,
771 data: chunk.to_vec(),
772 })),
773 })?;
774 }
775
776 log::debug!(
777 "handle_download_file_by_path: returning file_id={}",
778 file_id
779 );
780 Ok(proto::DownloadFileResponse { file_id })
781 }
782
783 pub async fn handle_open_new_buffer(
784 this: Entity<Self>,
785 _message: TypedEnvelope<proto::OpenNewBuffer>,
786 mut cx: AsyncApp,
787 ) -> Result<proto::OpenBufferResponse> {
788 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
789 let buffer_store = this.buffer_store.clone();
790 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
791 buffer_store.create_buffer(None, true, cx)
792 });
793 (buffer_store, buffer)
794 });
795
796 let buffer = buffer.await?;
797 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
798 buffer_store.update(&mut cx, |buffer_store, cx| {
799 buffer_store
800 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
801 .detach_and_log_err(cx);
802 });
803
804 Ok(proto::OpenBufferResponse {
805 buffer_id: buffer_id.to_proto(),
806 })
807 }
808
809 async fn handle_toggle_lsp_logs(
810 _: Entity<Self>,
811 envelope: TypedEnvelope<proto::ToggleLspLogs>,
812 cx: AsyncApp,
813 ) -> Result<()> {
814 let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
815 cx.update(|cx| {
816 let log_store = cx
817 .try_global::<GlobalLogStore>()
818 .map(|global_log_store| global_log_store.0.clone())
819 .context("lsp logs store is missing")?;
820 let toggled_log_kind =
821 match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
822 .context("invalid log type")?
823 {
824 proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
825 proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
826 proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
827 };
828 log_store.update(cx, |log_store, _| {
829 log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
830 });
831 anyhow::Ok(())
832 })?;
833
834 Ok(())
835 }
836
837 async fn handle_open_server_settings(
838 this: Entity<Self>,
839 _: TypedEnvelope<proto::OpenServerSettings>,
840 mut cx: AsyncApp,
841 ) -> Result<proto::OpenBufferResponse> {
842 let settings_path = paths::settings_file();
843 let (worktree, path) = this
844 .update(&mut cx, |this, cx| {
845 this.worktree_store.update(cx, |worktree_store, cx| {
846 worktree_store.find_or_create_worktree(settings_path, false, cx)
847 })
848 })
849 .await?;
850
851 let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
852 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
853 buffer_store.open_buffer(
854 ProjectPath {
855 worktree_id: worktree.read(cx).id(),
856 path,
857 },
858 cx,
859 )
860 });
861
862 (buffer, this.buffer_store.clone())
863 });
864
865 let buffer = buffer.await?;
866
867 let buffer_id = cx.update(|cx| {
868 if buffer.read(cx).is_empty() {
869 buffer.update(cx, |buffer, cx| {
870 buffer.edit([(0..0, initial_server_settings_content())], None, cx)
871 });
872 }
873
874 let buffer_id = buffer.read(cx).remote_id();
875
876 buffer_store.update(cx, |buffer_store, cx| {
877 buffer_store
878 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
879 .detach_and_log_err(cx);
880 });
881
882 buffer_id
883 });
884
885 Ok(proto::OpenBufferResponse {
886 buffer_id: buffer_id.to_proto(),
887 })
888 }
889
890 async fn handle_find_search_candidates(
891 this: Entity<Self>,
892 envelope: TypedEnvelope<proto::FindSearchCandidates>,
893 mut cx: AsyncApp,
894 ) -> Result<proto::Ack> {
895 use futures::stream::StreamExt as _;
896
897 let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
898 let message = envelope.payload;
899 let query = SearchQuery::from_proto(
900 message.query.context("missing query field")?,
901 PathStyle::local(),
902 )?;
903
904 let project_id = message.project_id;
905 let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
906 let handle = message.handle;
907 let _buffer_store = buffer_store.clone();
908 let client = this.read_with(&cx, |this, _| this.session.clone());
909 let task = cx.spawn(async move |cx| {
910 let results = this.update(cx, |this, cx| {
911 project::Search::local(
912 this.fs.clone(),
913 this.buffer_store.clone(),
914 this.worktree_store.clone(),
915 message.limit as _,
916 cx,
917 )
918 .into_handle(query, cx)
919 .matching_buffers(cx)
920 });
921 let (batcher, batches) =
922 project::project_search::AdaptiveBatcher::new(cx.background_executor());
923 let mut new_matches = Box::pin(results.rx);
924
925 let sender_task = cx.background_executor().spawn({
926 let client = client.clone();
927 async move {
928 let mut batches = std::pin::pin!(batches);
929 while let Some(buffer_ids) = batches.next().await {
930 client
931 .request(proto::FindSearchCandidatesChunk {
932 handle,
933 peer_id: Some(peer_id),
934 project_id,
935 variant: Some(
936 proto::find_search_candidates_chunk::Variant::Matches(
937 proto::FindSearchCandidatesMatches { buffer_ids },
938 ),
939 ),
940 })
941 .await?;
942 }
943 anyhow::Ok(())
944 }
945 });
946
947 while let Some(buffer) = new_matches.next().await {
948 let _ = buffer_store
949 .update(cx, |this, cx| {
950 this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
951 })
952 .await;
953 let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
954 batcher.push(buffer_id).await;
955 }
956 batcher.flush().await;
957
958 sender_task.await?;
959
960 client
961 .request(proto::FindSearchCandidatesChunk {
962 handle,
963 peer_id: Some(peer_id),
964 project_id,
965 variant: Some(proto::find_search_candidates_chunk::Variant::Done(
966 proto::FindSearchCandidatesDone {},
967 )),
968 })
969 .await?;
970 anyhow::Ok(())
971 });
972 _buffer_store.update(&mut cx, |this, _| {
973 this.register_ongoing_project_search((peer_id, handle), task);
974 });
975
976 Ok(proto::Ack {})
977 }
978
979 // Goes from client to host.
980 async fn handle_find_search_candidates_cancel(
981 this: Entity<Self>,
982 envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
983 mut cx: AsyncApp,
984 ) -> Result<()> {
985 let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
986 BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
987 }
988
989 async fn handle_list_remote_directory(
990 this: Entity<Self>,
991 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
992 cx: AsyncApp,
993 ) -> Result<proto::ListRemoteDirectoryResponse> {
994 use smol::stream::StreamExt;
995 let fs = cx.read_entity(&this, |this, _| this.fs.clone());
996 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
997 let check_info = envelope
998 .payload
999 .config
1000 .as_ref()
1001 .is_some_and(|config| config.is_dir);
1002
1003 let mut entries = Vec::new();
1004 let mut entry_info = Vec::new();
1005 let mut response = fs.read_dir(&expanded).await?;
1006 while let Some(path) = response.next().await {
1007 let path = path?;
1008 if let Some(file_name) = path.file_name() {
1009 entries.push(file_name.to_string_lossy().into_owned());
1010 if check_info {
1011 let is_dir = fs.is_dir(&path).await;
1012 entry_info.push(proto::EntryInfo { is_dir });
1013 }
1014 }
1015 }
1016 Ok(proto::ListRemoteDirectoryResponse {
1017 entries,
1018 entry_info,
1019 })
1020 }
1021
1022 async fn handle_get_path_metadata(
1023 this: Entity<Self>,
1024 envelope: TypedEnvelope<proto::GetPathMetadata>,
1025 cx: AsyncApp,
1026 ) -> Result<proto::GetPathMetadataResponse> {
1027 let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1028 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1029
1030 let metadata = fs.metadata(&expanded).await?;
1031 let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
1032
1033 Ok(proto::GetPathMetadataResponse {
1034 exists: metadata.is_some(),
1035 is_dir,
1036 path: expanded.to_string_lossy().into_owned(),
1037 })
1038 }
1039
1040 async fn handle_shutdown_remote_server(
1041 _this: Entity<Self>,
1042 _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
1043 cx: AsyncApp,
1044 ) -> Result<proto::Ack> {
1045 cx.spawn(async move |cx| {
1046 cx.update(|cx| {
1047 // TODO: This is a hack, because in a headless project, shutdown isn't executed
1048 // when calling quit, but it should be.
1049 cx.shutdown();
1050 cx.quit();
1051 })
1052 })
1053 .detach();
1054
1055 Ok(proto::Ack {})
1056 }
1057
1058 pub async fn handle_ping(
1059 _this: Entity<Self>,
1060 _envelope: TypedEnvelope<proto::Ping>,
1061 _cx: AsyncApp,
1062 ) -> Result<proto::Ack> {
1063 log::debug!("Received ping from client");
1064 Ok(proto::Ack {})
1065 }
1066
1067 async fn handle_get_processes(
1068 _this: Entity<Self>,
1069 _envelope: TypedEnvelope<proto::GetProcesses>,
1070 _cx: AsyncApp,
1071 ) -> Result<proto::GetProcessesResponse> {
1072 let mut processes = Vec::new();
1073 let refresh_kind = RefreshKind::nothing().with_processes(
1074 ProcessRefreshKind::nothing()
1075 .without_tasks()
1076 .with_cmd(UpdateKind::Always),
1077 );
1078
1079 for process in System::new_with_specifics(refresh_kind)
1080 .processes()
1081 .values()
1082 {
1083 let name = process.name().to_string_lossy().into_owned();
1084 let command = process
1085 .cmd()
1086 .iter()
1087 .map(|s| s.to_string_lossy().into_owned())
1088 .collect::<Vec<_>>();
1089
1090 processes.push(proto::ProcessInfo {
1091 pid: process.pid().as_u32(),
1092 name,
1093 command,
1094 });
1095 }
1096
1097 processes.sort_by_key(|p| p.name.clone());
1098
1099 Ok(proto::GetProcessesResponse { processes })
1100 }
1101
1102 async fn handle_get_directory_environment(
1103 this: Entity<Self>,
1104 envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1105 mut cx: AsyncApp,
1106 ) -> Result<proto::DirectoryEnvironment> {
1107 let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1108 let directory = PathBuf::from(envelope.payload.directory);
1109 let environment = this
1110 .update(&mut cx, |this, cx| {
1111 this.environment.update(cx, |environment, cx| {
1112 environment.local_directory_environment(&shell, directory.into(), cx)
1113 })
1114 })
1115 .await
1116 .context("failed to get directory environment")?
1117 .into_iter()
1118 .collect();
1119 Ok(proto::DirectoryEnvironment { environment })
1120 }
1121}
1122
1123fn prompt_to_proto(
1124 prompt: &project::LanguageServerPromptRequest,
1125) -> proto::language_server_prompt_request::Level {
1126 match prompt.level {
1127 PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1128 proto::language_server_prompt_request::Info {},
1129 ),
1130 PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1131 proto::language_server_prompt_request::Warning {},
1132 ),
1133 PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1134 proto::language_server_prompt_request::Critical {},
1135 ),
1136 }
1137}