semantic_index.rs

  1mod db;
  2mod embedding;
  3mod parsing;
  4pub mod semantic_index_settings;
  5
  6#[cfg(test)]
  7mod semantic_index_tests;
  8
  9use crate::semantic_index_settings::SemanticIndexSettings;
 10use anyhow::{anyhow, Result};
 11use db::VectorDatabase;
 12use embedding::{EmbeddingProvider, OpenAIEmbeddings};
 13use futures::{channel::oneshot, Future};
 14use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 15use language::{Anchor, Buffer, Language, LanguageRegistry};
 16use parking_lot::Mutex;
 17use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
 18use postage::watch;
 19use project::{search::PathMatcher, Fs, Project, WorktreeId};
 20use smol::channel;
 21use std::{
 22    cmp::Ordering,
 23    collections::HashMap,
 24    mem,
 25    ops::Range,
 26    path::{Path, PathBuf},
 27    sync::{Arc, Weak},
 28    time::{Instant, SystemTime},
 29};
 30use util::{
 31    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
 32    http::HttpClient,
 33    paths::EMBEDDINGS_DIR,
 34    ResultExt,
 35};
 36
 37const SEMANTIC_INDEX_VERSION: usize = 6;
 38const EMBEDDINGS_BATCH_SIZE: usize = 80;
 39
 40pub fn init(
 41    fs: Arc<dyn Fs>,
 42    http_client: Arc<dyn HttpClient>,
 43    language_registry: Arc<LanguageRegistry>,
 44    cx: &mut AppContext,
 45) {
 46    settings::register::<SemanticIndexSettings>(cx);
 47
 48    let db_file_path = EMBEDDINGS_DIR
 49        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
 50        .join("embeddings_db");
 51
 52    if *RELEASE_CHANNEL == ReleaseChannel::Stable
 53        || !settings::get::<SemanticIndexSettings>(cx).enabled
 54    {
 55        return;
 56    }
 57
 58    cx.spawn(move |mut cx| async move {
 59        let semantic_index = SemanticIndex::new(
 60            fs,
 61            db_file_path,
 62            Arc::new(OpenAIEmbeddings {
 63                client: http_client,
 64                executor: cx.background(),
 65            }),
 66            language_registry,
 67            cx.clone(),
 68        )
 69        .await?;
 70
 71        cx.update(|cx| {
 72            cx.set_global(semantic_index.clone());
 73        });
 74
 75        anyhow::Ok(())
 76    })
 77    .detach();
 78}
 79
/// Model that owns the background pipeline used to parse files, embed them,
/// and store the results in the vector database, plus per-project state.
pub struct SemanticIndex {
    /// Filesystem handle used to load file contents and open the database.
    fs: Arc<dyn Fs>,
    /// Path handed to `VectorDatabase::new` whenever a connection is opened.
    database_url: Arc<PathBuf>,
    /// Provider used to turn document text and search phrases into embeddings.
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Used to resolve the language of each file while indexing.
    language_registry: Arc<LanguageRegistry>,
    /// Sends operations to the database task.
    db_update_tx: channel::Sender<DbOperation>,
    /// Sends files that need (re)parsing to the parsing tasks.
    parsing_files_tx: channel::Sender<PendingFile>,
    // The task handles below are never read; presumably they are stored so the
    // background tasks live as long as the index — TODO confirm against gpui's
    // `Task` drop semantics.
    _db_update_task: Task<()>,
    _embed_batch_tasks: Vec<Task<()>>,
    _batch_files_task: Task<()>,
    _parsing_files_tasks: Vec<Task<()>>,
    /// State for every project passed to `index_project`, keyed by weak handle.
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
 93
/// Per-project bookkeeping recorded by `index_project`.
struct ProjectState {
    /// Pairs of (in-memory worktree id, worktree id in the database).
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    /// Receiver for the number of indexing jobs that have not finished yet.
    outstanding_job_count_rx: watch::Receiver<usize>,
    /// Strong reference to the counter's sender; `JobHandle`s only hold weak
    /// references to it.
    _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
}
 99
/// Token representing one outstanding indexing job; its `Drop` impl decrements
/// the shared job counter if the counter is still alive.
struct JobHandle {
    /// Weak reference to the project's outstanding-job counter.
    tx: Weak<Mutex<watch::Sender<usize>>>,
}
103
104impl ProjectState {
105    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
106        self.worktree_db_ids
107            .iter()
108            .find_map(|(worktree_id, db_id)| {
109                if *worktree_id == id {
110                    Some(*db_id)
111                } else {
112                    None
113                }
114            })
115    }
116
117    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
118        self.worktree_db_ids
119            .iter()
120            .find_map(|(worktree_id, db_id)| {
121                if *db_id == id {
122                    Some(*worktree_id)
123                } else {
124                    None
125                }
126            })
127    }
128}
129
/// A file queued for parsing into embeddable documents.
pub struct PendingFile {
    /// Database id of the worktree that contains the file.
    worktree_db_id: i64,
    /// Path of the file as produced by the worktree's file entries (stored in the database).
    relative_path: PathBuf,
    /// Absolute path used to load the file's contents.
    absolute_path: PathBuf,
    /// Language used to parse the file.
    language: Arc<Language>,
    /// Modification time recorded alongside the file's documents.
    modified_time: SystemTime,
    /// Keeps the project's outstanding-job count incremented until dropped.
    job_handle: JobHandle,
}
138
/// One hit returned by `SemanticIndex::search_project`.
pub struct SearchResult {
    /// Buffer opened for the file containing the matching document.
    pub buffer: ModelHandle<Buffer>,
    /// Anchored range of the matching document within `buffer`.
    pub range: Range<Anchor>,
}
143
/// Requests processed one at a time by the database task (see `run_db_operation`).
enum DbOperation {
    /// Store a file's documents together with its path and modification time.
    InsertFile {
        worktree_id: i64,
        documents: Vec<Document>,
        path: PathBuf,
        mtime: SystemTime,
        // Dropped once the insert has been attempted, which marks the job as done.
        job_handle: JobHandle,
    },
    /// Remove a file from the given worktree.
    Delete {
        worktree_id: i64,
        path: PathBuf,
    },
    /// Look up (or create) the database id for a worktree path; the result is
    /// sent back through `sender`.
    FindOrCreateWorktree {
        path: PathBuf,
        sender: oneshot::Sender<Result<i64>>,
    },
    /// Fetch the stored modification time of every file in a worktree.
    FileMTimes {
        worktree_id: i64,
        sender: oneshot::Sender<Result<HashMap<PathBuf, SystemTime>>>,
    },
    /// Ask whether the worktree at `path` has been indexed before.
    WorktreePreviouslyIndexed {
        path: Arc<Path>,
        sender: oneshot::Sender<Result<bool>>,
    },
}
169
/// Messages consumed by the batching task (see `enqueue_documents_to_embed`).
enum EmbeddingJob {
    /// Add one parsed file's documents to the pending batch.
    Enqueue {
        worktree_id: i64,
        path: PathBuf,
        mtime: SystemTime,
        documents: Vec<Document>,
        job_handle: JobHandle,
    },
    /// Dispatch whatever is pending, regardless of the batch size.
    Flush,
}
180
181impl SemanticIndex {
182    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
183        if cx.has_global::<ModelHandle<Self>>() {
184            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
185        } else {
186            None
187        }
188    }
189
190    pub fn enabled(cx: &AppContext) -> bool {
191        settings::get::<SemanticIndexSettings>(cx).enabled
192            && *RELEASE_CHANNEL != ReleaseChannel::Stable
193    }
194
    /// Opens the vector database and builds the index model together with its
    /// background pipeline.
    ///
    /// The pipeline is a chain of channels: parsing tasks turn `PendingFile`s
    /// into documents, a single batching task groups documents into batches,
    /// embedding tasks compute embeddings for each batch, and one database task
    /// applies every `DbOperation`.
    ///
    /// Returns an error if the database cannot be opened.
    async fn new(
        fs: Arc<dyn Fs>,
        database_url: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_url = Arc::new(database_url);

        // Open the database on the background executor.
        let db = cx
            .background()
            .spawn(VectorDatabase::new(fs.clone(), database_url.clone()))
            .await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            // Perform database operations
            let (db_update_tx, db_update_rx) = channel::unbounded();
            let _db_update_task = cx.background().spawn({
                async move {
                    // A single task owns `db`, so operations run one at a time.
                    while let Ok(job) = db_update_rx.recv().await {
                        Self::run_db_operation(&db, job)
                    }
                }
            });

            // Compute embeddings for each batch and forward the results to the
            // database task. One worker is spawned per CPU.
            let (embed_batch_tx, embed_batch_rx) =
                channel::unbounded::<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>();
            let mut _embed_batch_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let embed_batch_rx = embed_batch_rx.clone();
                _embed_batch_tasks.push(cx.background().spawn({
                    let db_update_tx = db_update_tx.clone();
                    let embedding_provider = embedding_provider.clone();
                    async move {
                        while let Ok(embeddings_queue) = embed_batch_rx.recv().await {
                            Self::compute_embeddings_for_batch(
                                embeddings_queue,
                                &embedding_provider,
                                &db_update_tx,
                            )
                            .await;
                        }
                    }
                }));
            }

            // Group documents into batches and send them to the embedding provider.
            let (batch_files_tx, batch_files_rx) = channel::unbounded::<EmbeddingJob>();
            let _batch_files_task = cx.background().spawn(async move {
                // The pending batch and its document count live in this single task.
                let mut queue_len = 0;
                let mut embeddings_queue = vec![];
                while let Ok(job) = batch_files_rx.recv().await {
                    Self::enqueue_documents_to_embed(
                        job,
                        &mut queue_len,
                        &mut embeddings_queue,
                        &embed_batch_tx,
                    );
                }
            });

            // Parse files into embeddable documents.
            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let parsing_files_rx = parsing_files_rx.clone();
                let batch_files_tx = batch_files_tx.clone();
                let db_update_tx = db_update_tx.clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    // Each worker has its own retriever.
                    let mut retriever = CodeContextRetriever::new();
                    while let Ok(pending_file) = parsing_files_rx.recv().await {
                        Self::parse_file(
                            &fs,
                            pending_file,
                            &mut retriever,
                            &batch_files_tx,
                            &parsing_files_rx,
                            &db_update_tx,
                        )
                        .await;
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                database_url,
                embedding_provider,
                language_registry,
                db_update_tx,
                parsing_files_tx,
                _db_update_task,
                _embed_batch_tasks,
                _batch_files_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }
307
308    fn run_db_operation(db: &VectorDatabase, job: DbOperation) {
309        match job {
310            DbOperation::InsertFile {
311                worktree_id,
312                documents,
313                path,
314                mtime,
315                job_handle,
316            } => {
317                db.insert_file(worktree_id, path, mtime, documents)
318                    .log_err();
319                drop(job_handle)
320            }
321            DbOperation::Delete { worktree_id, path } => {
322                db.delete_file(worktree_id, path).log_err();
323            }
324            DbOperation::FindOrCreateWorktree { path, sender } => {
325                let id = db.find_or_create_worktree(&path);
326                sender.send(id).ok();
327            }
328            DbOperation::FileMTimes {
329                worktree_id: worktree_db_id,
330                sender,
331            } => {
332                let file_mtimes = db.get_file_mtimes(worktree_db_id);
333                sender.send(file_mtimes).ok();
334            }
335            DbOperation::WorktreePreviouslyIndexed { path, sender } => {
336                let worktree_indexed = db.worktree_previously_indexed(path.as_ref());
337                sender.send(worktree_indexed).ok();
338            }
339        }
340    }
341
342    async fn compute_embeddings_for_batch(
343        mut embeddings_queue: Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
344        embedding_provider: &Arc<dyn EmbeddingProvider>,
345        db_update_tx: &channel::Sender<DbOperation>,
346    ) {
347        let mut batch_documents = vec![];
348        for (_, documents, _, _, _) in embeddings_queue.iter() {
349            batch_documents.extend(documents.iter().map(|document| document.content.as_str()));
350        }
351
352        if let Ok(embeddings) = embedding_provider.embed_batch(batch_documents).await {
353            log::trace!(
354                "created {} embeddings for {} files",
355                embeddings.len(),
356                embeddings_queue.len(),
357            );
358
359            let mut i = 0;
360            let mut j = 0;
361
362            for embedding in embeddings.iter() {
363                while embeddings_queue[i].1.len() == j {
364                    i += 1;
365                    j = 0;
366                }
367
368                embeddings_queue[i].1[j].embedding = embedding.to_owned();
369                j += 1;
370            }
371
372            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
373                db_update_tx
374                    .send(DbOperation::InsertFile {
375                        worktree_id,
376                        documents,
377                        path,
378                        mtime,
379                        job_handle,
380                    })
381                    .await
382                    .unwrap();
383            }
384        }
385    }
386
387    fn enqueue_documents_to_embed(
388        job: EmbeddingJob,
389        queue_len: &mut usize,
390        embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
391        embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
392    ) {
393        let should_flush = match job {
394            EmbeddingJob::Enqueue {
395                documents,
396                worktree_id,
397                path,
398                mtime,
399                job_handle,
400            } => {
401                *queue_len += &documents.len();
402                embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
403                *queue_len >= EMBEDDINGS_BATCH_SIZE
404            }
405            EmbeddingJob::Flush => true,
406        };
407
408        if should_flush {
409            embed_batch_tx
410                .try_send(mem::take(embeddings_queue))
411                .unwrap();
412            *queue_len = 0;
413        }
414    }
415
416    async fn parse_file(
417        fs: &Arc<dyn Fs>,
418        pending_file: PendingFile,
419        retriever: &mut CodeContextRetriever,
420        batch_files_tx: &channel::Sender<EmbeddingJob>,
421        parsing_files_rx: &channel::Receiver<PendingFile>,
422        db_update_tx: &channel::Sender<DbOperation>,
423    ) {
424        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
425            if let Some(documents) = retriever
426                .parse_file_with_template(
427                    &pending_file.relative_path,
428                    &content,
429                    pending_file.language,
430                )
431                .log_err()
432            {
433                log::trace!(
434                    "parsed path {:?}: {} documents",
435                    pending_file.relative_path,
436                    documents.len()
437                );
438
439                if documents.len() == 0 {
440                    db_update_tx
441                        .send(DbOperation::InsertFile {
442                            worktree_id: pending_file.worktree_db_id,
443                            documents,
444                            path: pending_file.relative_path,
445                            mtime: pending_file.modified_time,
446                            job_handle: pending_file.job_handle,
447                        })
448                        .await
449                        .unwrap();
450                } else {
451                    batch_files_tx
452                        .try_send(EmbeddingJob::Enqueue {
453                            worktree_id: pending_file.worktree_db_id,
454                            path: pending_file.relative_path,
455                            mtime: pending_file.modified_time,
456                            job_handle: pending_file.job_handle,
457                            documents,
458                        })
459                        .unwrap();
460                }
461            }
462        }
463
464        if parsing_files_rx.len() == 0 {
465            batch_files_tx.try_send(EmbeddingJob::Flush).unwrap();
466        }
467    }
468
469    fn find_or_create_worktree(&self, path: PathBuf) -> impl Future<Output = Result<i64>> {
470        let (tx, rx) = oneshot::channel();
471        self.db_update_tx
472            .try_send(DbOperation::FindOrCreateWorktree { path, sender: tx })
473            .unwrap();
474        async move { rx.await? }
475    }
476
477    fn get_file_mtimes(
478        &self,
479        worktree_id: i64,
480    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
481        let (tx, rx) = oneshot::channel();
482        self.db_update_tx
483            .try_send(DbOperation::FileMTimes {
484                worktree_id,
485                sender: tx,
486            })
487            .unwrap();
488        async move { rx.await? }
489    }
490
491    fn worktree_previously_indexed(&self, path: Arc<Path>) -> impl Future<Output = Result<bool>> {
492        let (tx, rx) = oneshot::channel();
493        self.db_update_tx
494            .try_send(DbOperation::WorktreePreviouslyIndexed { path, sender: tx })
495            .unwrap();
496        async move { rx.await? }
497    }
498
499    pub fn project_previously_indexed(
500        &mut self,
501        project: ModelHandle<Project>,
502        cx: &mut ModelContext<Self>,
503    ) -> Task<Result<bool>> {
504        let worktree_scans_complete = project
505            .read(cx)
506            .worktrees(cx)
507            .map(|worktree| {
508                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
509                async move {
510                    scan_complete.await;
511                }
512            })
513            .collect::<Vec<_>>();
514
515        let worktrees_indexed_previously = project
516            .read(cx)
517            .worktrees(cx)
518            .map(|worktree| self.worktree_previously_indexed(worktree.read(cx).abs_path()))
519            .collect::<Vec<_>>();
520
521        cx.spawn(|_, _cx| async move {
522            futures::future::join_all(worktree_scans_complete).await;
523
524            let worktree_indexed_previously =
525                futures::future::join_all(worktrees_indexed_previously).await;
526
527            Ok(worktree_indexed_previously
528                .iter()
529                .filter(|worktree| worktree.is_ok())
530                .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
531        })
532    }
533
534    pub fn index_project(
535        &mut self,
536        project: ModelHandle<Project>,
537        cx: &mut ModelContext<Self>,
538    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
539        let t0 = Instant::now();
540        let worktree_scans_complete = project
541            .read(cx)
542            .worktrees(cx)
543            .map(|worktree| {
544                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
545                async move {
546                    scan_complete.await;
547                }
548            })
549            .collect::<Vec<_>>();
550        let worktree_db_ids = project
551            .read(cx)
552            .worktrees(cx)
553            .map(|worktree| {
554                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
555            })
556            .collect::<Vec<_>>();
557
558        let language_registry = self.language_registry.clone();
559        let db_update_tx = self.db_update_tx.clone();
560        let parsing_files_tx = self.parsing_files_tx.clone();
561
562        cx.spawn(|this, mut cx| async move {
563            futures::future::join_all(worktree_scans_complete).await;
564
565            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
566
567            let worktrees = project.read_with(&cx, |project, cx| {
568                project
569                    .worktrees(cx)
570                    .map(|worktree| worktree.read(cx).snapshot())
571                    .collect::<Vec<_>>()
572            });
573
574            let mut worktree_file_mtimes = HashMap::new();
575            let mut db_ids_by_worktree_id = HashMap::new();
576            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
577                let db_id = db_id?;
578                db_ids_by_worktree_id.insert(worktree.id(), db_id);
579                worktree_file_mtimes.insert(
580                    worktree.id(),
581                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
582                        .await?,
583                );
584            }
585
586            let (job_count_tx, job_count_rx) = watch::channel_with(0);
587            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
588            this.update(&mut cx, |this, _| {
589                this.projects.insert(
590                    project.downgrade(),
591                    ProjectState {
592                        worktree_db_ids: db_ids_by_worktree_id
593                            .iter()
594                            .map(|(a, b)| (*a, *b))
595                            .collect(),
596                        outstanding_job_count_rx: job_count_rx.clone(),
597                        _outstanding_job_count_tx: job_count_tx.clone(),
598                    },
599                );
600            });
601
602            cx.background()
603                .spawn(async move {
604                    let mut count = 0;
605                    for worktree in worktrees.into_iter() {
606                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
607                        for file in worktree.files(false, 0) {
608                            let absolute_path = worktree.absolutize(&file.path);
609
610                            if let Ok(language) = language_registry
611                                .language_for_file(&absolute_path, None)
612                                .await
613                            {
614                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
615                                    && &language.name().as_ref() != &"Markdown"
616                                    && language
617                                        .grammar()
618                                        .and_then(|grammar| grammar.embedding_config.as_ref())
619                                        .is_none()
620                                {
621                                    continue;
622                                }
623
624                                let path_buf = file.path.to_path_buf();
625                                let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
626                                let already_stored = stored_mtime
627                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);
628
629                                if !already_stored {
630                                    count += 1;
631                                    *job_count_tx.lock().borrow_mut() += 1;
632                                    let job_handle = JobHandle {
633                                        tx: Arc::downgrade(&job_count_tx),
634                                    };
635                                    parsing_files_tx
636                                        .try_send(PendingFile {
637                                            worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
638                                            relative_path: path_buf,
639                                            absolute_path,
640                                            language,
641                                            job_handle,
642                                            modified_time: file.mtime,
643                                        })
644                                        .unwrap();
645                                }
646                            }
647                        }
648                        for file in file_mtimes.keys() {
649                            db_update_tx
650                                .try_send(DbOperation::Delete {
651                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
652                                    path: file.to_owned(),
653                                })
654                                .unwrap();
655                        }
656                    }
657
658                    log::trace!(
659                        "walking worktree took {:?} milliseconds",
660                        t0.elapsed().as_millis()
661                    );
662                    anyhow::Ok((count, job_count_rx))
663                })
664                .await
665        })
666    }
667
668    pub fn outstanding_job_count_rx(
669        &self,
670        project: &ModelHandle<Project>,
671    ) -> Option<watch::Receiver<usize>> {
672        Some(
673            self.projects
674                .get(&project.downgrade())?
675                .outstanding_job_count_rx
676                .clone(),
677        )
678    }
679
680    pub fn search_project(
681        &mut self,
682        project: ModelHandle<Project>,
683        phrase: String,
684        limit: usize,
685        includes: Vec<PathMatcher>,
686        excludes: Vec<PathMatcher>,
687        cx: &mut ModelContext<Self>,
688    ) -> Task<Result<Vec<SearchResult>>> {
689        let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
690            state
691        } else {
692            return Task::ready(Err(anyhow!("project not added")));
693        };
694
695        let worktree_db_ids = project
696            .read(cx)
697            .worktrees(cx)
698            .filter_map(|worktree| {
699                let worktree_id = worktree.read(cx).id();
700                project_state.db_id_for_worktree_id(worktree_id)
701            })
702            .collect::<Vec<_>>();
703
704        let embedding_provider = self.embedding_provider.clone();
705        let database_url = self.database_url.clone();
706        let fs = self.fs.clone();
707        cx.spawn(|this, mut cx| async move {
708            let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;
709
710            let phrase_embedding = embedding_provider
711                .embed_batch(vec![&phrase])
712                .await?
713                .into_iter()
714                .next()
715                .unwrap();
716
717            let file_ids =
718                database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?;
719
720            let batch_n = cx.background().num_cpus();
721            let ids_len = file_ids.clone().len();
722            let batch_size = if ids_len <= batch_n {
723                ids_len
724            } else {
725                ids_len / batch_n
726            };
727
728            let mut result_tasks = Vec::new();
729            for batch in file_ids.chunks(batch_size) {
730                let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
731                let limit = limit.clone();
732                let fs = fs.clone();
733                let database_url = database_url.clone();
734                let phrase_embedding = phrase_embedding.clone();
735                let task = cx.background().spawn(async move {
736                    let database = VectorDatabase::new(fs, database_url).await.log_err();
737                    if database.is_none() {
738                        return Err(anyhow!("failed to acquire database connection"));
739                    } else {
740                        database
741                            .unwrap()
742                            .top_k_search(&phrase_embedding, limit, batch.as_slice())
743                    }
744                });
745                result_tasks.push(task);
746            }
747
748            let batch_results = futures::future::join_all(result_tasks).await;
749
750            let mut results = Vec::new();
751            for batch_result in batch_results {
752                if batch_result.is_ok() {
753                    for (id, similarity) in batch_result.unwrap() {
754                        let ix = match results.binary_search_by(|(_, s)| {
755                            similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
756                        }) {
757                            Ok(ix) => ix,
758                            Err(ix) => ix,
759                        };
760                        results.insert(ix, (id, similarity));
761                        results.truncate(limit);
762                    }
763                }
764            }
765
766            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
767            let documents = database.get_documents_by_ids(ids.as_slice())?;
768
769            let mut tasks = Vec::new();
770            let mut ranges = Vec::new();
771            let weak_project = project.downgrade();
772            project.update(&mut cx, |project, cx| {
773                for (worktree_db_id, file_path, byte_range) in documents {
774                    let project_state =
775                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
776                            state
777                        } else {
778                            return Err(anyhow!("project not added"));
779                        };
780                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
781                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
782                        ranges.push(byte_range);
783                    }
784                }
785
786                Ok(())
787            })?;
788
789            let buffers = futures::future::join_all(tasks).await;
790
791            Ok(buffers
792                .into_iter()
793                .zip(ranges)
794                .filter_map(|(buffer, range)| {
795                    let buffer = buffer.log_err()?;
796                    let range = buffer.read_with(&cx, |buffer, _| {
797                        buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
798                    });
799                    Some(SearchResult { buffer, range })
800                })
801                .collect::<Vec<_>>())
802        })
803    }
804}
805
/// Makes `SemanticIndex` usable as a gpui model; its event type is the unit type.
impl Entity for SemanticIndex {
    type Event = ();
}
809
810impl Drop for JobHandle {
811    fn drop(&mut self) {
812        if let Some(tx) = self.tx.upgrade() {
813            let mut tx = tx.lock();
814            *tx.borrow_mut() -= 1;
815        }
816    }
817}