search: Stream project search results sooner (#45245)

Piotr Osiewicz , Smit , Zed Zippy , Smit Barmase , and Conrad created

- **project search: Stream result buffers sooner in remote scenarios**
- **Fix remote server build**

Closes #ISSUE

Release Notes:

- Improved performance of project search in remote projects.

---------

Co-authored-by: Smit <smit@zed.dev>
Co-authored-by: Zed Zippy <234243425+zed-zippy[bot]@users.noreply.github.com>
Co-authored-by: Smit Barmase <heysmitbarmase@gmail.com>
Co-authored-by: Conrad <conrad@zed.dev>

Change summary

Cargo.lock                                       |  26 ++
crates/collab/src/rpc.rs                         |  17 +
crates/project/src/buffer_store.rs               |  91 +++++++++
crates/project/src/project.rs                    | 102 +++++++++-
crates/project/src/project_search.rs             | 165 ++++++++++++++++-
crates/project_benchmarks/Cargo.toml             |   5 
crates/project_benchmarks/src/main.rs            | 138 ++++++++++++--
crates/proto/proto/buffer.proto                  |  24 ++
crates/proto/proto/zed.proto                     |   9 
crates/proto/src/proto.rs                        |  10 
crates/remote_server/src/headless_project.rs     | 109 +++++++++--
crates/remote_server/src/remote_editing_tests.rs |   3 
12 files changed, 609 insertions(+), 90 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -12597,6 +12597,7 @@ name = "project_benchmarks"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "askpass",
  "clap",
  "client",
  "futures 0.3.31",
@@ -12605,6 +12606,10 @@ dependencies = [
  "language",
  "node_runtime",
  "project",
+ "release_channel",
+ "remote",
+ "rpassword",
+ "semver",
  "settings",
  "watch",
 ]
@@ -13880,6 +13885,17 @@ version = "0.20.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97"
 
+[[package]]
+name = "rpassword"
+version = "7.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "66d4c8b64f049c6721ec8ccec37ddfc3d641c4a7fca57e8f2a89de509c73df39"
+dependencies = [
+ "libc",
+ "rtoolbox",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "rpc"
 version = "0.1.0"
@@ -13925,6 +13941,16 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "rtoolbox"
+version = "0.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7cc970b249fbe527d6e02e0a227762c9108b2f49d81094fe357ffc6d14d7f6f"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "rtrb"
 version = "0.3.2"

crates/collab/src/rpc.rs 🔗

@@ -468,7 +468,8 @@ impl Server {
             .add_request_handler(forward_mutating_project_request::<proto::ToggleLspLogs>)
             .add_message_handler(broadcast_project_message_from_host::<proto::LanguageServerLog>)
             .add_request_handler(share_agent_thread)
-            .add_request_handler(get_shared_agent_thread);
+            .add_request_handler(get_shared_agent_thread)
+            .add_request_handler(forward_project_search_chunk);
 
         Arc::new(server)
     }
@@ -2426,6 +2427,20 @@ async fn update_context(message: proto::UpdateContext, session: MessageContext)
     Ok(())
 }
 
+async fn forward_project_search_chunk(
+    message: proto::FindSearchCandidatesChunk,
+    response: Response<proto::FindSearchCandidatesChunk>,
+    session: MessageContext,
+) -> Result<()> {
+    let peer_id = message.peer_id.context("missing peer_id")?;
+    let payload = session
+        .peer
+        .forward_request(session.connection_id, peer_id.into(), message)
+        .await?;
+    response.send(payload)?;
+    Ok(())
+}
+
 /// Notify other participants that a project has been updated.
 async fn broadcast_project_message_from_host<T: EntityMessage<Entity = ShareProject>>(
     request: T,

crates/project/src/buffer_store.rs 🔗

@@ -19,7 +19,7 @@ use language::{
 };
 use rpc::{
     AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope,
-    proto::{self},
+    proto::{self, PeerId},
 };
 
 use settings::Settings;
@@ -39,6 +39,17 @@ pub struct BufferStore {
     downstream_client: Option<(AnyProtoClient, u64)>,
     shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
     non_searchable_buffers: HashSet<BufferId>,
+    project_search: RemoteProjectSearchState,
+}
+
+#[derive(Default)]
+struct RemoteProjectSearchState {
+    // List of ongoing project search chunks from our remote host. Used by the side issuing a search RPC request.
+    chunks: HashMap<u64, smol::channel::Sender<BufferId>>,
+    // Monotonously-increasing handle to hand out to remote host in order to identify the project search result chunk.
+    next_id: u64,
+    // Used by the side running the actual search for match candidates to potentially cancel the search prematurely.
+    searches_in_progress: HashMap<(PeerId, u64), Task<Result<()>>>,
 }
 
 #[derive(Hash, Eq, PartialEq, Clone)]
@@ -771,6 +782,7 @@ impl BufferStore {
             loading_buffers: Default::default(),
             non_searchable_buffers: Default::default(),
             worktree_store,
+            project_search: Default::default(),
         }
     }
 
@@ -796,6 +808,7 @@ impl BufferStore {
             shared_buffers: Default::default(),
             non_searchable_buffers: Default::default(),
             worktree_store,
+            project_search: Default::default(),
         }
     }
 
@@ -1691,6 +1704,82 @@ impl BufferStore {
         }
         serialized_transaction
     }
+
+    pub(crate) fn register_project_search_result_handle(
+        &mut self,
+    ) -> (u64, smol::channel::Receiver<BufferId>) {
+        let (tx, rx) = smol::channel::unbounded();
+        let handle = util::post_inc(&mut self.project_search.next_id);
+        let _old_entry = self.project_search.chunks.insert(handle, tx);
+        debug_assert!(_old_entry.is_none());
+        (handle, rx)
+    }
+
+    pub fn register_ongoing_project_search(
+        &mut self,
+        id: (PeerId, u64),
+        search: Task<anyhow::Result<()>>,
+    ) {
+        let _old = self.project_search.searches_in_progress.insert(id, search);
+        debug_assert!(_old.is_none());
+    }
+
+    pub async fn handle_find_search_candidates_cancel(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
+        mut cx: AsyncApp,
+    ) -> Result<()> {
+        let id = (
+            envelope.original_sender_id.unwrap_or(envelope.sender_id),
+            envelope.payload.handle,
+        );
+        let _ = this.update(&mut cx, |this, _| {
+            this.project_search.searches_in_progress.remove(&id)
+        });
+        Ok(())
+    }
+
+    pub(crate) async fn handle_find_search_candidates_chunk(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
+        mut cx: AsyncApp,
+    ) -> Result<proto::Ack> {
+        use proto::find_search_candidates_chunk::Variant;
+        let handle = envelope.payload.handle;
+
+        let buffer_ids = match envelope
+            .payload
+            .variant
+            .context("Expected non-null variant")?
+        {
+            Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches
+                .buffer_ids
+                .into_iter()
+                .filter_map(|buffer_id| BufferId::new(buffer_id).ok())
+                .collect::<Vec<_>>(),
+            Variant::Done(_) => {
+                this.update(&mut cx, |this, _| {
+                    this.project_search.chunks.remove(&handle)
+                });
+                return Ok(proto::Ack {});
+            }
+        };
+        let Some(sender) = this.read_with(&mut cx, |this, _| {
+            this.project_search.chunks.get(&handle).cloned()
+        }) else {
+            return Ok(proto::Ack {});
+        };
+
+        for buffer_id in buffer_ids {
+            let Ok(_) = sender.send(buffer_id).await else {
+                this.update(&mut cx, |this, _| {
+                    this.project_search.chunks.remove(&handle)
+                });
+                return Ok(proto::Ack {});
+            };
+        }
+        Ok(proto::Ack {})
+    }
 }
 
 impl OpenBuffer {

crates/project/src/project.rs 🔗

@@ -11,7 +11,7 @@ pub mod lsp_command;
 pub mod lsp_store;
 mod manifest_tree;
 pub mod prettier_store;
-mod project_search;
+pub mod project_search;
 pub mod project_settings;
 pub mod search;
 mod task_inventory;
@@ -1057,6 +1057,8 @@ impl Project {
         client.add_entity_message_handler(Self::handle_create_buffer_for_peer);
         client.add_entity_message_handler(Self::handle_toggle_lsp_logs);
         client.add_entity_message_handler(Self::handle_create_image_for_peer);
+        client.add_entity_request_handler(Self::handle_find_search_candidates_chunk);
+        client.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
 
         WorktreeStore::init(&client);
         BufferStore::init(&client);
@@ -1499,7 +1501,9 @@ impl Project {
             remote_proto.add_entity_request_handler(Self::handle_update_buffer_from_remote_server);
             remote_proto.add_entity_request_handler(Self::handle_trust_worktrees);
             remote_proto.add_entity_request_handler(Self::handle_restrict_worktrees);
+            remote_proto.add_entity_request_handler(Self::handle_find_search_candidates_chunk);
 
+            remote_proto.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
             BufferStore::init(&remote_proto);
             LspStore::init(&remote_proto);
             SettingsObserver::init(&remote_proto);
@@ -4924,6 +4928,26 @@ impl Project {
         Ok(proto::Ack {})
     }
 
+    // Goes from host to client.
+    async fn handle_find_search_candidates_chunk(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
+        mut cx: AsyncApp,
+    ) -> Result<proto::Ack> {
+        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
+        BufferStore::handle_find_search_candidates_chunk(buffer_store, envelope, cx).await
+    }
+
+    // Goes from client to host.
+    async fn handle_find_search_candidates_cancel(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
+        mut cx: AsyncApp,
+    ) -> Result<()> {
+        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
+        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
+    }
+
     async fn handle_update_buffer(
         this: Entity<Self>,
         envelope: TypedEnvelope<proto::UpdateBuffer>,
@@ -5027,32 +5051,78 @@ impl Project {
         Ok(response)
     }
 
+    // Goes from client to host.
     async fn handle_search_candidate_buffers(
         this: Entity<Self>,
         envelope: TypedEnvelope<proto::FindSearchCandidates>,
         mut cx: AsyncApp,
-    ) -> Result<proto::FindSearchCandidatesResponse> {
-        let peer_id = envelope.original_sender_id()?;
+    ) -> Result<proto::Ack> {
+        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
         let message = envelope.payload;
+        let project_id = message.project_id;
         let path_style = this.read_with(&cx, |this, cx| this.path_style(cx));
         let query =
             SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
-        let results = this.update(&mut cx, |this, cx| {
-            this.search_impl(query, cx).matching_buffers(cx)
-        });
 
-        let mut response = proto::FindSearchCandidatesResponse {
-            buffer_ids: Vec::new(),
-        };
-
-        while let Ok(buffer) = results.rx.recv().await {
-            this.update(&mut cx, |this, cx| {
-                let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
-                response.buffer_ids.push(buffer_id.to_proto());
+        let handle = message.handle;
+        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store().clone());
+        let client = this.read_with(&cx, |this, _| this.client());
+        let task = cx.spawn(async move |cx| {
+            let results = this.update(cx, |this, cx| {
+                this.search_impl(query, cx).matching_buffers(cx)
+            });
+            let (batcher, batches) = project_search::AdaptiveBatcher::new(cx.background_executor());
+            let mut new_matches = Box::pin(results.rx);
+
+            let sender_task = cx.background_executor().spawn({
+                let client = client.clone();
+                async move {
+                    let mut batches = std::pin::pin!(batches);
+                    while let Some(buffer_ids) = batches.next().await {
+                        client
+                            .request(proto::FindSearchCandidatesChunk {
+                                handle,
+                                peer_id: Some(peer_id),
+                                project_id,
+                                variant: Some(
+                                    proto::find_search_candidates_chunk::Variant::Matches(
+                                        proto::FindSearchCandidatesMatches { buffer_ids },
+                                    ),
+                                ),
+                            })
+                            .await?;
+                    }
+                    anyhow::Ok(())
+                }
             });
-        }
 
-        Ok(response)
+            while let Some(buffer) = new_matches.next().await {
+                let buffer_id = this.update(cx, |this, cx| {
+                    this.create_buffer_for_peer(&buffer, peer_id, cx).to_proto()
+                });
+                batcher.push(buffer_id).await;
+            }
+            batcher.flush().await;
+
+            sender_task.await?;
+
+            let _ = client
+                .request(proto::FindSearchCandidatesChunk {
+                    handle,
+                    peer_id: Some(peer_id),
+                    project_id,
+                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
+                        proto::FindSearchCandidatesDone {},
+                    )),
+                })
+                .await?;
+            anyhow::Ok(())
+        });
+        buffer_store.update(&mut cx, |this, _| {
+            this.register_ongoing_project_search((peer_id, handle), task);
+        });
+
+        Ok(proto::Ack {})
     }
 
     async fn handle_open_buffer_by_id(

crates/project/src/project_search.rs 🔗

@@ -6,23 +6,21 @@ use std::{
     path::{Path, PathBuf},
     pin::pin,
     sync::Arc,
+    time::Duration,
 };
 
 use anyhow::Context;
 use collections::HashSet;
 use fs::Fs;
+use futures::FutureExt as _;
 use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
-use gpui::{App, AppContext, AsyncApp, Entity, Task};
+use gpui::{App, AppContext, AsyncApp, BackgroundExecutor, Entity, Priority, Task};
 use language::{Buffer, BufferSnapshot};
 use parking_lot::Mutex;
 use postage::oneshot;
 use rpc::{AnyProtoClient, proto};
-use smol::{
-    channel::{Receiver, Sender, bounded, unbounded},
-    future::FutureExt,
-};
+use smol::channel::{Receiver, Sender, bounded, unbounded};
 
-use text::BufferId;
 use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
 use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
 
@@ -249,12 +247,26 @@ impl Search {
                         remote_id,
                         models,
                     } => {
+                        let (handle, rx) = self
+                            .buffer_store
+                            .update(cx, |this, _| this.register_project_search_result_handle());
+
+                        let cancel_ongoing_search = util::defer({
+                            let client = client.clone();
+                            move || {
+                                _ = client.send(proto::FindSearchCandidatesCancelled {
+                                    project_id: remote_id,
+                                    handle,
+                                });
+                            }
+                        });
                         let request = client.request(proto::FindSearchCandidates {
                             project_id: remote_id,
                             query: Some(query.to_proto()),
                             limit: self.limit as _,
+                            handle,
                         });
-                        let weak_buffer_store = self.buffer_store.downgrade();
+
                         let buffer_store = self.buffer_store;
                         let guard = cx.update(|cx| {
                             Project::retain_remotely_created_models_impl(
@@ -268,18 +280,39 @@ impl Search {
                         let issue_remote_buffers_request = cx
                             .spawn(async move |cx| {
                                 let _ = maybe!(async move {
-                                    let response = request.await?;
-                                    for buffer_id in response.buffer_ids {
-                                        let buffer_id = BufferId::new(buffer_id)?;
-                                        let buffer = weak_buffer_store
-                                            .update(cx, |buffer_store, cx| {
-                                                buffer_store.wait_for_remote_buffer(buffer_id, cx)
-                                            })?
-                                            .await?;
-                                        let _ = grab_buffer_snapshot_tx.send(buffer).await;
-                                    }
+                                    request.await?;
+
+                                    let (buffer_tx, buffer_rx) = bounded(24);
+
+                                    let wait_for_remote_buffers = cx.spawn(async move |cx| {
+                                        while let Ok(buffer_id) = rx.recv().await {
+                                            let buffer =
+                                                buffer_store.update(cx, |buffer_store, cx| {
+                                                    buffer_store
+                                                        .wait_for_remote_buffer(buffer_id, cx)
+                                                });
+                                            buffer_tx.send(buffer).await?;
+                                        }
+                                        anyhow::Ok(())
+                                    });
+
+                                    let forward_buffers = cx.background_spawn(async move {
+                                        while let Ok(buffer) = buffer_rx.recv().await {
+                                            let _ =
+                                                grab_buffer_snapshot_tx.send(buffer.await?).await;
+                                        }
+                                        anyhow::Ok(())
+                                    });
+                                    let (left, right) = futures::future::join(
+                                        wait_for_remote_buffers,
+                                        forward_buffers,
+                                    )
+                                    .await;
+                                    left?;
+                                    right?;
 
                                     drop(guard);
+                                    cancel_ongoing_search.abort();
                                     anyhow::Ok(())
                                 })
                                 .await
@@ -894,6 +927,104 @@ impl PathInclusionMatcher {
     }
 }
 
+type IsTerminating = bool;
+/// Adaptive batcher that starts eager (small batches) and grows batch size
+/// when items arrive quickly, reducing RPC overhead while preserving low latency
+/// for slow streams.
+pub struct AdaptiveBatcher<T> {
+    items: Sender<T>,
+    flush_batch: Sender<IsTerminating>,
+    _batch_task: Task<()>,
+}
+
+impl<T: 'static + Send> AdaptiveBatcher<T> {
+    pub fn new(cx: &BackgroundExecutor) -> (Self, Receiver<Vec<T>>) {
+        let (items, rx) = unbounded();
+        let (batch_tx, batch_rx) = unbounded();
+        let (flush_batch_tx, flush_batch_rx) = unbounded();
+        let flush_batch = flush_batch_tx.clone();
+        let executor = cx.clone();
+        let _batch_task = cx.spawn_with_priority(gpui::Priority::High, async move {
+            let mut current_batch = vec![];
+            let mut items_produced_so_far = 0_u64;
+
+            let mut _schedule_flush_after_delay: Option<Task<()>> = None;
+            let _time_elapsed_since_start_of_search = std::time::Instant::now();
+            let mut flush = pin!(flush_batch_rx);
+            let mut terminating = false;
+            loop {
+                select_biased! {
+                    item = rx.recv().fuse() => {
+                        match item {
+                            Ok(new_item) => {
+                                let is_fresh_batch = current_batch.is_empty();
+                                items_produced_so_far += 1;
+                                current_batch.push(new_item);
+                                if is_fresh_batch {
+                                    // Chosen arbitrarily based on some experimentation with plots.
+                                    let desired_duration_ms = (20 * (items_produced_so_far + 2).ilog2() as u64).min(300);
+                                    let desired_duration = Duration::from_millis(desired_duration_ms);
+                                    let _executor = executor.clone();
+                                    let _flush = flush_batch_tx.clone();
+                                    let new_timer = executor.spawn_with_priority(Priority::High, async move {
+                                        _executor.timer(desired_duration).await;
+                                        _ = _flush.send(false).await;
+                                    });
+                                    _schedule_flush_after_delay = Some(new_timer);
+                                }
+                            }
+                            Err(_) => {
+                                // Items channel closed - send any remaining batch before exiting
+                                if !current_batch.is_empty() {
+                                    _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                    should_break_afterwards = flush.next() => {
+                        if !current_batch.is_empty() {
+                            _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
+                            _schedule_flush_after_delay = None;
+                        }
+                        if should_break_afterwards.unwrap_or_default() {
+                            terminating = true;
+                        }
+                    }
+                    complete => {
+                        break;
+                    }
+                }
+                if terminating {
+                    // Drain any remaining items before exiting
+                    while let Ok(new_item) = rx.try_recv() {
+                        current_batch.push(new_item);
+                    }
+                    if !current_batch.is_empty() {
+                        _ = batch_tx.send(std::mem::take(&mut current_batch)).await;
+                    }
+                    break;
+                }
+            }
+        });
+        let this = Self {
+            items,
+            _batch_task,
+            flush_batch,
+        };
+        (this, batch_rx)
+    }
+
+    pub async fn push(&self, item: T) {
+        _ = self.items.send(item).await;
+    }
+
+    pub async fn flush(self) {
+        _ = self.flush_batch.send(true).await;
+        self._batch_task.await;
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

crates/project_benchmarks/Cargo.toml 🔗

@@ -6,6 +6,7 @@ edition.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+askpass.workspace = true
 clap.workspace = true
 client.workspace = true
 futures.workspace = true
@@ -14,6 +15,10 @@ http_client = { workspace = true, features = ["test-support"]}
 language.workspace = true
 node_runtime.workspace = true
 project.workspace = true
+release_channel.workspace = true
+remote = { workspace = true, features = ["build-remote-server-binary"] }
+rpassword = "7.4"
+semver.workspace = true
 settings.workspace = true
 watch.workspace = true
 

crates/project_benchmarks/src/main.rs 🔗

@@ -1,7 +1,10 @@
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
+use anyhow::anyhow;
+use askpass::EncryptedPassword;
 use clap::Parser;
 use client::{Client, UserStore};
+use futures::channel::oneshot;
 use gpui::{AppContext as _, Application};
 use http_client::FakeHttpClient;
 use language::LanguageRegistry;
@@ -10,13 +13,19 @@ use project::{
     Project, RealFs,
     search::{SearchQuery, SearchResult},
 };
+use release_channel::ReleaseChannel;
+use remote::{ConnectionIdentifier, RemoteClientDelegate, SshConnectionOptions};
+use semver::Version;
 
 #[derive(Parser)]
 struct Args {
     /// List of worktrees to run the search against.
     worktrees: Vec<String>,
     #[clap(short)]
-    query: String,
+    query: Option<String>,
+    /// Askpass socket for SSH authentication
+    #[clap(long)]
+    askpass: Option<String>,
     /// Treat query as a regex.
     #[clap(short, long)]
     regex: bool,
@@ -29,13 +38,72 @@ struct Args {
     /// Include gitignored files in the search.
     #[clap(long)]
     include_ignored: bool,
+    #[clap(long)]
+    ssh: Option<String>,
 }
 
+struct BenchmarkRemoteClient;
+impl RemoteClientDelegate for BenchmarkRemoteClient {
+    fn ask_password(
+        &self,
+        prompt: String,
+        tx: oneshot::Sender<EncryptedPassword>,
+        _cx: &mut gpui::AsyncApp,
+    ) {
+        eprintln!("SSH asking for password: {}", prompt);
+        match rpassword::prompt_password(&prompt) {
+            Ok(password) => match EncryptedPassword::try_from(password.as_ref()) {
+                Ok(encrypted) => {
+                    if tx.send(encrypted).is_err() {
+                        eprintln!("Failed to send password");
+                    }
+                }
+                Err(e) => eprintln!("Failed to encrypt password: {e}"),
+            },
+            Err(e) => eprintln!("Failed to read password: {e}"),
+        }
+    }
+
+    fn get_download_url(
+        &self,
+        _platform: remote::RemotePlatform,
+        _release_channel: ReleaseChannel,
+        _version: Option<Version>,
+        _cx: &mut gpui::AsyncApp,
+    ) -> gpui::Task<gpui::Result<Option<String>>> {
+        unimplemented!()
+    }
+
+    fn download_server_binary_locally(
+        &self,
+        _platform: remote::RemotePlatform,
+        _release_channel: ReleaseChannel,
+        _version: Option<Version>,
+        _cx: &mut gpui::AsyncApp,
+    ) -> gpui::Task<gpui::Result<std::path::PathBuf>> {
+        unimplemented!()
+    }
+
+    fn set_status(&self, status: Option<&str>, _: &mut gpui::AsyncApp) {
+        if let Some(status) = status {
+            println!("SSH status: {status}");
+        }
+    }
+}
 fn main() -> Result<(), anyhow::Error> {
     let args = Args::parse();
+
+    if let Some(socket) = &args.askpass {
+        askpass::main(socket);
+        return Ok(());
+    }
+
+    let query_str = args
+        .query
+        .ok_or_else(|| anyhow!("-q/--query is required"))?;
     let query = if args.regex {
         SearchQuery::regex(
-            args.query,
+            query_str,
             args.whole_word,
             args.case_sensitive,
             args.include_ignored,
@@ -47,7 +115,7 @@ fn main() -> Result<(), anyhow::Error> {
         )
     } else {
         SearchQuery::text(
-            args.query,
+            query_str,
             args.whole_word,
             args.case_sensitive,
             args.include_ignored,
@@ -58,6 +126,7 @@ fn main() -> Result<(), anyhow::Error> {
         )
     }?;
     Application::headless().run(|cx| {
+        release_channel::init_test(semver::Version::new(0, 0, 0), ReleaseChannel::Dev, cx);
         settings::init(cx);
         let client = Client::production(cx);
         let http_client = FakeHttpClient::with_200_response();
@@ -66,19 +135,35 @@ fn main() -> Result<(), anyhow::Error> {
         let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
         let registry = Arc::new(LanguageRegistry::new(cx.background_executor().clone()));
         let fs = Arc::new(RealFs::new(None, cx.background_executor().clone()));
-        let project = Project::local(
-            client,
-            node,
-            user_store,
-            registry,
-            fs,
-            Some(Default::default()),
-            false,
-            cx,
-        );
 
-        project.clone().update(cx, move |_, cx| {
-            cx.spawn(async move |_, cx| {
+
+
+            cx.spawn(async move |cx| {
+                let project = if let Some(ssh_target) = args.ssh {
+                    println!("Setting up SSH connection for {}", &ssh_target);
+                    let ssh_connection_options = SshConnectionOptions::parse_command_line(&ssh_target)?;
+
+                    let connection_options = remote::RemoteConnectionOptions::from(ssh_connection_options);
+                    let delegate = Arc::new(BenchmarkRemoteClient);
+                    let remote_connection = remote::connect(connection_options.clone(), delegate.clone(), cx).await.unwrap();
+
+                    let (_tx, rx) = oneshot::channel();
+                    let remote_client =  cx.update(|cx| remote::RemoteClient::new(ConnectionIdentifier::setup(), remote_connection, rx, delegate.clone(), cx )).await?.ok_or_else(|| anyhow!("ssh initialization returned None"))?;
+
+                    cx.update(|cx| Project::remote(remote_client,  client, node, user_store, registry, fs, false, cx))
+                } else {
+                    println!("Setting up local project");
+                    cx.update(|cx| Project::local(
+                    client,
+                    node,
+                    user_store,
+                    registry,
+                    fs,
+                    Some(Default::default()),
+                    false,
+                    cx,
+                ))
+                };
                 println!("Loading worktrees");
                 let worktrees = project.update(cx, |this, cx| {
                     args.worktrees
@@ -93,9 +178,20 @@ fn main() -> Result<(), anyhow::Error> {
                     .collect::<Result<Vec<_>, anyhow::Error>>()?;
 
                 for (worktree, _) in &worktrees {
-                    worktree
-                        .update(cx, |this, _| this.as_local().unwrap().scan_complete())
-                        .await;
+                    let scan_complete = worktree
+                        .update(cx, |this, _| {
+                            if let Some(local) = this.as_local() {
+                                Some(local.scan_complete())
+                            } else {
+                                None
+                            }
+                        });
+                    if let Some(scan_complete) = scan_complete {
+                        scan_complete.await;
+                    } else {
+                        cx.background_executor().timer(Duration::from_secs(10)).await;
+                    }
+
                 }
                 println!("Worktrees loaded");
 
@@ -127,8 +223,8 @@ fn main() -> Result<(), anyhow::Error> {
 
                 anyhow::Ok(())
             })
-            .detach();
-        });
+            .detach_and_log_err(cx);
+
     });
     Ok(())
 }

crates/proto/proto/buffer.proto 🔗

@@ -308,8 +308,28 @@ message FindSearchCandidates {
     uint64 project_id = 1;
     SearchQuery query = 2;
     uint64 limit = 3;
+    uint64 handle = 4;
 }
 
-message FindSearchCandidatesResponse {
-    repeated uint64 buffer_ids = 1;
+
+message FindSearchCandidatesDone {}
+
+message FindSearchCandidatesMatches {
+  repeated uint64 buffer_ids = 1;
+}
+
+message FindSearchCandidatesChunk {
+  uint64 project_id = 1;
+  PeerId peer_id = 2;
+  uint64 handle = 3;
+
+  oneof variant {
+    FindSearchCandidatesMatches matches = 4;
+    FindSearchCandidatesDone done = 5;
+  }
+}
+
+message FindSearchCandidatesCancelled {
+    uint64 project_id = 1;
+    uint64 handle = 2;
 }

crates/proto/proto/zed.proto 🔗

@@ -244,7 +244,6 @@ message Envelope {
         LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242;
 
         FindSearchCandidates find_search_candidates = 243;
-        FindSearchCandidatesResponse find_search_candidates_response = 244;
 
         CloseBuffer close_buffer = 245;
 
@@ -448,10 +447,13 @@ message Envelope {
 
         TrustWorktrees trust_worktrees = 404;
         RestrictWorktrees restrict_worktrees = 405;
-        
+
         ShareAgentThread share_agent_thread = 406;
         GetSharedAgentThread get_shared_agent_thread = 407;
-        GetSharedAgentThreadResponse get_shared_agent_thread_response = 408; // current max
+        GetSharedAgentThreadResponse get_shared_agent_thread_response = 408;
+
+        FindSearchCandidatesChunk find_search_candidates_chunk = 409;
+        FindSearchCandidatesCancelled find_search_candidates_cancelled = 410; // current max
     }
 
     reserved 87 to 88;
@@ -469,6 +471,7 @@ message Envelope {
     reserved 224 to 231;
     reserved 234 to 236;
     reserved 239 to 240;
+    reserved 244;
     reserved 246 to 256;
     reserved 259;
     reserved 270;

crates/proto/src/proto.rs 🔗

@@ -68,7 +68,6 @@ messages!(
     (Error, Foreground),
     (ExpandProjectEntry, Foreground),
     (ExpandProjectEntryResponse, Foreground),
-    (FindSearchCandidatesResponse, Background),
     (FindSearchCandidates, Background),
     (FlushBufferedMessages, Foreground),
     (ExpandAllForProjectEntry, Foreground),
@@ -345,7 +344,9 @@ messages!(
     (GitCreateWorktree, Background),
     (ShareAgentThread, Foreground),
     (GetSharedAgentThread, Foreground),
-    (GetSharedAgentThreadResponse, Foreground)
+    (GetSharedAgentThreadResponse, Foreground),
+    (FindSearchCandidatesChunk, Background),
+    (FindSearchCandidatesCancelled, Background),
 );
 
 request_messages!(
@@ -440,7 +441,7 @@ request_messages!(
     (RespondToContactRequest, Ack),
     (SaveBuffer, BufferSaved),
     (Stage, Ack),
-    (FindSearchCandidates, FindSearchCandidatesResponse),
+    (FindSearchCandidates, Ack),
     (SendChannelMessage, SendChannelMessageResponse),
     (SetChannelMemberRole, Ack),
     (SetChannelVisibility, Ack),
@@ -535,6 +536,7 @@ request_messages!(
     (GitCreateWorktree, Ack),
     (TrustWorktrees, Ack),
     (RestrictWorktrees, Ack),
+    (FindSearchCandidatesChunk, Ack),
 );
 
 lsp_messages!(
@@ -710,6 +712,8 @@ entity_messages!(
     GitCreateWorktree,
     TrustWorktrees,
     RestrictWorktrees,
+    FindSearchCandidatesChunk,
+    FindSearchCandidatesCancelled,
 );
 
 entity_messages!(

crates/remote_server/src/headless_project.rs 🔗

@@ -32,7 +32,6 @@ use rpc::{
 };
 
 use settings::initial_server_settings_content;
-use smol::stream::StreamExt;
 use std::{
     num::NonZeroU64,
     path::{Path, PathBuf},
@@ -288,6 +287,7 @@ impl HeadlessProject {
         session.add_entity_request_handler(Self::handle_trust_worktrees);
         session.add_entity_request_handler(Self::handle_restrict_worktrees);
 
+        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
         session.add_entity_request_handler(BufferStore::handle_update_buffer);
         session.add_entity_message_handler(BufferStore::handle_close_buffer);
 
@@ -779,41 +779,99 @@ impl HeadlessProject {
         this: Entity<Self>,
         envelope: TypedEnvelope<proto::FindSearchCandidates>,
         mut cx: AsyncApp,
-    ) -> Result<proto::FindSearchCandidatesResponse> {
+    ) -> Result<proto::Ack> {
+        use futures::stream::StreamExt as _;
+
+        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
         let message = envelope.payload;
         let query = SearchQuery::from_proto(
             message.query.context("missing query field")?,
             PathStyle::local(),
         )?;
-        let results = this.update(&mut cx, |this, cx| {
-            project::Search::local(
-                this.fs.clone(),
-                this.buffer_store.clone(),
-                this.worktree_store.clone(),
-                message.limit as _,
-                cx,
-            )
-            .into_handle(query, cx)
-            .matching_buffers(cx)
-        });
-
-        let mut response = proto::FindSearchCandidatesResponse {
-            buffer_ids: Vec::new(),
-        };
 
+        let project_id = message.project_id;
         let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
+        let handle = message.handle;
+        let _buffer_store = buffer_store.clone();
+        let client = this.read_with(&cx, |this, _| this.session.clone());
+        let task = cx.spawn(async move |cx| {
+            let results = this.update(cx, |this, cx| {
+                project::Search::local(
+                    this.fs.clone(),
+                    this.buffer_store.clone(),
+                    this.worktree_store.clone(),
+                    message.limit as _,
+                    cx,
+                )
+                .into_handle(query, cx)
+                .matching_buffers(cx)
+            });
+            let (batcher, batches) =
+                project::project_search::AdaptiveBatcher::new(cx.background_executor());
+            let mut new_matches = Box::pin(results.rx);
+
+            let sender_task = cx.background_executor().spawn({
+                let client = client.clone();
+                async move {
+                    let mut batches = std::pin::pin!(batches);
+                    while let Some(buffer_ids) = batches.next().await {
+                        client
+                            .request(proto::FindSearchCandidatesChunk {
+                                handle,
+                                peer_id: Some(peer_id),
+                                project_id,
+                                variant: Some(
+                                    proto::find_search_candidates_chunk::Variant::Matches(
+                                        proto::FindSearchCandidatesMatches { buffer_ids },
+                                    ),
+                                ),
+                            })
+                            .await?;
+                    }
+                    anyhow::Ok(())
+                }
+            });
 
-        while let Ok(buffer) = results.rx.recv().await {
-            let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id());
-            response.buffer_ids.push(buffer_id.to_proto());
-            buffer_store
-                .update(&mut cx, |buffer_store, cx| {
-                    buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
+            while let Some(buffer) = new_matches.next().await {
+                let _ = buffer_store
+                    .update(cx, |this, cx| {
+                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
+                    })
+                    .await;
+                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
+                batcher.push(buffer_id).await;
+            }
+            batcher.flush().await;
+
+            sender_task.await?;
+
+            client
+                .request(proto::FindSearchCandidatesChunk {
+                    handle,
+                    peer_id: Some(peer_id),
+                    project_id,
+                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
+                        proto::FindSearchCandidatesDone {},
+                    )),
                 })
                 .await?;
-        }
+            anyhow::Ok(())
+        });
+        _buffer_store.update(&mut cx, |this, _| {
+            this.register_ongoing_project_search((peer_id, handle), task);
+        });
 
-        Ok(response)
+        Ok(proto::Ack {})
+    }
+
+    // Goes from client to host.
+    async fn handle_find_search_candidates_cancel(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
+        mut cx: AsyncApp,
+    ) -> Result<()> {
+        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
+        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
     }
 
     async fn handle_list_remote_directory(
@@ -821,6 +879,7 @@ impl HeadlessProject {
         envelope: TypedEnvelope<proto::ListRemoteDirectory>,
         cx: AsyncApp,
     ) -> Result<proto::ListRemoteDirectoryResponse> {
+        use smol::stream::StreamExt;
         let fs = cx.read_entity(&this, |this, _| this.fs.clone());
         let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
         let check_info = envelope

crates/remote_server/src/remote_editing_tests.rs 🔗

@@ -237,11 +237,12 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
         assert!(headless.buffer_store.read(cx).has_shared_buffers())
     });
     do_search(&project, cx.clone()).await;
-
+    server_cx.run_until_parked();
     cx.update(|_| {
         drop(buffer);
     });
     cx.run_until_parked();
+    server_cx.run_until_parked();
     headless.update(server_cx, |headless, cx| {
         assert!(!headless.buffer_store.read(cx).has_shared_buffers())
     });