flush embeddings queue when no files are parsed for 250 milliseconds

KCaverly and Antonio created

Co-authored-by: Antonio <antonio@zed.dev>

Change summary

crates/semantic_index/src/semantic_index.rs       | 50 +++++++++-------
crates/semantic_index/src/semantic_index_tests.rs | 12 ++--
2 files changed, 33 insertions(+), 29 deletions(-)

Detailed changes

crates/semantic_index/src/semantic_index.rs 🔗

@@ -12,6 +12,7 @@ use anyhow::{anyhow, Result};
 use db::VectorDatabase;
 use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings};
 use embedding_queue::{EmbeddingQueue, FileToEmbed};
+use futures::{FutureExt, StreamExt};
 use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 use language::{Anchor, Buffer, Language, LanguageRegistry};
 use parking_lot::Mutex;
@@ -39,6 +40,7 @@ use workspace::WorkspaceCreated;
 
 const SEMANTIC_INDEX_VERSION: usize = 8;
 const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(600);
+const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
 
 pub fn init(
     fs: Arc<dyn Fs>,
@@ -253,24 +255,34 @@ impl SemanticIndex {
             let mut _parsing_files_tasks = Vec::new();
             for _ in 0..cx.background().num_cpus() {
                 let fs = fs.clone();
-                let parsing_files_rx = parsing_files_rx.clone();
+                let mut parsing_files_rx = parsing_files_rx.clone();
                 let embedding_provider = embedding_provider.clone();
                 let embedding_queue = embedding_queue.clone();
-                let db = db.clone();
+                let background = cx.background().clone();
                 _parsing_files_tasks.push(cx.background().spawn(async move {
                     let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
-                    while let Ok((embeddings_for_digest, pending_file)) =
-                        parsing_files_rx.recv().await
-                    {
-                        Self::parse_file(
-                            &fs,
-                            pending_file,
-                            &mut retriever,
-                            &embedding_queue,
-                            &parsing_files_rx,
-                            &embeddings_for_digest,
-                        )
-                        .await;
+                    loop {
+                        let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse();
+                        let mut next_file_to_parse = parsing_files_rx.next().fuse();
+                        futures::select_biased! {
+                            next_file_to_parse = next_file_to_parse => {
+                                if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse {
+                                    Self::parse_file(
+                                        &fs,
+                                        pending_file,
+                                        &mut retriever,
+                                        &embedding_queue,
+                                        &embeddings_for_digest,
+                                    )
+                                    .await
+                                } else {
+                                    break;
+                                }
+                            },
+                            _ = timer => {
+                                embedding_queue.lock().flush();
+                            }
+                        }
                     }
                 }));
             }
@@ -297,10 +309,6 @@ impl SemanticIndex {
         pending_file: PendingFile,
         retriever: &mut CodeContextRetriever,
         embedding_queue: &Arc<Mutex<EmbeddingQueue>>,
-        parsing_files_rx: &channel::Receiver<(
-            Arc<HashMap<DocumentDigest, Embedding>>,
-            PendingFile,
-        )>,
         embeddings_for_digest: &HashMap<DocumentDigest, Embedding>,
     ) {
         let Some(language) = pending_file.language else {
@@ -333,10 +341,6 @@ impl SemanticIndex {
                 });
             }
         }
-
-        if parsing_files_rx.len() == 0 {
-            embedding_queue.lock().flush();
-        }
     }
 
     pub fn project_previously_indexed(
@@ -581,7 +585,7 @@ impl SemanticIndex {
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
         cx.spawn(|this, mut cx| async move {
-            let embeddings_for_digest = this.read_with(&cx, |this, cx| {
+            let embeddings_for_digest = this.read_with(&cx, |this, _| {
                 if let Some(state) = this.projects.get(&project.downgrade()) {
                     let mut worktree_id_file_paths = HashMap::default();
                     for (path, _) in &state.changed_paths {

crates/semantic_index/src/semantic_index_tests.rs 🔗

@@ -3,11 +3,11 @@ use crate::{
     embedding_queue::EmbeddingQueue,
     parsing::{subtract_ranges, CodeContextRetriever, Document, DocumentDigest},
     semantic_index_settings::SemanticIndexSettings,
-    FileToEmbed, JobHandle, SearchResult, SemanticIndex,
+    FileToEmbed, JobHandle, SearchResult, SemanticIndex, EMBEDDING_QUEUE_FLUSH_TIMEOUT,
 };
 use anyhow::Result;
 use async_trait::async_trait;
-use gpui::{Task, TestAppContext};
+use gpui::{executor::Deterministic, Task, TestAppContext};
 use language::{Language, LanguageConfig, LanguageRegistry, ToOffset};
 use parking_lot::Mutex;
 use pretty_assertions::assert_eq;
@@ -34,7 +34,7 @@ fn init_logger() {
 }
 
 #[gpui::test]
-async fn test_semantic_index(cx: &mut TestAppContext) {
+async fn test_semantic_index(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
     init_test(cx);
 
     let fs = FakeFs::new(cx.background());
@@ -98,7 +98,7 @@ async fn test_semantic_index(cx: &mut TestAppContext) {
         .await
         .unwrap();
     assert_eq!(file_count, 3);
-    cx.foreground().run_until_parked();
+    deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT);
     assert_eq!(*outstanding_file_count.borrow(), 0);
 
     let search_results = semantic_index
@@ -188,7 +188,7 @@ async fn test_semantic_index(cx: &mut TestAppContext) {
     .await
     .unwrap();
 
-    cx.foreground().run_until_parked();
+    deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT);
 
     let prev_embedding_count = embedding_provider.embedding_count();
     let (file_count, outstanding_file_count) = semantic_index
@@ -197,7 +197,7 @@ async fn test_semantic_index(cx: &mut TestAppContext) {
         .unwrap();
     assert_eq!(file_count, 1);
 
-    cx.foreground().run_until_parked();
+    deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT);
     assert_eq!(*outstanding_file_count.borrow(), 0);
 
     assert_eq!(