semantic_index.rs

  1mod db;
  2mod embedding;
  3mod parsing;
  4pub mod semantic_index_settings;
  5
  6#[cfg(test)]
  7mod semantic_index_tests;
  8
  9use crate::semantic_index_settings::SemanticIndexSettings;
 10use anyhow::{anyhow, Result};
 11use db::VectorDatabase;
 12use embedding::{EmbeddingProvider, OpenAIEmbeddings};
 13use futures::{channel::oneshot, Future};
 14use globset::GlobMatcher;
 15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 16use language::{Anchor, Buffer, Language, LanguageRegistry};
 17use parking_lot::Mutex;
 18use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
 19use postage::watch;
 20use project::{Fs, Project, WorktreeId};
 21use smol::channel;
 22use std::{
 23    cmp::Ordering,
 24    collections::HashMap,
 25    mem,
 26    ops::Range,
 27    path::{Path, PathBuf},
 28    sync::{Arc, Weak},
 29    time::{Instant, SystemTime},
 30};
 31use util::{
 32    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
 33    http::HttpClient,
 34    paths::EMBEDDINGS_DIR,
 35    ResultExt,
 36};
 37
// NOTE(review): not referenced anywhere in this file; presumably consumed by the `db`
// module to version the stored index — confirm before changing.
const SEMANTIC_INDEX_VERSION: usize = 6;
/// Number of queued documents at which `SemanticIndex::enqueue_documents_to_embed` hands the
/// pending batch over to the embedding workers.
const EMBEDDINGS_BATCH_SIZE: usize = 80;
 40
 41pub fn init(
 42    fs: Arc<dyn Fs>,
 43    http_client: Arc<dyn HttpClient>,
 44    language_registry: Arc<LanguageRegistry>,
 45    cx: &mut AppContext,
 46) {
 47    settings::register::<SemanticIndexSettings>(cx);
 48
 49    let db_file_path = EMBEDDINGS_DIR
 50        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
 51        .join("embeddings_db");
 52
 53    if *RELEASE_CHANNEL == ReleaseChannel::Stable
 54        || !settings::get::<SemanticIndexSettings>(cx).enabled
 55    {
 56        return;
 57    }
 58
 59    cx.spawn(move |mut cx| async move {
 60        let semantic_index = SemanticIndex::new(
 61            fs,
 62            db_file_path,
 63            Arc::new(OpenAIEmbeddings {
 64                client: http_client,
 65                executor: cx.background(),
 66            }),
 67            language_registry,
 68            cx.clone(),
 69        )
 70        .await?;
 71
 72        cx.update(|cx| {
 73            cx.set_global(semantic_index.clone());
 74        });
 75
 76        anyhow::Ok(())
 77    })
 78    .detach();
 79}
 80
/// Model that owns the indexing pipeline (parse -> batch -> embed -> store) and the
/// per-project bookkeeping needed to index and search projects.
pub struct SemanticIndex {
    /// File system handle used to load file contents and to open the vector database.
    fs: Arc<dyn Fs>,
    /// Path of the embeddings database file.
    database_url: Arc<PathBuf>,
    /// Computes embeddings for document batches and for search phrases.
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Used to detect the language of each file while indexing.
    language_registry: Arc<LanguageRegistry>,
    /// Sends operations to the database task.
    db_update_tx: channel::Sender<DbOperation>,
    /// Sends files to the parsing workers.
    parsing_files_tx: channel::Sender<PendingFile>,
    // The task handles below are never read after construction.
    // NOTE(review): presumably they are stored so the tasks live as long as the index —
    // confirm against gpui's `Task` drop semantics.
    _db_update_task: Task<()>,
    _embed_batch_tasks: Vec<Task<()>>,
    _batch_files_task: Task<()>,
    _parsing_files_tasks: Vec<Task<()>>,
    /// State for every project passed to `index_project`, keyed by a weak project handle.
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
 94
/// Per-project bookkeeping recorded by `SemanticIndex::index_project`.
struct ProjectState {
    /// Pairs of (worktree id, database row id of that worktree).
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    /// Receiving side of the project's outstanding indexing job counter.
    outstanding_job_count_rx: watch::Receiver<usize>,
    /// Sending side of the counter; `JobHandle`s hold weak references to it.
    _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
}
100
/// Token representing one outstanding indexing job.
///
/// Its `Drop` implementation decrements the project's job counter, provided the counter is
/// still alive.
struct JobHandle {
    /// Weak reference to the owning project's job counter.
    tx: Weak<Mutex<watch::Sender<usize>>>,
}
104
105impl ProjectState {
106    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
107        self.worktree_db_ids
108            .iter()
109            .find_map(|(worktree_id, db_id)| {
110                if *worktree_id == id {
111                    Some(*db_id)
112                } else {
113                    None
114                }
115            })
116    }
117
118    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
119        self.worktree_db_ids
120            .iter()
121            .find_map(|(worktree_id, db_id)| {
122                if *db_id == id {
123                    Some(*worktree_id)
124                } else {
125                    None
126                }
127            })
128    }
129}
130
/// A file queued for parsing by the indexing pipeline.
pub struct PendingFile {
    /// Database id of the worktree that contains the file.
    worktree_db_id: i64,
    /// Path of the file relative to its worktree; used as the file's key in the database.
    relative_path: PathBuf,
    /// Absolute path used to load the file's contents.
    absolute_path: PathBuf,
    /// Language detected for the file.
    language: Arc<Language>,
    /// Modification time of the file as observed when it was queued.
    modified_time: SystemTime,
    /// Keeps the project's outstanding job count raised until this file has been handled.
    job_handle: JobHandle,
}
139
/// One hit returned by `SemanticIndex::search_project`.
pub struct SearchResult {
    /// Buffer opened for the file that contains the matching document.
    pub buffer: ModelHandle<Buffer>,
    /// Anchored range of the matching document within `buffer`.
    pub range: Range<Anchor>,
}
144
/// Requests processed one at a time by the database task (see
/// `SemanticIndex::run_db_operation`).
enum DbOperation {
    /// Store `documents` for the file at `path`; `job_handle` is dropped once the insert has
    /// been attempted.
    InsertFile {
        worktree_id: i64,
        documents: Vec<Document>,
        path: PathBuf,
        mtime: SystemTime,
        job_handle: JobHandle,
    },
    /// Remove the file at `path` from the given worktree.
    Delete {
        worktree_id: i64,
        path: PathBuf,
    },
    /// Look up or create the database entry for the worktree at `path`; the resulting id is
    /// sent back through `sender`.
    FindOrCreateWorktree {
        path: PathBuf,
        sender: oneshot::Sender<Result<i64>>,
    },
    /// Fetch the stored modification time of every file in a worktree; the map is sent back
    /// through `sender`.
    FileMTimes {
        worktree_id: i64,
        sender: oneshot::Sender<Result<HashMap<PathBuf, SystemTime>>>,
    },
    /// Ask whether the worktree at `path` has been indexed before; the answer is sent back
    /// through `sender`.
    WorktreePreviouslyIndexed {
        path: Arc<Path>,
        sender: oneshot::Sender<Result<bool>>,
    },
}
170
/// Messages consumed by the batching task (see `SemanticIndex::enqueue_documents_to_embed`).
enum EmbeddingJob {
    /// Add one parsed file's documents to the pending batch.
    Enqueue {
        worktree_id: i64,
        path: PathBuf,
        mtime: SystemTime,
        documents: Vec<Document>,
        job_handle: JobHandle,
    },
    /// Send the pending batch to the embedding workers regardless of its size.
    Flush,
}
181
182impl SemanticIndex {
183    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
184        if cx.has_global::<ModelHandle<Self>>() {
185            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
186        } else {
187            None
188        }
189    }
190
191    pub fn enabled(cx: &AppContext) -> bool {
192        settings::get::<SemanticIndexSettings>(cx).enabled
193            && *RELEASE_CHANNEL != ReleaseChannel::Stable
194    }
195
    /// Opens the vector database at `database_url` and creates the `SemanticIndex` model,
    /// spawning the background tasks that make up the indexing pipeline.
    ///
    /// The stages are connected by unbounded channels: parsing workers turn `PendingFile`s
    /// into documents, a single batching task groups documents into batches, embedding
    /// workers compute embeddings for each batch, and a single database task applies the
    /// resulting `DbOperation`s in order.
    ///
    /// # Errors
    ///
    /// Returns an error if `VectorDatabase::new` fails.
    async fn new(
        fs: Arc<dyn Fs>,
        database_url: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_url = Arc::new(database_url);

        // Open the database on the background executor.
        let db = cx
            .background()
            .spawn(VectorDatabase::new(fs.clone(), database_url.clone()))
            .await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            // Perform database operations
            // A single task owns `db`, so operations run one at a time in arrival order.
            let (db_update_tx, db_update_rx) = channel::unbounded();
            let _db_update_task = cx.background().spawn({
                async move {
                    while let Ok(job) = db_update_rx.recv().await {
                        Self::run_db_operation(&db, job)
                    }
                }
            });

            // Compute embeddings for each batch and queue the results for insertion into the
            // database. One worker is spawned per CPU; all of them share the same receiver.
            let (embed_batch_tx, embed_batch_rx) =
                channel::unbounded::<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>();
            let mut _embed_batch_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let embed_batch_rx = embed_batch_rx.clone();
                _embed_batch_tasks.push(cx.background().spawn({
                    let db_update_tx = db_update_tx.clone();
                    let embedding_provider = embedding_provider.clone();
                    async move {
                        while let Ok(embeddings_queue) = embed_batch_rx.recv().await {
                            Self::compute_embeddings_for_batch(
                                embeddings_queue,
                                &embedding_provider,
                                &db_update_tx,
                            )
                            .await;
                        }
                    }
                }));
            }

            // Group documents into batches and send them to the embedding provider.
            // A single task owns the pending batch and its running document count.
            let (batch_files_tx, batch_files_rx) = channel::unbounded::<EmbeddingJob>();
            let _batch_files_task = cx.background().spawn(async move {
                let mut queue_len = 0;
                let mut embeddings_queue = vec![];
                while let Ok(job) = batch_files_rx.recv().await {
                    Self::enqueue_documents_to_embed(
                        job,
                        &mut queue_len,
                        &mut embeddings_queue,
                        &embed_batch_tx,
                    );
                }
            });

            // Parse files into embeddable documents.
            // One worker per CPU, each with its own `CodeContextRetriever`.
            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let parsing_files_rx = parsing_files_rx.clone();
                let batch_files_tx = batch_files_tx.clone();
                let db_update_tx = db_update_tx.clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new();
                    while let Ok(pending_file) = parsing_files_rx.recv().await {
                        Self::parse_file(
                            &fs,
                            pending_file,
                            &mut retriever,
                            &batch_files_tx,
                            &parsing_files_rx,
                            &db_update_tx,
                        )
                        .await;
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                database_url,
                embedding_provider,
                language_registry,
                db_update_tx,
                parsing_files_tx,
                _db_update_task,
                _embed_batch_tasks,
                _batch_files_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }
308
309    fn run_db_operation(db: &VectorDatabase, job: DbOperation) {
310        match job {
311            DbOperation::InsertFile {
312                worktree_id,
313                documents,
314                path,
315                mtime,
316                job_handle,
317            } => {
318                db.insert_file(worktree_id, path, mtime, documents)
319                    .log_err();
320                drop(job_handle)
321            }
322            DbOperation::Delete { worktree_id, path } => {
323                db.delete_file(worktree_id, path).log_err();
324            }
325            DbOperation::FindOrCreateWorktree { path, sender } => {
326                let id = db.find_or_create_worktree(&path);
327                sender.send(id).ok();
328            }
329            DbOperation::FileMTimes {
330                worktree_id: worktree_db_id,
331                sender,
332            } => {
333                let file_mtimes = db.get_file_mtimes(worktree_db_id);
334                sender.send(file_mtimes).ok();
335            }
336            DbOperation::WorktreePreviouslyIndexed { path, sender } => {
337                let worktree_indexed = db.worktree_previously_indexed(path.as_ref());
338                sender.send(worktree_indexed).ok();
339            }
340        }
341    }
342
343    async fn compute_embeddings_for_batch(
344        mut embeddings_queue: Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
345        embedding_provider: &Arc<dyn EmbeddingProvider>,
346        db_update_tx: &channel::Sender<DbOperation>,
347    ) {
348        let mut batch_documents = vec![];
349        for (_, documents, _, _, _) in embeddings_queue.iter() {
350            batch_documents.extend(documents.iter().map(|document| document.content.as_str()));
351        }
352
353        if let Ok(embeddings) = embedding_provider.embed_batch(batch_documents).await {
354            log::trace!(
355                "created {} embeddings for {} files",
356                embeddings.len(),
357                embeddings_queue.len(),
358            );
359
360            let mut i = 0;
361            let mut j = 0;
362
363            for embedding in embeddings.iter() {
364                while embeddings_queue[i].1.len() == j {
365                    i += 1;
366                    j = 0;
367                }
368
369                embeddings_queue[i].1[j].embedding = embedding.to_owned();
370                j += 1;
371            }
372
373            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
374                db_update_tx
375                    .send(DbOperation::InsertFile {
376                        worktree_id,
377                        documents,
378                        path,
379                        mtime,
380                        job_handle,
381                    })
382                    .await
383                    .unwrap();
384            }
385        }
386    }
387
388    fn enqueue_documents_to_embed(
389        job: EmbeddingJob,
390        queue_len: &mut usize,
391        embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
392        embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
393    ) {
394        let should_flush = match job {
395            EmbeddingJob::Enqueue {
396                documents,
397                worktree_id,
398                path,
399                mtime,
400                job_handle,
401            } => {
402                *queue_len += &documents.len();
403                embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
404                *queue_len >= EMBEDDINGS_BATCH_SIZE
405            }
406            EmbeddingJob::Flush => true,
407        };
408
409        if should_flush {
410            embed_batch_tx
411                .try_send(mem::take(embeddings_queue))
412                .unwrap();
413            *queue_len = 0;
414        }
415    }
416
417    async fn parse_file(
418        fs: &Arc<dyn Fs>,
419        pending_file: PendingFile,
420        retriever: &mut CodeContextRetriever,
421        batch_files_tx: &channel::Sender<EmbeddingJob>,
422        parsing_files_rx: &channel::Receiver<PendingFile>,
423        db_update_tx: &channel::Sender<DbOperation>,
424    ) {
425        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
426            if let Some(documents) = retriever
427                .parse_file_with_template(
428                    &pending_file.relative_path,
429                    &content,
430                    pending_file.language,
431                )
432                .log_err()
433            {
434                log::trace!(
435                    "parsed path {:?}: {} documents",
436                    pending_file.relative_path,
437                    documents.len()
438                );
439
440                if documents.len() == 0 {
441                    db_update_tx
442                        .send(DbOperation::InsertFile {
443                            worktree_id: pending_file.worktree_db_id,
444                            documents,
445                            path: pending_file.relative_path,
446                            mtime: pending_file.modified_time,
447                            job_handle: pending_file.job_handle,
448                        })
449                        .await
450                        .unwrap();
451                } else {
452                    batch_files_tx
453                        .try_send(EmbeddingJob::Enqueue {
454                            worktree_id: pending_file.worktree_db_id,
455                            path: pending_file.relative_path,
456                            mtime: pending_file.modified_time,
457                            job_handle: pending_file.job_handle,
458                            documents,
459                        })
460                        .unwrap();
461                }
462            }
463        }
464
465        if parsing_files_rx.len() == 0 {
466            batch_files_tx.try_send(EmbeddingJob::Flush).unwrap();
467        }
468    }
469
470    fn find_or_create_worktree(&self, path: PathBuf) -> impl Future<Output = Result<i64>> {
471        let (tx, rx) = oneshot::channel();
472        self.db_update_tx
473            .try_send(DbOperation::FindOrCreateWorktree { path, sender: tx })
474            .unwrap();
475        async move { rx.await? }
476    }
477
478    fn get_file_mtimes(
479        &self,
480        worktree_id: i64,
481    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
482        let (tx, rx) = oneshot::channel();
483        self.db_update_tx
484            .try_send(DbOperation::FileMTimes {
485                worktree_id,
486                sender: tx,
487            })
488            .unwrap();
489        async move { rx.await? }
490    }
491
492    fn worktree_previously_indexed(&self, path: Arc<Path>) -> impl Future<Output = Result<bool>> {
493        let (tx, rx) = oneshot::channel();
494        self.db_update_tx
495            .try_send(DbOperation::WorktreePreviouslyIndexed { path, sender: tx })
496            .unwrap();
497        async move { rx.await? }
498    }
499
500    pub fn project_previously_indexed(
501        &mut self,
502        project: ModelHandle<Project>,
503        cx: &mut ModelContext<Self>,
504    ) -> Task<Result<bool>> {
505        let worktree_scans_complete = project
506            .read(cx)
507            .worktrees(cx)
508            .map(|worktree| {
509                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
510                async move {
511                    scan_complete.await;
512                }
513            })
514            .collect::<Vec<_>>();
515
516        let worktrees_indexed_previously = project
517            .read(cx)
518            .worktrees(cx)
519            .map(|worktree| self.worktree_previously_indexed(worktree.read(cx).abs_path()))
520            .collect::<Vec<_>>();
521
522        cx.spawn(|_, _cx| async move {
523            futures::future::join_all(worktree_scans_complete).await;
524
525            let worktree_indexed_previously =
526                futures::future::join_all(worktrees_indexed_previously).await;
527
528            Ok(worktree_indexed_previously
529                .iter()
530                .filter(|worktree| worktree.is_ok())
531                .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
532        })
533    }
534
    /// Queues every new or changed file of `project` for indexing and removes database
    /// entries for files that were not matched during the walk.
    ///
    /// Steps, in order:
    /// 1. wait for all worktree scans to complete;
    /// 2. find or create the database id of each worktree and load its stored file mtimes;
    /// 3. register a `ProjectState` (with a fresh job counter) for the project;
    /// 4. on the background executor, walk each worktree's files and queue those whose
    ///    stored mtime is missing or different, then request deletion of every stored path
    ///    that was not matched.
    ///
    /// Resolves to the number of files queued and a receiver for the project's outstanding
    /// job count.
    ///
    /// # Errors
    ///
    /// Fails if a worktree id or its stored mtimes cannot be obtained from the database.
    ///
    /// # Panics
    ///
    /// Panics if any worktree of the project is not local, or if the parsing or database
    /// channel has been closed.
    pub fn index_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
        let t0 = Instant::now();
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
                async move {
                    scan_complete.await;
                }
            })
            .collect::<Vec<_>>();
        // The lookups are queued on the database channel right away; only the replies are
        // awaited later.
        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
            })
            .collect::<Vec<_>>();

        let language_registry = self.language_registry.clone();
        let db_update_tx = self.db_update_tx.clone();
        let parsing_files_tx = self.parsing_files_tx.clone();

        cx.spawn(|this, mut cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;

            let worktrees = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(cx)
                    .map(|worktree| worktree.read(cx).snapshot())
                    .collect::<Vec<_>>()
            });

            // NOTE(review): the snapshots are paired with the db ids positionally; this
            // assumes the project's worktree list did not change since the ids were
            // requested above — confirm.
            let mut worktree_file_mtimes = HashMap::new();
            let mut db_ids_by_worktree_id = HashMap::new();
            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                let db_id = db_id?;
                db_ids_by_worktree_id.insert(worktree.id(), db_id);
                worktree_file_mtimes.insert(
                    worktree.id(),
                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
                        .await?,
                );
            }

            // Fresh counter for this indexing run; inserting replaces any state previously
            // stored for the project.
            let (job_count_tx, job_count_rx) = watch::channel_with(0);
            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
            this.update(&mut cx, |this, _| {
                this.projects.insert(
                    project.downgrade(),
                    ProjectState {
                        worktree_db_ids: db_ids_by_worktree_id
                            .iter()
                            .map(|(a, b)| (*a, *b))
                            .collect(),
                        outstanding_job_count_rx: job_count_rx.clone(),
                        _outstanding_job_count_tx: job_count_tx.clone(),
                    },
                );
            });

            cx.background()
                .spawn(async move {
                    let mut count = 0;
                    for worktree in worktrees.into_iter() {
                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                        // NOTE(review): the meaning of the `files(false, 0)` arguments is
                        // not visible in this file — confirm against the worktree API.
                        for file in worktree.files(false, 0) {
                            let absolute_path = worktree.absolutize(&file.path);

                            // Files without a detectable language are skipped.
                            if let Ok(language) = language_registry
                                .language_for_file(&absolute_path, None)
                                .await
                            {
                                // Skip languages that are neither parsed as whole files nor
                                // have an embedding config in their grammar.
                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                    && language
                                        .grammar()
                                        .and_then(|grammar| grammar.embedding_config.as_ref())
                                        .is_none()
                                {
                                    continue;
                                }

                                // Removing the entry marks the path as matched; whatever is
                                // left in `file_mtimes` afterwards is deleted below.
                                let path_buf = file.path.to_path_buf();
                                let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                let already_stored = stored_mtime
                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);

                                if !already_stored {
                                    count += 1;
                                    // Raise the counter before creating the handle whose
                                    // drop lowers it again.
                                    *job_count_tx.lock().borrow_mut() += 1;
                                    let job_handle = JobHandle {
                                        tx: Arc::downgrade(&job_count_tx),
                                    };
                                    parsing_files_tx
                                        .try_send(PendingFile {
                                            worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                            relative_path: path_buf,
                                            absolute_path,
                                            language,
                                            job_handle,
                                            modified_time: file.mtime,
                                        })
                                        .unwrap();
                                }
                            }
                        }
                        // Stored paths that were never matched above (including files that
                        // were skipped) are removed from the database.
                        for file in file_mtimes.keys() {
                            db_update_tx
                                .try_send(DbOperation::Delete {
                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
                                    path: file.to_owned(),
                                })
                                .unwrap();
                        }
                    }

                    log::trace!(
                        "walking worktree took {:?} milliseconds",
                        t0.elapsed().as_millis()
                    );
                    anyhow::Ok((count, job_count_rx))
                })
                .await
        })
    }
667
668    pub fn outstanding_job_count_rx(
669        &self,
670        project: &ModelHandle<Project>,
671    ) -> Option<watch::Receiver<usize>> {
672        Some(
673            self.projects
674                .get(&project.downgrade())?
675                .outstanding_job_count_rx
676                .clone(),
677        )
678    }
679
680    pub fn search_project(
681        &mut self,
682        project: ModelHandle<Project>,
683        phrase: String,
684        limit: usize,
685        include_globs: Vec<GlobMatcher>,
686        exclude_globs: Vec<GlobMatcher>,
687        cx: &mut ModelContext<Self>,
688    ) -> Task<Result<Vec<SearchResult>>> {
689        let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
690            state
691        } else {
692            return Task::ready(Err(anyhow!("project not added")));
693        };
694
695        let worktree_db_ids = project
696            .read(cx)
697            .worktrees(cx)
698            .filter_map(|worktree| {
699                let worktree_id = worktree.read(cx).id();
700                project_state.db_id_for_worktree_id(worktree_id)
701            })
702            .collect::<Vec<_>>();
703
704        let embedding_provider = self.embedding_provider.clone();
705        let database_url = self.database_url.clone();
706        let fs = self.fs.clone();
707        cx.spawn(|this, mut cx| async move {
708            let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;
709
710            let phrase_embedding = embedding_provider
711                .embed_batch(vec![&phrase])
712                .await?
713                .into_iter()
714                .next()
715                .unwrap();
716
717            let file_ids = database.retrieve_included_file_ids(
718                &worktree_db_ids,
719                include_globs,
720                exclude_globs,
721            )?;
722
723            let batch_n = cx.background().num_cpus();
724            let ids_len = file_ids.clone().len();
725            let batch_size = if ids_len <= batch_n {
726                ids_len
727            } else {
728                ids_len / batch_n
729            };
730
731            let mut result_tasks = Vec::new();
732            for batch in file_ids.chunks(batch_size) {
733                let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
734                let limit = limit.clone();
735                let fs = fs.clone();
736                let database_url = database_url.clone();
737                let phrase_embedding = phrase_embedding.clone();
738                let task = cx.background().spawn(async move {
739                    let database = VectorDatabase::new(fs, database_url).await.log_err();
740                    if database.is_none() {
741                        return Err(anyhow!("failed to acquire database connection"));
742                    } else {
743                        database
744                            .unwrap()
745                            .top_k_search(&phrase_embedding, limit, batch.as_slice())
746                    }
747                });
748                result_tasks.push(task);
749            }
750
751            let batch_results = futures::future::join_all(result_tasks).await;
752
753            let mut results = Vec::new();
754            for batch_result in batch_results {
755                if batch_result.is_ok() {
756                    for (id, similarity) in batch_result.unwrap() {
757                        let ix = match results.binary_search_by(|(_, s)| {
758                            similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
759                        }) {
760                            Ok(ix) => ix,
761                            Err(ix) => ix,
762                        };
763                        results.insert(ix, (id, similarity));
764                        results.truncate(limit);
765                    }
766                }
767            }
768
769            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
770            let documents = database.get_documents_by_ids(ids.as_slice())?;
771
772            let mut tasks = Vec::new();
773            let mut ranges = Vec::new();
774            let weak_project = project.downgrade();
775            project.update(&mut cx, |project, cx| {
776                for (worktree_db_id, file_path, byte_range) in documents {
777                    let project_state =
778                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
779                            state
780                        } else {
781                            return Err(anyhow!("project not added"));
782                        };
783                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
784                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
785                        ranges.push(byte_range);
786                    }
787                }
788
789                Ok(())
790            })?;
791
792            let buffers = futures::future::join_all(tasks).await;
793
794            Ok(buffers
795                .into_iter()
796                .zip(ranges)
797                .filter_map(|(buffer, range)| {
798                    let buffer = buffer.log_err()?;
799                    let range = buffer.read_with(&cx, |buffer, _| {
800                        buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
801                    });
802                    Some(SearchResult { buffer, range })
803                })
804                .collect::<Vec<_>>())
805        })
806    }
807}
808
/// Makes `SemanticIndex` usable as a gpui model. Its event type is `()`.
impl Entity for SemanticIndex {
    type Event = ();
}
812
813impl Drop for JobHandle {
814    fn drop(&mut self) {
815        if let Some(tx) = self.tx.upgrade() {
816            let mut tx = tx.lock();
817            *tx.borrow_mut() -= 1;
818        }
819    }
820}