project search: Stream result buffers sooner in remote scenarios

Piotr Osiewicz and Smit Barmase created

This improves latency, but kills throughput of remote search.

Co-authored-by: Smit Barmase <heysmitbarmase@gmail.com>

Change summary

crates/project/src/buffer_store.rs           | 51 ++++++++++++++
crates/project/src/project.rs                | 62 +++++++++++++++--
crates/project/src/project_search.rs         | 12 ++
crates/proto/proto/buffer.proto              | 17 ++++
crates/proto/proto/zed.proto                 |  4 
crates/proto/src/proto.rs                    |  5 +
crates/remote_server/src/headless_project.rs | 76 +++++++++++++++------
7 files changed, 188 insertions(+), 39 deletions(-)

Detailed changes

crates/project/src/buffer_store.rs 🔗

@@ -38,6 +38,8 @@ pub struct BufferStore {
     downstream_client: Option<(AnyProtoClient, u64)>,
     shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
     non_searchable_buffers: HashSet<BufferId>,
+    project_search_chunks: HashMap<u64, smol::channel::Sender<BufferId>>,
+    pub(crate) next_project_search_id: u64,
 }
 
 #[derive(Hash, Eq, PartialEq, Clone)]
@@ -756,6 +758,8 @@ impl BufferStore {
             loading_buffers: Default::default(),
             non_searchable_buffers: Default::default(),
             worktree_store,
+            project_search_chunks: Default::default(),
+            next_project_search_id: 0,
         }
     }
 
@@ -781,6 +785,8 @@ impl BufferStore {
             shared_buffers: Default::default(),
             non_searchable_buffers: Default::default(),
             worktree_store,
+            project_search_chunks: Default::default(),
+            next_project_search_id: 0,
         }
     }
 
@@ -1673,6 +1679,51 @@ impl BufferStore {
         }
         serialized_transaction
     }
+
+    pub(crate) fn register_project_search_result_handle(
+        &mut self,
+        handle: u64,
+        sender: smol::channel::Sender<BufferId>,
+    ) {
+        self.project_search_chunks.insert(handle, sender);
+    }
+
+    pub(crate) async fn handle_find_search_candidates_chunk(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
+        mut cx: AsyncApp,
+    ) -> Result<proto::Ack> {
+        use proto::find_search_candidates_chunk::Variant;
+        let handle = envelope.payload.handle;
+
+        let buffer_ids = match envelope
+            .payload
+            .variant
+            .context("Expected non-null variant")?
+        {
+            Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches
+                .buffer_ids
+                .into_iter()
+                .filter_map(|buffer_id| BufferId::new(buffer_id).ok())
+                .collect::<Vec<_>>(),
+            Variant::Done(_) => {
+                this.update(&mut cx, |this, _| {
+                    this.project_search_chunks.remove(&handle)
+                })?;
+                return Ok(proto::Ack {});
+            }
+        };
+        let Some(sender) = this.read_with(&mut cx, |this, _| {
+            this.project_search_chunks.get(&handle).cloned()
+        })?
+        else {
+            return Ok(proto::Ack {});
+        };
+        for buffer_id in buffer_ids {
+            sender.send(buffer_id).await?;
+        }
+        Ok(proto::Ack {})
+    }
 }
 
 impl OpenBuffer {

crates/project/src/project.rs 🔗

@@ -1488,6 +1488,7 @@ impl Project {
             remote_proto.add_entity_request_handler(Self::handle_update_buffer_from_remote_server);
             remote_proto.add_entity_request_handler(Self::handle_trust_worktrees);
             remote_proto.add_entity_request_handler(Self::handle_restrict_worktrees);
+            remote_proto.add_entity_request_handler(Self::handle_find_search_candidates_chunk);
 
             BufferStore::init(&remote_proto);
             LspStore::init(&remote_proto);
@@ -4912,6 +4913,15 @@ impl Project {
         Ok(proto::Ack {})
     }
 
+    async fn handle_find_search_candidates_chunk(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
+        mut cx: AsyncApp,
+    ) -> Result<proto::Ack> {
+        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone())?;
+        BufferStore::handle_find_search_candidates_chunk(buffer_store, envelope, cx).await
+    }
+
     async fn handle_update_buffer(
         this: Entity<Self>,
         envelope: TypedEnvelope<proto::UpdateBuffer>,
@@ -5022,23 +5032,55 @@ impl Project {
     ) -> Result<proto::FindSearchCandidatesResponse> {
         let peer_id = envelope.original_sender_id()?;
         let message = envelope.payload;
+        let project_id = message.project_id;
         let path_style = this.read_with(&cx, |this, cx| this.path_style(cx))?;
         let query =
             SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
-        let results = this.update(&mut cx, |this, cx| {
-            this.search_impl(query, cx).matching_buffers(cx)
+
+        let next_project_search_id = this.update(&mut cx, |this, cx| {
+            this.buffer_store.update(cx, |this, _| {
+                util::post_inc(&mut this.next_project_search_id)
+            })
         })?;
 
-        let mut response = proto::FindSearchCandidatesResponse {
-            buffer_ids: Vec::new(),
+        let response = proto::FindSearchCandidatesResponse {
+            handle: next_project_search_id,
         };
-
-        while let Ok(buffer) = results.recv().await {
-            this.update(&mut cx, |this, cx| {
-                let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
-                response.buffer_ids.push(buffer_id.to_proto());
+        let client = this.read_with(&cx, |this, _| this.client())?;
+        cx.spawn(async move |cx| {
+            let results = this.update(cx, |this, cx| {
+                this.search_impl(query, cx).matching_buffers(cx)
             })?;
-        }
+            let mut buffer_ids = vec![];
+            while let Ok(buffer) = results.recv().await {
+                this.update(cx, |this, cx| {
+                    let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
+                    buffer_ids.push(buffer_id.to_proto());
+                })?;
+                let _ = client
+                    .request(proto::FindSearchCandidatesChunk {
+                        handle: next_project_search_id,
+                        project_id,
+                        variant: Some(proto::find_search_candidates_chunk::Variant::Matches(
+                            proto::FindSearchCandidatesMatches {
+                                buffer_ids: std::mem::take(&mut buffer_ids),
+                            },
+                        )),
+                    })
+                    .await?;
+            }
+            let _ = client
+                .request(proto::FindSearchCandidatesChunk {
+                    handle: next_project_search_id,
+                    project_id,
+                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
+                        proto::FindSearchCandidatesDone {},
+                    )),
+                })
+                .await?;
+            anyhow::Ok(())
+        })
+        .detach();
 
         Ok(response)
     }

crates/project/src/project_search.rs 🔗

@@ -22,7 +22,6 @@ use smol::{
     future::FutureExt,
 };
 
-use text::BufferId;
 use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
 use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
 
@@ -263,8 +262,15 @@ impl Search {
                             .spawn(async move |cx| {
                                 let _ = maybe!(async move {
                                     let response = request.await?;
-                                    for buffer_id in response.buffer_ids {
-                                        let buffer_id = BufferId::new(buffer_id)?;
+                                    let (tx, rx) = unbounded();
+                                    buffer_store.update(cx, |this, _| {
+                                        this.register_project_search_result_handle(
+                                            response.handle,
+                                            tx,
+                                        );
+                                    })?;
+
+                                    while let Ok(buffer_id) = rx.recv().await {
                                         let buffer = buffer_store
                                             .update(cx, |buffer_store, cx| {
                                                 buffer_store.wait_for_remote_buffer(buffer_id, cx)

crates/proto/proto/buffer.proto 🔗

@@ -311,5 +311,20 @@ message FindSearchCandidates {
 }
 
 message FindSearchCandidatesResponse {
-    repeated uint64 buffer_ids = 1;
+  uint64 handle = 1;
+}
+
+message FindSearchCandidatesDone {}
+
+message FindSearchCandidatesMatches {
+  repeated uint64 buffer_ids = 1;
+}
+message FindSearchCandidatesChunk {
+  uint64 project_id = 1;
+  uint64 handle = 2;
+
+  oneof variant {
+    FindSearchCandidatesMatches matches = 3;
+    FindSearchCandidatesDone done = 4;
+  }
 }

crates/proto/proto/zed.proto 🔗

@@ -451,7 +451,9 @@ message Envelope {
         GitRemoveRemote git_remove_remote = 403;
 
         TrustWorktrees trust_worktrees = 404;
-        RestrictWorktrees restrict_worktrees = 405; // current max
+        RestrictWorktrees restrict_worktrees = 405;
+
+        FindSearchCandidatesChunk find_search_candidates_chunk = 406; // current max
     }
 
     reserved 87 to 88, 396;

crates/proto/src/proto.rs 🔗

@@ -345,7 +345,8 @@ messages!(
     (RemoteStarted, Background),
     (GitGetWorktrees, Background),
     (GitWorktreesResponse, Background),
-    (GitCreateWorktree, Background)
+    (GitCreateWorktree, Background),
+    (FindSearchCandidatesChunk, Background)
 );
 
 request_messages!(
@@ -534,6 +535,7 @@ request_messages!(
     (GitCreateWorktree, Ack),
     (TrustWorktrees, Ack),
     (RestrictWorktrees, Ack),
+    (FindSearchCandidatesChunk, Ack),
 );
 
 lsp_messages!(
@@ -709,6 +711,7 @@ entity_messages!(
     GitCreateWorktree,
     TrustWorktrees,
     RestrictWorktrees,
+    FindSearchCandidatesChunk,
 );
 
 entity_messages!(

crates/remote_server/src/headless_project.rs 🔗

@@ -771,33 +771,63 @@ impl HeadlessProject {
             message.query.context("missing query field")?,
             PathStyle::local(),
         )?;
-        let results = this.update(&mut cx, |this, cx| {
-            project::Search::local(
-                this.fs.clone(),
-                this.buffer_store.clone(),
-                this.worktree_store.clone(),
-                message.limit as _,
-                cx,
-            )
-            .into_handle(query, cx)
-            .matching_buffers(cx)
-        })?;
-
-        let mut response = proto::FindSearchCandidatesResponse {
-            buffer_ids: Vec::new(),
-        };
 
+        let project_id = message.project_id;
         let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
+        let next_project_search_id = this.update(&mut cx, |this, cx| {
+            this.buffer_store.update(cx, |this, _| {
+                util::post_inc(&mut this.next_project_search_id)
+            })
+        })?;
 
-        while let Ok(buffer) = results.recv().await {
-            let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id())?;
-            response.buffer_ids.push(buffer_id.to_proto());
-            buffer_store
-                .update(&mut cx, |buffer_store, cx| {
-                    buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
-                })?
+        let response = proto::FindSearchCandidatesResponse {
+            handle: next_project_search_id,
+        };
+        let client = this.read_with(&cx, |this, _| this.session.clone())?;
+        cx.spawn(async move |cx| {
+            let results = this.update(cx, |this, cx| {
+                project::Search::local(
+                    this.fs.clone(),
+                    this.buffer_store.clone(),
+                    this.worktree_store.clone(),
+                    message.limit as _,
+                    cx,
+                )
+                .into_handle(query, cx)
+                .matching_buffers(cx)
+            })?;
+            let mut buffer_ids = vec![];
+            while let Ok(buffer) = results.recv().await {
+                buffer_store
+                    .update(cx, |buffer_store, cx| {
+                        buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
+                    })?
+                    .await?;
+                buffer_ids.push(buffer.read_with(cx, |this, _| this.remote_id().to_proto())?);
+                let _ = client
+                    .request(proto::FindSearchCandidatesChunk {
+                        handle: next_project_search_id,
+                        project_id,
+                        variant: Some(proto::find_search_candidates_chunk::Variant::Matches(
+                            proto::FindSearchCandidatesMatches {
+                                buffer_ids: std::mem::take(&mut buffer_ids),
+                            },
+                        )),
+                    })
+                    .await?;
+            }
+            let _ = client
+                .request(proto::FindSearchCandidatesChunk {
+                    handle: next_project_search_id,
+                    project_id,
+                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
+                        proto::FindSearchCandidatesDone {},
+                    )),
+                })
                 .await?;
-        }
+            anyhow::Ok(())
+        })
+        .detach();
 
         Ok(response)
     }