From b212cfbc2e7231eaf6d96f899ea34f1e660f685a Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Fri, 9 Jan 2026 20:22:29 +0100 Subject: [PATCH] search: Stream project search results sooner (#45245) - **project search: Stream result buffers sooner in remote scenarios** - **Fix remote server build** Closes #ISSUE Release Notes: - Improved performance of project search in remote projects. --------- Co-authored-by: Smit Co-authored-by: Zed Zippy <234243425+zed-zippy[bot]@users.noreply.github.com> Co-authored-by: Smit Barmase Co-authored-by: Conrad --- Cargo.lock | 26 +++ crates/collab/src/rpc.rs | 17 +- crates/project/src/buffer_store.rs | 91 +++++++++- crates/project/src/project.rs | 102 +++++++++-- crates/project/src/project_search.rs | 165 ++++++++++++++++-- crates/project_benchmarks/Cargo.toml | 5 + crates/project_benchmarks/src/main.rs | 138 ++++++++++++--- crates/proto/proto/buffer.proto | 24 ++- crates/proto/proto/zed.proto | 9 +- crates/proto/src/proto.rs | 10 +- crates/remote_server/src/headless_project.rs | 109 +++++++++--- .../remote_server/src/remote_editing_tests.rs | 3 +- 12 files changed, 609 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 240e8ae4101801026e4f9e4f0933364d63a0cb53..804da1e31956d0a2a0436dda0802856a92de99a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12597,6 +12597,7 @@ name = "project_benchmarks" version = "0.1.0" dependencies = [ "anyhow", + "askpass", "clap", "client", "futures 0.3.31", @@ -12605,6 +12606,10 @@ dependencies = [ "language", "node_runtime", "project", + "release_channel", + "remote", + "rpassword", + "semver", "settings", "watch", ] @@ -13880,6 +13885,17 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97" +[[package]] +name = "rpassword" +version = "7.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d4c8b64f049c6721ec8ccec37ddfc3d641c4a7fca57e8f2a89de509c73df39" +dependencies = [ + "libc", + "rtoolbox", + "windows-sys 0.59.0", +] + [[package]] name = "rpc" version = "0.1.0" @@ -13925,6 +13941,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rtoolbox" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7cc970b249fbe527d6e02e0a227762c9108b2f49d81094fe357ffc6d14d7f6f" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "rtrb" version = "0.3.2" diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 13381f05bc0b36942329034a83549452cdb83b54..a74a36fc38738d7e15c2adfb7465165c3112b69e 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -468,7 +468,8 @@ impl Server { .add_request_handler(forward_mutating_project_request::) .add_message_handler(broadcast_project_message_from_host::) .add_request_handler(share_agent_thread) - .add_request_handler(get_shared_agent_thread); + .add_request_handler(get_shared_agent_thread) + .add_request_handler(forward_project_search_chunk); Arc::new(server) } @@ -2426,6 +2427,20 @@ async fn update_context(message: proto::UpdateContext, session: MessageContext) Ok(()) } +async fn forward_project_search_chunk( + message: proto::FindSearchCandidatesChunk, + response: Response, + session: MessageContext, +) -> Result<()> { + let peer_id = message.peer_id.context("missing peer_id")?; + let payload = session + .peer + .forward_request(session.connection_id, peer_id.into(), message) + .await?; + response.send(payload)?; + Ok(()) +} + /// Notify other participants that a project has been updated. async fn broadcast_project_message_from_host>( request: T, diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 5a6d005d17cd2eee856cec42b8fbb2278eae6a9e..32d17d77dd64ff53e99bf840231ee21504d10cfd 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -19,7 +19,7 @@ use language::{ }; use rpc::{ AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope, - proto::{self}, + proto::{self, PeerId}, }; use settings::Settings; @@ -39,6 +39,17 @@ pub struct BufferStore { downstream_client: Option<(AnyProtoClient, u64)>, shared_buffers: HashMap>, non_searchable_buffers: HashSet, + project_search: RemoteProjectSearchState, +} + +#[derive(Default)] +struct RemoteProjectSearchState { + // List of ongoing project search chunks from our remote host. Used by the side issuing a search RPC request. + chunks: HashMap>, + // Monotonously-increasing handle to hand out to remote host in order to identify the project search result chunk. + next_id: u64, + // Used by the side running the actual search for match candidates to potentially cancel the search prematurely. + searches_in_progress: HashMap<(PeerId, u64), Task>>, } #[derive(Hash, Eq, PartialEq, Clone)] @@ -771,6 +782,7 @@ impl BufferStore { loading_buffers: Default::default(), non_searchable_buffers: Default::default(), worktree_store, + project_search: Default::default(), } } @@ -796,6 +808,7 @@ impl BufferStore { shared_buffers: Default::default(), non_searchable_buffers: Default::default(), worktree_store, + project_search: Default::default(), } } @@ -1691,6 +1704,82 @@ impl BufferStore { } serialized_transaction } + + pub(crate) fn register_project_search_result_handle( + &mut self, + ) -> (u64, smol::channel::Receiver) { + let (tx, rx) = smol::channel::unbounded(); + let handle = util::post_inc(&mut self.project_search.next_id); + let _old_entry = self.project_search.chunks.insert(handle, tx); + debug_assert!(_old_entry.is_none()); + (handle, rx) + } + + pub fn register_ongoing_project_search( + &mut self, + id: (PeerId, u64), + search: Task>, + ) { + let _old = self.project_search.searches_in_progress.insert(id, search); + debug_assert!(_old.is_none()); + } + + pub async fn handle_find_search_candidates_cancel( + this: Entity, + envelope: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result<()> { + let id = ( + envelope.original_sender_id.unwrap_or(envelope.sender_id), + envelope.payload.handle, + ); + let _ = this.update(&mut cx, |this, _| { + this.project_search.searches_in_progress.remove(&id) + }); + Ok(()) + } + + pub(crate) async fn handle_find_search_candidates_chunk( + this: Entity, + envelope: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result { + use proto::find_search_candidates_chunk::Variant; + let handle = envelope.payload.handle; + + let buffer_ids = match envelope + .payload + .variant + .context("Expected non-null variant")? + { + Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches + .buffer_ids + .into_iter() + .filter_map(|buffer_id| BufferId::new(buffer_id).ok()) + .collect::>(), + Variant::Done(_) => { + this.update(&mut cx, |this, _| { + this.project_search.chunks.remove(&handle) + }); + return Ok(proto::Ack {}); + } + }; + let Some(sender) = this.read_with(&mut cx, |this, _| { + this.project_search.chunks.get(&handle).cloned() + }) else { + return Ok(proto::Ack {}); + }; + + for buffer_id in buffer_ids { + let Ok(_) = sender.send(buffer_id).await else { + this.update(&mut cx, |this, _| { + this.project_search.chunks.remove(&handle) + }); + return Ok(proto::Ack {}); + }; + } + Ok(proto::Ack {}) + } } impl OpenBuffer { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 07d55cf5e1a7d8a23cd66cb97d7f943167bcbc86..0d13789219577380977303606314d95c560101c7 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -11,7 +11,7 @@ pub mod lsp_command; pub mod lsp_store; mod manifest_tree; pub mod prettier_store; -mod project_search; +pub mod project_search; pub mod project_settings; pub mod search; mod task_inventory; @@ -1057,6 +1057,8 @@ impl Project { client.add_entity_message_handler(Self::handle_create_buffer_for_peer); client.add_entity_message_handler(Self::handle_toggle_lsp_logs); client.add_entity_message_handler(Self::handle_create_image_for_peer); + client.add_entity_request_handler(Self::handle_find_search_candidates_chunk); + client.add_entity_message_handler(Self::handle_find_search_candidates_cancel); WorktreeStore::init(&client); BufferStore::init(&client); @@ -1499,7 +1501,9 @@ impl Project { remote_proto.add_entity_request_handler(Self::handle_update_buffer_from_remote_server); remote_proto.add_entity_request_handler(Self::handle_trust_worktrees); remote_proto.add_entity_request_handler(Self::handle_restrict_worktrees); + remote_proto.add_entity_request_handler(Self::handle_find_search_candidates_chunk); + remote_proto.add_entity_message_handler(Self::handle_find_search_candidates_cancel); BufferStore::init(&remote_proto); LspStore::init(&remote_proto); SettingsObserver::init(&remote_proto); @@ -4924,6 +4928,26 @@ impl Project { Ok(proto::Ack {}) } + // Goes from host to client. + async fn handle_find_search_candidates_chunk( + this: Entity, + envelope: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result { + let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone()); + BufferStore::handle_find_search_candidates_chunk(buffer_store, envelope, cx).await + } + + // Goes from client to host. + async fn handle_find_search_candidates_cancel( + this: Entity, + envelope: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result<()> { + let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone()); + BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await + } + async fn handle_update_buffer( this: Entity, envelope: TypedEnvelope, @@ -5027,32 +5051,78 @@ impl Project { Ok(response) } + // Goes from client to host. async fn handle_search_candidate_buffers( this: Entity, envelope: TypedEnvelope, mut cx: AsyncApp, - ) -> Result { - let peer_id = envelope.original_sender_id()?; + ) -> Result { + let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id); let message = envelope.payload; + let project_id = message.project_id; let path_style = this.read_with(&cx, |this, cx| this.path_style(cx)); let query = SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?; - let results = this.update(&mut cx, |this, cx| { - this.search_impl(query, cx).matching_buffers(cx) - }); - let mut response = proto::FindSearchCandidatesResponse { - buffer_ids: Vec::new(), - }; - - while let Ok(buffer) = results.rx.recv().await { - this.update(&mut cx, |this, cx| { - let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx); - response.buffer_ids.push(buffer_id.to_proto()); + let handle = message.handle; + let buffer_store = this.read_with(&cx, |this, _| this.buffer_store().clone()); + let client = this.read_with(&cx, |this, _| this.client()); + let task = cx.spawn(async move |cx| { + let results = this.update(cx, |this, cx| { + this.search_impl(query, cx).matching_buffers(cx) + }); + let (batcher, batches) = project_search::AdaptiveBatcher::new(cx.background_executor()); + let mut new_matches = Box::pin(results.rx); + + let sender_task = cx.background_executor().spawn({ + let client = client.clone(); + async move { + let mut batches = std::pin::pin!(batches); + while let Some(buffer_ids) = batches.next().await { + client + .request(proto::FindSearchCandidatesChunk { + handle, + peer_id: Some(peer_id), + project_id, + variant: Some( + proto::find_search_candidates_chunk::Variant::Matches( + proto::FindSearchCandidatesMatches { buffer_ids }, + ), + ), + }) + .await?; + } + anyhow::Ok(()) + } }); - } - Ok(response) + while let Some(buffer) = new_matches.next().await { + let buffer_id = this.update(cx, |this, cx| { + this.create_buffer_for_peer(&buffer, peer_id, cx).to_proto() + }); + batcher.push(buffer_id).await; + } + batcher.flush().await; + + sender_task.await?; + + let _ = client + .request(proto::FindSearchCandidatesChunk { + handle, + peer_id: Some(peer_id), + project_id, + variant: Some(proto::find_search_candidates_chunk::Variant::Done( + proto::FindSearchCandidatesDone {}, + )), + }) + .await?; + anyhow::Ok(()) + }); + buffer_store.update(&mut cx, |this, _| { + this.register_ongoing_project_search((peer_id, handle), task); + }); + + Ok(proto::Ack {}) } async fn handle_open_buffer_by_id( diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs index 33c8b38f76f80c73499293ace2df2ce61cdb8cb2..b93646b31fd2c26448c5fee69e555bb13e9f4097 100644 --- a/crates/project/src/project_search.rs +++ b/crates/project/src/project_search.rs @@ -6,23 +6,21 @@ use std::{ path::{Path, PathBuf}, pin::pin, sync::Arc, + time::Duration, }; use anyhow::Context; use collections::HashSet; use fs::Fs; +use futures::FutureExt as _; use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered}; -use gpui::{App, AppContext, AsyncApp, Entity, Task}; +use gpui::{App, AppContext, AsyncApp, BackgroundExecutor, Entity, Priority, Task}; use language::{Buffer, BufferSnapshot}; use parking_lot::Mutex; use postage::oneshot; use rpc::{AnyProtoClient, proto}; -use smol::{ - channel::{Receiver, Sender, bounded, unbounded}, - future::FutureExt, -}; +use smol::channel::{Receiver, Sender, bounded, unbounded}; -use text::BufferId; use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath}; use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings}; @@ -249,12 +247,26 @@ impl Search { remote_id, models, } => { + let (handle, rx) = self + .buffer_store + .update(cx, |this, _| this.register_project_search_result_handle()); + + let cancel_ongoing_search = util::defer({ + let client = client.clone(); + move || { + _ = client.send(proto::FindSearchCandidatesCancelled { + project_id: remote_id, + handle, + }); + } + }); let request = client.request(proto::FindSearchCandidates { project_id: remote_id, query: Some(query.to_proto()), limit: self.limit as _, + handle, }); - let weak_buffer_store = self.buffer_store.downgrade(); + let buffer_store = self.buffer_store; let guard = cx.update(|cx| { Project::retain_remotely_created_models_impl( @@ -268,18 +280,39 @@ impl Search { let issue_remote_buffers_request = cx .spawn(async move |cx| { let _ = maybe!(async move { - let response = request.await?; - for buffer_id in response.buffer_ids { - let buffer_id = BufferId::new(buffer_id)?; - let buffer = weak_buffer_store - .update(cx, |buffer_store, cx| { - buffer_store.wait_for_remote_buffer(buffer_id, cx) - })? - .await?; - let _ = grab_buffer_snapshot_tx.send(buffer).await; - } + request.await?; + + let (buffer_tx, buffer_rx) = bounded(24); + + let wait_for_remote_buffers = cx.spawn(async move |cx| { + while let Ok(buffer_id) = rx.recv().await { + let buffer = + buffer_store.update(cx, |buffer_store, cx| { + buffer_store + .wait_for_remote_buffer(buffer_id, cx) + }); + buffer_tx.send(buffer).await?; + } + anyhow::Ok(()) + }); + + let forward_buffers = cx.background_spawn(async move { + while let Ok(buffer) = buffer_rx.recv().await { + let _ = + grab_buffer_snapshot_tx.send(buffer.await?).await; + } + anyhow::Ok(()) + }); + let (left, right) = futures::future::join( + wait_for_remote_buffers, + forward_buffers, + ) + .await; + left?; + right?; drop(guard); + cancel_ongoing_search.abort(); anyhow::Ok(()) }) .await @@ -894,6 +927,104 @@ impl PathInclusionMatcher { } } +type IsTerminating = bool; +/// Adaptive batcher that starts eager (small batches) and grows batch size +/// when items arrive quickly, reducing RPC overhead while preserving low latency +/// for slow streams. +pub struct AdaptiveBatcher { + items: Sender, + flush_batch: Sender, + _batch_task: Task<()>, +} + +impl AdaptiveBatcher { + pub fn new(cx: &BackgroundExecutor) -> (Self, Receiver>) { + let (items, rx) = unbounded(); + let (batch_tx, batch_rx) = unbounded(); + let (flush_batch_tx, flush_batch_rx) = unbounded(); + let flush_batch = flush_batch_tx.clone(); + let executor = cx.clone(); + let _batch_task = cx.spawn_with_priority(gpui::Priority::High, async move { + let mut current_batch = vec![]; + let mut items_produced_so_far = 0_u64; + + let mut _schedule_flush_after_delay: Option> = None; + let _time_elapsed_since_start_of_search = std::time::Instant::now(); + let mut flush = pin!(flush_batch_rx); + let mut terminating = false; + loop { + select_biased! { + item = rx.recv().fuse() => { + match item { + Ok(new_item) => { + let is_fresh_batch = current_batch.is_empty(); + items_produced_so_far += 1; + current_batch.push(new_item); + if is_fresh_batch { + // Chosen arbitrarily based on some experimentation with plots. + let desired_duration_ms = (20 * (items_produced_so_far + 2).ilog2() as u64).min(300); + let desired_duration = Duration::from_millis(desired_duration_ms); + let _executor = executor.clone(); + let _flush = flush_batch_tx.clone(); + let new_timer = executor.spawn_with_priority(Priority::High, async move { + _executor.timer(desired_duration).await; + _ = _flush.send(false).await; + }); + _schedule_flush_after_delay = Some(new_timer); + } + } + Err(_) => { + // Items channel closed - send any remaining batch before exiting + if !current_batch.is_empty() { + _ = batch_tx.send(std::mem::take(&mut current_batch)).await; + } + break; + } + } + } + should_break_afterwards = flush.next() => { + if !current_batch.is_empty() { + _ = batch_tx.send(std::mem::take(&mut current_batch)).await; + _schedule_flush_after_delay = None; + } + if should_break_afterwards.unwrap_or_default() { + terminating = true; + } + } + complete => { + break; + } + } + if terminating { + // Drain any remaining items before exiting + while let Ok(new_item) = rx.try_recv() { + current_batch.push(new_item); + } + if !current_batch.is_empty() { + _ = batch_tx.send(std::mem::take(&mut current_batch)).await; + } + break; + } + } + }); + let this = Self { + items, + _batch_task, + flush_batch, + }; + (this, batch_rx) + } + + pub async fn push(&self, item: T) { + _ = self.items.send(item).await; + } + + pub async fn flush(self) { + _ = self.flush_batch.send(true).await; + self._batch_task.await; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/project_benchmarks/Cargo.toml b/crates/project_benchmarks/Cargo.toml index 1171d468c649bdd9f76a44b3ef0155dc652c6034..fbf2ec73cc53beeffc7be495e3d5d5b8c5dc735c 100644 --- a/crates/project_benchmarks/Cargo.toml +++ b/crates/project_benchmarks/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] anyhow.workspace = true +askpass.workspace = true clap.workspace = true client.workspace = true futures.workspace = true @@ -14,6 +15,10 @@ http_client = { workspace = true, features = ["test-support"]} language.workspace = true node_runtime.workspace = true project.workspace = true +release_channel.workspace = true +remote = { workspace = true, features = ["build-remote-server-binary"] } +rpassword = "7.4" +semver.workspace = true settings.workspace = true watch.workspace = true diff --git a/crates/project_benchmarks/src/main.rs b/crates/project_benchmarks/src/main.rs index 02f810ad50323ba0fa5c07e8634569a09255f054..f93f8e00404d53f06425b2ffe4fe18136f19330f 100644 --- a/crates/project_benchmarks/src/main.rs +++ b/crates/project_benchmarks/src/main.rs @@ -1,7 +1,10 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use anyhow::anyhow; +use askpass::EncryptedPassword; use clap::Parser; use client::{Client, UserStore}; +use futures::channel::oneshot; use gpui::{AppContext as _, Application}; use http_client::FakeHttpClient; use language::LanguageRegistry; @@ -10,13 +13,19 @@ use project::{ Project, RealFs, search::{SearchQuery, SearchResult}, }; +use release_channel::ReleaseChannel; +use remote::{ConnectionIdentifier, RemoteClientDelegate, SshConnectionOptions}; +use semver::Version; #[derive(Parser)] struct Args { /// List of worktrees to run the search against. worktrees: Vec, #[clap(short)] - query: String, + query: Option, + /// Askpass socket for SSH authentication + #[clap(long)] + askpass: Option, /// Treat query as a regex. #[clap(short, long)] regex: bool, @@ -29,13 +38,72 @@ struct Args { /// Include gitignored files in the search. #[clap(long)] include_ignored: bool, + #[clap(long)] + ssh: Option, } +struct BenchmarkRemoteClient; +impl RemoteClientDelegate for BenchmarkRemoteClient { + fn ask_password( + &self, + prompt: String, + tx: oneshot::Sender, + _cx: &mut gpui::AsyncApp, + ) { + eprintln!("SSH asking for password: {}", prompt); + match rpassword::prompt_password(&prompt) { + Ok(password) => match EncryptedPassword::try_from(password.as_ref()) { + Ok(encrypted) => { + if tx.send(encrypted).is_err() { + eprintln!("Failed to send password"); + } + } + Err(e) => eprintln!("Failed to encrypt password: {e}"), + }, + Err(e) => eprintln!("Failed to read password: {e}"), + } + } + + fn get_download_url( + &self, + _platform: remote::RemotePlatform, + _release_channel: ReleaseChannel, + _version: Option, + _cx: &mut gpui::AsyncApp, + ) -> gpui::Task>> { + unimplemented!() + } + + fn download_server_binary_locally( + &self, + _platform: remote::RemotePlatform, + _release_channel: ReleaseChannel, + _version: Option, + _cx: &mut gpui::AsyncApp, + ) -> gpui::Task> { + unimplemented!() + } + + fn set_status(&self, status: Option<&str>, _: &mut gpui::AsyncApp) { + if let Some(status) = status { + println!("SSH status: {status}"); + } + } +} fn main() -> Result<(), anyhow::Error> { let args = Args::parse(); + + if let Some(socket) = &args.askpass { + askpass::main(socket); + return Ok(()); + } + + let query_str = args + .query + .ok_or_else(|| anyhow!("-q/--query is required"))?; let query = if args.regex { SearchQuery::regex( - args.query, + query_str, args.whole_word, args.case_sensitive, args.include_ignored, @@ -47,7 +115,7 @@ fn main() -> Result<(), anyhow::Error> { ) } else { SearchQuery::text( - args.query, + query_str, args.whole_word, args.case_sensitive, args.include_ignored, @@ -58,6 +126,7 @@ fn main() -> Result<(), anyhow::Error> { ) }?; Application::headless().run(|cx| { + release_channel::init_test(semver::Version::new(0, 0, 0), ReleaseChannel::Dev, cx); settings::init(cx); let client = Client::production(cx); let http_client = FakeHttpClient::with_200_response(); @@ -66,19 +135,35 @@ fn main() -> Result<(), anyhow::Error> { let user_store = cx.new(|cx| UserStore::new(client.clone(), cx)); let registry = Arc::new(LanguageRegistry::new(cx.background_executor().clone())); let fs = Arc::new(RealFs::new(None, cx.background_executor().clone())); - let project = Project::local( - client, - node, - user_store, - registry, - fs, - Some(Default::default()), - false, - cx, - ); - project.clone().update(cx, move |_, cx| { - cx.spawn(async move |_, cx| { + + + cx.spawn(async move |cx| { + let project = if let Some(ssh_target) = args.ssh { + println!("Setting up SSH connection for {}", &ssh_target); + let ssh_connection_options = SshConnectionOptions::parse_command_line(&ssh_target)?; + + let connection_options = remote::RemoteConnectionOptions::from(ssh_connection_options); + let delegate = Arc::new(BenchmarkRemoteClient); + let remote_connection = remote::connect(connection_options.clone(), delegate.clone(), cx).await.unwrap(); + + let (_tx, rx) = oneshot::channel(); + let remote_client = cx.update(|cx| remote::RemoteClient::new(ConnectionIdentifier::setup(), remote_connection, rx, delegate.clone(), cx )).await?.ok_or_else(|| anyhow!("ssh initialization returned None"))?; + + cx.update(|cx| Project::remote(remote_client, client, node, user_store, registry, fs, false, cx)) + } else { + println!("Setting up local project"); + cx.update(|cx| Project::local( + client, + node, + user_store, + registry, + fs, + Some(Default::default()), + false, + cx, + )) + }; println!("Loading worktrees"); let worktrees = project.update(cx, |this, cx| { args.worktrees @@ -93,9 +178,20 @@ fn main() -> Result<(), anyhow::Error> { .collect::, anyhow::Error>>()?; for (worktree, _) in &worktrees { - worktree - .update(cx, |this, _| this.as_local().unwrap().scan_complete()) - .await; + let scan_complete = worktree + .update(cx, |this, _| { + if let Some(local) = this.as_local() { + Some(local.scan_complete()) + } else { + None + } + }); + if let Some(scan_complete) = scan_complete { + scan_complete.await; + } else { + cx.background_executor().timer(Duration::from_secs(10)).await; + } + } println!("Worktrees loaded"); @@ -127,8 +223,8 @@ fn main() -> Result<(), anyhow::Error> { anyhow::Ok(()) }) - .detach(); - }); + .detach_and_log_err(cx); + }); Ok(()) } diff --git a/crates/proto/proto/buffer.proto b/crates/proto/proto/buffer.proto index 486716b36a221911ddf5abe1336a1e6cc3808769..4cd83af2aab8a44feb9f9646ec85d343b8875f82 100644 --- a/crates/proto/proto/buffer.proto +++ b/crates/proto/proto/buffer.proto @@ -308,8 +308,28 @@ message FindSearchCandidates { uint64 project_id = 1; SearchQuery query = 2; uint64 limit = 3; + uint64 handle = 4; } -message FindSearchCandidatesResponse { - repeated uint64 buffer_ids = 1; + +message FindSearchCandidatesDone {} + +message FindSearchCandidatesMatches { + repeated uint64 buffer_ids = 1; +} + +message FindSearchCandidatesChunk { + uint64 project_id = 1; + PeerId peer_id = 2; + uint64 handle = 3; + + oneof variant { + FindSearchCandidatesMatches matches = 4; + FindSearchCandidatesDone done = 5; + } +} + +message FindSearchCandidatesCancelled { + uint64 project_id = 1; + uint64 handle = 2; } diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index 750d24d2faea8e67d363847d9ba312212e2c50dc..e5cfd0c02d4d40bf109cc2dd6357e9fbbdcc8adf 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -244,7 +244,6 @@ message Envelope { LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242; FindSearchCandidates find_search_candidates = 243; - FindSearchCandidatesResponse find_search_candidates_response = 244; CloseBuffer close_buffer = 245; @@ -448,10 +447,13 @@ message Envelope { TrustWorktrees trust_worktrees = 404; RestrictWorktrees restrict_worktrees = 405; - + ShareAgentThread share_agent_thread = 406; GetSharedAgentThread get_shared_agent_thread = 407; - GetSharedAgentThreadResponse get_shared_agent_thread_response = 408; // current max + GetSharedAgentThreadResponse get_shared_agent_thread_response = 408; + + FindSearchCandidatesChunk find_search_candidates_chunk = 409; + FindSearchCandidatesCancelled find_search_candidates_cancelled = 410; // current max } reserved 87 to 88; @@ -469,6 +471,7 @@ message Envelope { reserved 224 to 231; reserved 234 to 236; reserved 239 to 240; + reserved 244; reserved 246 to 256; reserved 259; reserved 270; diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 7c0e7fbfb3ff03d88a83bf1b62b3de684044eb3c..00f9d0e3ecfeeb438eca7fd89c087f5b4ac2b641 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -68,7 +68,6 @@ messages!( (Error, Foreground), (ExpandProjectEntry, Foreground), (ExpandProjectEntryResponse, Foreground), - (FindSearchCandidatesResponse, Background), (FindSearchCandidates, Background), (FlushBufferedMessages, Foreground), (ExpandAllForProjectEntry, Foreground), @@ -345,7 +344,9 @@ messages!( (GitCreateWorktree, Background), (ShareAgentThread, Foreground), (GetSharedAgentThread, Foreground), - (GetSharedAgentThreadResponse, Foreground) + (GetSharedAgentThreadResponse, Foreground), + (FindSearchCandidatesChunk, Background), + (FindSearchCandidatesCancelled, Background), ); request_messages!( @@ -440,7 +441,7 @@ request_messages!( (RespondToContactRequest, Ack), (SaveBuffer, BufferSaved), (Stage, Ack), - (FindSearchCandidates, FindSearchCandidatesResponse), + (FindSearchCandidates, Ack), (SendChannelMessage, SendChannelMessageResponse), (SetChannelMemberRole, Ack), (SetChannelVisibility, Ack), @@ -535,6 +536,7 @@ request_messages!( (GitCreateWorktree, Ack), (TrustWorktrees, Ack), (RestrictWorktrees, Ack), + (FindSearchCandidatesChunk, Ack), ); lsp_messages!( @@ -710,6 +712,8 @@ entity_messages!( GitCreateWorktree, TrustWorktrees, RestrictWorktrees, + FindSearchCandidatesChunk, + FindSearchCandidatesCancelled, ); entity_messages!( diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index 7c93b02c42d801bd2d9412cf95a914fc3ddc3435..87807fab71776a4bdc2a681be193574112cc15d6 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -32,7 +32,6 @@ use rpc::{ }; use settings::initial_server_settings_content; -use smol::stream::StreamExt; use std::{ num::NonZeroU64, path::{Path, PathBuf}, @@ -288,6 +287,7 @@ impl HeadlessProject { session.add_entity_request_handler(Self::handle_trust_worktrees); session.add_entity_request_handler(Self::handle_restrict_worktrees); + session.add_entity_message_handler(Self::handle_find_search_candidates_cancel); session.add_entity_request_handler(BufferStore::handle_update_buffer); session.add_entity_message_handler(BufferStore::handle_close_buffer); @@ -779,41 +779,99 @@ impl HeadlessProject { this: Entity, envelope: TypedEnvelope, mut cx: AsyncApp, - ) -> Result { + ) -> Result { + use futures::stream::StreamExt as _; + + let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id); let message = envelope.payload; let query = SearchQuery::from_proto( message.query.context("missing query field")?, PathStyle::local(), )?; - let results = this.update(&mut cx, |this, cx| { - project::Search::local( - this.fs.clone(), - this.buffer_store.clone(), - this.worktree_store.clone(), - message.limit as _, - cx, - ) - .into_handle(query, cx) - .matching_buffers(cx) - }); - - let mut response = proto::FindSearchCandidatesResponse { - buffer_ids: Vec::new(), - }; + let project_id = message.project_id; let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone()); + let handle = message.handle; + let _buffer_store = buffer_store.clone(); + let client = this.read_with(&cx, |this, _| this.session.clone()); + let task = cx.spawn(async move |cx| { + let results = this.update(cx, |this, cx| { + project::Search::local( + this.fs.clone(), + this.buffer_store.clone(), + this.worktree_store.clone(), + message.limit as _, + cx, + ) + .into_handle(query, cx) + .matching_buffers(cx) + }); + let (batcher, batches) = + project::project_search::AdaptiveBatcher::new(cx.background_executor()); + let mut new_matches = Box::pin(results.rx); + + let sender_task = cx.background_executor().spawn({ + let client = client.clone(); + async move { + let mut batches = std::pin::pin!(batches); + while let Some(buffer_ids) = batches.next().await { + client + .request(proto::FindSearchCandidatesChunk { + handle, + peer_id: Some(peer_id), + project_id, + variant: Some( + proto::find_search_candidates_chunk::Variant::Matches( + proto::FindSearchCandidatesMatches { buffer_ids }, + ), + ), + }) + .await?; + } + anyhow::Ok(()) + } + }); - while let Ok(buffer) = results.rx.recv().await { - let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id()); - response.buffer_ids.push(buffer_id.to_proto()); - buffer_store - .update(&mut cx, |buffer_store, cx| { - buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx) + while let Some(buffer) = new_matches.next().await { + let _ = buffer_store + .update(cx, |this, cx| { + this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx) + }) + .await; + let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto()); + batcher.push(buffer_id).await; + } + batcher.flush().await; + + sender_task.await?; + + client + .request(proto::FindSearchCandidatesChunk { + handle, + peer_id: Some(peer_id), + project_id, + variant: Some(proto::find_search_candidates_chunk::Variant::Done( + proto::FindSearchCandidatesDone {}, + )), }) .await?; - } + anyhow::Ok(()) + }); + _buffer_store.update(&mut cx, |this, _| { + this.register_ongoing_project_search((peer_id, handle), task); + }); - Ok(response) + Ok(proto::Ack {}) + } + + // Goes from client to host. + async fn handle_find_search_candidates_cancel( + this: Entity, + envelope: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result<()> { + let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone()); + BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await } async fn handle_list_remote_directory( @@ -821,6 +879,7 @@ impl HeadlessProject { envelope: TypedEnvelope, cx: AsyncApp, ) -> Result { + use smol::stream::StreamExt; let fs = cx.read_entity(&this, |this, _| this.fs.clone()); let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string()); let check_info = envelope diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index fa140aeef46caae32031a912a950a45619bcafe0..abefc873b720668c237bcd436759e235107d24c5 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -237,11 +237,12 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes assert!(headless.buffer_store.read(cx).has_shared_buffers()) }); do_search(&project, cx.clone()).await; - + server_cx.run_until_parked(); cx.update(|_| { drop(buffer); }); cx.run_until_parked(); + server_cx.run_until_parked(); headless.update(server_cx, |headless, cx| { assert!(!headless.buffer_store.read(cx).has_shared_buffers()) });