semantic_index.rs

  1mod db;
  2mod embedding;
  3mod parsing;
  4pub mod semantic_index_settings;
  5
  6#[cfg(test)]
  7mod semantic_index_tests;
  8
  9use crate::semantic_index_settings::SemanticIndexSettings;
 10use anyhow::{anyhow, Result};
 11use db::VectorDatabase;
 12use embedding::{EmbeddingProvider, OpenAIEmbeddings};
 13use futures::{channel::oneshot, Future};
 14use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 15use language::{Anchor, Buffer, Language, LanguageRegistry};
 16use parking_lot::Mutex;
 17use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
 18use postage::watch;
 19use project::{search::PathMatcher, Fs, Project, WorktreeId};
 20use smol::channel;
 21use std::{
 22    cmp::Ordering,
 23    collections::HashMap,
 24    mem,
 25    ops::Range,
 26    path::{Path, PathBuf},
 27    sync::{Arc, Weak},
 28    time::{Instant, SystemTime},
 29};
 30use util::{
 31    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
 32    http::HttpClient,
 33    paths::EMBEDDINGS_DIR,
 34    ResultExt,
 35};
 36
 37const SEMANTIC_INDEX_VERSION: usize = 6;
 38const EMBEDDINGS_BATCH_SIZE: usize = 80;
 39
 40pub fn init(
 41    fs: Arc<dyn Fs>,
 42    http_client: Arc<dyn HttpClient>,
 43    language_registry: Arc<LanguageRegistry>,
 44    cx: &mut AppContext,
 45) {
 46    settings::register::<SemanticIndexSettings>(cx);
 47
 48    let db_file_path = EMBEDDINGS_DIR
 49        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
 50        .join("embeddings_db");
 51
 52    // This needs to be removed at some point before stable.
 53    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
 54        return;
 55    }
 56
 57    cx.spawn(move |mut cx| async move {
 58        let semantic_index = SemanticIndex::new(
 59            fs,
 60            db_file_path,
 61            Arc::new(OpenAIEmbeddings {
 62                client: http_client,
 63                executor: cx.background(),
 64            }),
 65            language_registry,
 66            cx.clone(),
 67        )
 68        .await?;
 69
 70        cx.update(|cx| {
 71            cx.set_global(semantic_index.clone());
 72        });
 73
 74        anyhow::Ok(())
 75    })
 76    .detach();
 77}
 78
 79pub struct SemanticIndex {
 80    fs: Arc<dyn Fs>,
 81    database_url: Arc<PathBuf>,
 82    embedding_provider: Arc<dyn EmbeddingProvider>,
 83    language_registry: Arc<LanguageRegistry>,
 84    db_update_tx: channel::Sender<DbOperation>,
 85    parsing_files_tx: channel::Sender<PendingFile>,
 86    _db_update_task: Task<()>,
 87    _embed_batch_tasks: Vec<Task<()>>,
 88    _batch_files_task: Task<()>,
 89    _parsing_files_tasks: Vec<Task<()>>,
 90    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
 91}
 92
 93struct ProjectState {
 94    worktree_db_ids: Vec<(WorktreeId, i64)>,
 95    outstanding_job_count_rx: watch::Receiver<usize>,
 96    _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
 97}
 98
 99struct JobHandle {
100    tx: Weak<Mutex<watch::Sender<usize>>>,
101}
102
103impl ProjectState {
104    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
105        self.worktree_db_ids
106            .iter()
107            .find_map(|(worktree_id, db_id)| {
108                if *worktree_id == id {
109                    Some(*db_id)
110                } else {
111                    None
112                }
113            })
114    }
115
116    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
117        self.worktree_db_ids
118            .iter()
119            .find_map(|(worktree_id, db_id)| {
120                if *db_id == id {
121                    Some(*worktree_id)
122                } else {
123                    None
124                }
125            })
126    }
127}
128
129pub struct PendingFile {
130    worktree_db_id: i64,
131    relative_path: PathBuf,
132    absolute_path: PathBuf,
133    language: Arc<Language>,
134    modified_time: SystemTime,
135    job_handle: JobHandle,
136}
137
138pub struct SearchResult {
139    pub buffer: ModelHandle<Buffer>,
140    pub range: Range<Anchor>,
141}
142
143enum DbOperation {
144    InsertFile {
145        worktree_id: i64,
146        documents: Vec<Document>,
147        path: PathBuf,
148        mtime: SystemTime,
149        job_handle: JobHandle,
150    },
151    Delete {
152        worktree_id: i64,
153        path: PathBuf,
154    },
155    FindOrCreateWorktree {
156        path: PathBuf,
157        sender: oneshot::Sender<Result<i64>>,
158    },
159    FileMTimes {
160        worktree_id: i64,
161        sender: oneshot::Sender<Result<HashMap<PathBuf, SystemTime>>>,
162    },
163    WorktreePreviouslyIndexed {
164        path: Arc<Path>,
165        sender: oneshot::Sender<Result<bool>>,
166    },
167}
168
169enum EmbeddingJob {
170    Enqueue {
171        worktree_id: i64,
172        path: PathBuf,
173        mtime: SystemTime,
174        documents: Vec<Document>,
175        job_handle: JobHandle,
176    },
177    Flush,
178}
179
180impl SemanticIndex {
181    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
182        if cx.has_global::<ModelHandle<Self>>() {
183            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
184        } else {
185            None
186        }
187    }
188
189    pub fn enabled(cx: &AppContext) -> bool {
190        settings::get::<SemanticIndexSettings>(cx).enabled
191            && *RELEASE_CHANNEL != ReleaseChannel::Stable
192    }
193
194    async fn new(
195        fs: Arc<dyn Fs>,
196        database_url: PathBuf,
197        embedding_provider: Arc<dyn EmbeddingProvider>,
198        language_registry: Arc<LanguageRegistry>,
199        mut cx: AsyncAppContext,
200    ) -> Result<ModelHandle<Self>> {
201        let t0 = Instant::now();
202        let database_url = Arc::new(database_url);
203
204        let db = cx
205            .background()
206            .spawn(VectorDatabase::new(fs.clone(), database_url.clone()))
207            .await?;
208
209        log::trace!(
210            "db initialization took {:?} milliseconds",
211            t0.elapsed().as_millis()
212        );
213
214        Ok(cx.add_model(|cx| {
215            let t0 = Instant::now();
216            // Perform database operations
217            let (db_update_tx, db_update_rx) = channel::unbounded();
218            let _db_update_task = cx.background().spawn({
219                async move {
220                    while let Ok(job) = db_update_rx.recv().await {
221                        Self::run_db_operation(&db, job)
222                    }
223                }
224            });
225
226            // Group documents into batches and send them to the embedding provider.
227            let (embed_batch_tx, embed_batch_rx) =
228                channel::unbounded::<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>();
229            let mut _embed_batch_tasks = Vec::new();
230            for _ in 0..cx.background().num_cpus() {
231                let embed_batch_rx = embed_batch_rx.clone();
232                _embed_batch_tasks.push(cx.background().spawn({
233                    let db_update_tx = db_update_tx.clone();
234                    let embedding_provider = embedding_provider.clone();
235                    async move {
236                        while let Ok(embeddings_queue) = embed_batch_rx.recv().await {
237                            Self::compute_embeddings_for_batch(
238                                embeddings_queue,
239                                &embedding_provider,
240                                &db_update_tx,
241                            )
242                            .await;
243                        }
244                    }
245                }));
246            }
247
248            // Group documents into batches and send them to the embedding provider.
249            let (batch_files_tx, batch_files_rx) = channel::unbounded::<EmbeddingJob>();
250            let _batch_files_task = cx.background().spawn(async move {
251                let mut queue_len = 0;
252                let mut embeddings_queue = vec![];
253                while let Ok(job) = batch_files_rx.recv().await {
254                    Self::enqueue_documents_to_embed(
255                        job,
256                        &mut queue_len,
257                        &mut embeddings_queue,
258                        &embed_batch_tx,
259                    );
260                }
261            });
262
263            // Parse files into embeddable documents.
264            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
265            let mut _parsing_files_tasks = Vec::new();
266            for _ in 0..cx.background().num_cpus() {
267                let fs = fs.clone();
268                let parsing_files_rx = parsing_files_rx.clone();
269                let batch_files_tx = batch_files_tx.clone();
270                let db_update_tx = db_update_tx.clone();
271                _parsing_files_tasks.push(cx.background().spawn(async move {
272                    let mut retriever = CodeContextRetriever::new();
273                    while let Ok(pending_file) = parsing_files_rx.recv().await {
274                        Self::parse_file(
275                            &fs,
276                            pending_file,
277                            &mut retriever,
278                            &batch_files_tx,
279                            &parsing_files_rx,
280                            &db_update_tx,
281                        )
282                        .await;
283                    }
284                }));
285            }
286
287            log::trace!(
288                "semantic index task initialization took {:?} milliseconds",
289                t0.elapsed().as_millis()
290            );
291            Self {
292                fs,
293                database_url,
294                embedding_provider,
295                language_registry,
296                db_update_tx,
297                parsing_files_tx,
298                _db_update_task,
299                _embed_batch_tasks,
300                _batch_files_task,
301                _parsing_files_tasks,
302                projects: HashMap::new(),
303            }
304        }))
305    }
306
307    fn run_db_operation(db: &VectorDatabase, job: DbOperation) {
308        match job {
309            DbOperation::InsertFile {
310                worktree_id,
311                documents,
312                path,
313                mtime,
314                job_handle,
315            } => {
316                db.insert_file(worktree_id, path, mtime, documents)
317                    .log_err();
318                drop(job_handle)
319            }
320            DbOperation::Delete { worktree_id, path } => {
321                db.delete_file(worktree_id, path).log_err();
322            }
323            DbOperation::FindOrCreateWorktree { path, sender } => {
324                let id = db.find_or_create_worktree(&path);
325                sender.send(id).ok();
326            }
327            DbOperation::FileMTimes {
328                worktree_id: worktree_db_id,
329                sender,
330            } => {
331                let file_mtimes = db.get_file_mtimes(worktree_db_id);
332                sender.send(file_mtimes).ok();
333            }
334            DbOperation::WorktreePreviouslyIndexed { path, sender } => {
335                let worktree_indexed = db.worktree_previously_indexed(path.as_ref());
336                sender.send(worktree_indexed).ok();
337            }
338        }
339    }
340
341    async fn compute_embeddings_for_batch(
342        mut embeddings_queue: Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
343        embedding_provider: &Arc<dyn EmbeddingProvider>,
344        db_update_tx: &channel::Sender<DbOperation>,
345    ) {
346        let mut batch_documents = vec![];
347        for (_, documents, _, _, _) in embeddings_queue.iter() {
348            batch_documents.extend(documents.iter().map(|document| document.content.as_str()));
349        }
350
351        if let Ok(embeddings) = embedding_provider.embed_batch(batch_documents).await {
352            log::trace!(
353                "created {} embeddings for {} files",
354                embeddings.len(),
355                embeddings_queue.len(),
356            );
357
358            let mut i = 0;
359            let mut j = 0;
360
361            for embedding in embeddings.iter() {
362                while embeddings_queue[i].1.len() == j {
363                    i += 1;
364                    j = 0;
365                }
366
367                embeddings_queue[i].1[j].embedding = embedding.to_owned();
368                j += 1;
369            }
370
371            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
372                db_update_tx
373                    .send(DbOperation::InsertFile {
374                        worktree_id,
375                        documents,
376                        path,
377                        mtime,
378                        job_handle,
379                    })
380                    .await
381                    .unwrap();
382            }
383        }
384    }
385
386    fn enqueue_documents_to_embed(
387        job: EmbeddingJob,
388        queue_len: &mut usize,
389        embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
390        embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
391    ) {
392        let should_flush = match job {
393            EmbeddingJob::Enqueue {
394                documents,
395                worktree_id,
396                path,
397                mtime,
398                job_handle,
399            } => {
400                *queue_len += &documents.len();
401                embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
402                *queue_len >= EMBEDDINGS_BATCH_SIZE
403            }
404            EmbeddingJob::Flush => true,
405        };
406
407        if should_flush {
408            embed_batch_tx
409                .try_send(mem::take(embeddings_queue))
410                .unwrap();
411            *queue_len = 0;
412        }
413    }
414
415    async fn parse_file(
416        fs: &Arc<dyn Fs>,
417        pending_file: PendingFile,
418        retriever: &mut CodeContextRetriever,
419        batch_files_tx: &channel::Sender<EmbeddingJob>,
420        parsing_files_rx: &channel::Receiver<PendingFile>,
421        db_update_tx: &channel::Sender<DbOperation>,
422    ) {
423        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
424            if let Some(documents) = retriever
425                .parse_file_with_template(
426                    &pending_file.relative_path,
427                    &content,
428                    pending_file.language,
429                )
430                .log_err()
431            {
432                log::trace!(
433                    "parsed path {:?}: {} documents",
434                    pending_file.relative_path,
435                    documents.len()
436                );
437
438                if documents.len() == 0 {
439                    db_update_tx
440                        .send(DbOperation::InsertFile {
441                            worktree_id: pending_file.worktree_db_id,
442                            documents,
443                            path: pending_file.relative_path,
444                            mtime: pending_file.modified_time,
445                            job_handle: pending_file.job_handle,
446                        })
447                        .await
448                        .unwrap();
449                } else {
450                    batch_files_tx
451                        .try_send(EmbeddingJob::Enqueue {
452                            worktree_id: pending_file.worktree_db_id,
453                            path: pending_file.relative_path,
454                            mtime: pending_file.modified_time,
455                            job_handle: pending_file.job_handle,
456                            documents,
457                        })
458                        .unwrap();
459                }
460            }
461        }
462
463        if parsing_files_rx.len() == 0 {
464            batch_files_tx.try_send(EmbeddingJob::Flush).unwrap();
465        }
466    }
467
468    fn find_or_create_worktree(&self, path: PathBuf) -> impl Future<Output = Result<i64>> {
469        let (tx, rx) = oneshot::channel();
470        self.db_update_tx
471            .try_send(DbOperation::FindOrCreateWorktree { path, sender: tx })
472            .unwrap();
473        async move { rx.await? }
474    }
475
476    fn get_file_mtimes(
477        &self,
478        worktree_id: i64,
479    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
480        let (tx, rx) = oneshot::channel();
481        self.db_update_tx
482            .try_send(DbOperation::FileMTimes {
483                worktree_id,
484                sender: tx,
485            })
486            .unwrap();
487        async move { rx.await? }
488    }
489
490    fn worktree_previously_indexed(&self, path: Arc<Path>) -> impl Future<Output = Result<bool>> {
491        let (tx, rx) = oneshot::channel();
492        self.db_update_tx
493            .try_send(DbOperation::WorktreePreviouslyIndexed { path, sender: tx })
494            .unwrap();
495        async move { rx.await? }
496    }
497
498    pub fn project_previously_indexed(
499        &mut self,
500        project: ModelHandle<Project>,
501        cx: &mut ModelContext<Self>,
502    ) -> Task<Result<bool>> {
503        let worktrees_indexed_previously = project
504            .read(cx)
505            .worktrees(cx)
506            .map(|worktree| self.worktree_previously_indexed(worktree.read(cx).abs_path()))
507            .collect::<Vec<_>>();
508        cx.spawn(|_, _cx| async move {
509            let worktree_indexed_previously =
510                futures::future::join_all(worktrees_indexed_previously).await;
511
512            Ok(worktree_indexed_previously
513                .iter()
514                .filter(|worktree| worktree.is_ok())
515                .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
516        })
517    }
518
519    pub fn index_project(
520        &mut self,
521        project: ModelHandle<Project>,
522        cx: &mut ModelContext<Self>,
523    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
524        let t0 = Instant::now();
525        let worktree_scans_complete = project
526            .read(cx)
527            .worktrees(cx)
528            .map(|worktree| {
529                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
530                async move {
531                    scan_complete.await;
532                }
533            })
534            .collect::<Vec<_>>();
535        let worktree_db_ids = project
536            .read(cx)
537            .worktrees(cx)
538            .map(|worktree| {
539                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
540            })
541            .collect::<Vec<_>>();
542
543        let language_registry = self.language_registry.clone();
544        let db_update_tx = self.db_update_tx.clone();
545        let parsing_files_tx = self.parsing_files_tx.clone();
546
547        cx.spawn(|this, mut cx| async move {
548            futures::future::join_all(worktree_scans_complete).await;
549
550            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
551
552            let worktrees = project.read_with(&cx, |project, cx| {
553                project
554                    .worktrees(cx)
555                    .map(|worktree| worktree.read(cx).snapshot())
556                    .collect::<Vec<_>>()
557            });
558
559            let mut worktree_file_mtimes = HashMap::new();
560            let mut db_ids_by_worktree_id = HashMap::new();
561            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
562                let db_id = db_id?;
563                db_ids_by_worktree_id.insert(worktree.id(), db_id);
564                worktree_file_mtimes.insert(
565                    worktree.id(),
566                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
567                        .await?,
568                );
569            }
570
571            let (job_count_tx, job_count_rx) = watch::channel_with(0);
572            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
573            this.update(&mut cx, |this, _| {
574                this.projects.insert(
575                    project.downgrade(),
576                    ProjectState {
577                        worktree_db_ids: db_ids_by_worktree_id
578                            .iter()
579                            .map(|(a, b)| (*a, *b))
580                            .collect(),
581                        outstanding_job_count_rx: job_count_rx.clone(),
582                        _outstanding_job_count_tx: job_count_tx.clone(),
583                    },
584                );
585            });
586
587            cx.background()
588                .spawn(async move {
589                    let mut count = 0;
590                    for worktree in worktrees.into_iter() {
591                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
592                        for file in worktree.files(false, 0) {
593                            let absolute_path = worktree.absolutize(&file.path);
594
595                            if let Ok(language) = language_registry
596                                .language_for_file(&absolute_path, None)
597                                .await
598                            {
599                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
600                                    && &language.name().as_ref() != &"Markdown"
601                                    && language
602                                        .grammar()
603                                        .and_then(|grammar| grammar.embedding_config.as_ref())
604                                        .is_none()
605                                {
606                                    continue;
607                                }
608
609                                let path_buf = file.path.to_path_buf();
610                                let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
611                                let already_stored = stored_mtime
612                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);
613
614                                if !already_stored {
615                                    count += 1;
616                                    *job_count_tx.lock().borrow_mut() += 1;
617                                    let job_handle = JobHandle {
618                                        tx: Arc::downgrade(&job_count_tx),
619                                    };
620                                    parsing_files_tx
621                                        .try_send(PendingFile {
622                                            worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
623                                            relative_path: path_buf,
624                                            absolute_path,
625                                            language,
626                                            job_handle,
627                                            modified_time: file.mtime,
628                                        })
629                                        .unwrap();
630                                }
631                            }
632                        }
633                        for file in file_mtimes.keys() {
634                            db_update_tx
635                                .try_send(DbOperation::Delete {
636                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
637                                    path: file.to_owned(),
638                                })
639                                .unwrap();
640                        }
641                    }
642
643                    log::trace!(
644                        "walking worktree took {:?} milliseconds",
645                        t0.elapsed().as_millis()
646                    );
647                    anyhow::Ok((count, job_count_rx))
648                })
649                .await
650        })
651    }
652
653    pub fn outstanding_job_count_rx(
654        &self,
655        project: &ModelHandle<Project>,
656    ) -> Option<watch::Receiver<usize>> {
657        Some(
658            self.projects
659                .get(&project.downgrade())?
660                .outstanding_job_count_rx
661                .clone(),
662        )
663    }
664
665    pub fn search_project(
666        &mut self,
667        project: ModelHandle<Project>,
668        phrase: String,
669        limit: usize,
670        includes: Vec<PathMatcher>,
671        excludes: Vec<PathMatcher>,
672        cx: &mut ModelContext<Self>,
673    ) -> Task<Result<Vec<SearchResult>>> {
674        let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
675            state
676        } else {
677            return Task::ready(Err(anyhow!("project not added")));
678        };
679
680        let worktree_db_ids = project
681            .read(cx)
682            .worktrees(cx)
683            .filter_map(|worktree| {
684                let worktree_id = worktree.read(cx).id();
685                project_state.db_id_for_worktree_id(worktree_id)
686            })
687            .collect::<Vec<_>>();
688
689        let embedding_provider = self.embedding_provider.clone();
690        let database_url = self.database_url.clone();
691        let fs = self.fs.clone();
692        cx.spawn(|this, mut cx| async move {
693            let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;
694
695            let phrase_embedding = embedding_provider
696                .embed_batch(vec![&phrase])
697                .await?
698                .into_iter()
699                .next()
700                .unwrap();
701
702            let file_ids =
703                database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?;
704
705            let batch_n = cx.background().num_cpus();
706            let ids_len = file_ids.clone().len();
707            let batch_size = if ids_len <= batch_n {
708                ids_len
709            } else {
710                ids_len / batch_n
711            };
712
713            let mut result_tasks = Vec::new();
714            for batch in file_ids.chunks(batch_size) {
715                let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
716                let limit = limit.clone();
717                let fs = fs.clone();
718                let database_url = database_url.clone();
719                let phrase_embedding = phrase_embedding.clone();
720                let task = cx.background().spawn(async move {
721                    let database = VectorDatabase::new(fs, database_url).await.log_err();
722                    if database.is_none() {
723                        return Err(anyhow!("failed to acquire database connection"));
724                    } else {
725                        database
726                            .unwrap()
727                            .top_k_search(&phrase_embedding, limit, batch.as_slice())
728                    }
729                });
730                result_tasks.push(task);
731            }
732
733            let batch_results = futures::future::join_all(result_tasks).await;
734
735            let mut results = Vec::new();
736            for batch_result in batch_results {
737                if batch_result.is_ok() {
738                    for (id, similarity) in batch_result.unwrap() {
739                        let ix = match results.binary_search_by(|(_, s)| {
740                            similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
741                        }) {
742                            Ok(ix) => ix,
743                            Err(ix) => ix,
744                        };
745                        results.insert(ix, (id, similarity));
746                        results.truncate(limit);
747                    }
748                }
749            }
750
751            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
752            let documents = database.get_documents_by_ids(ids.as_slice())?;
753
754            let mut tasks = Vec::new();
755            let mut ranges = Vec::new();
756            let weak_project = project.downgrade();
757            project.update(&mut cx, |project, cx| {
758                for (worktree_db_id, file_path, byte_range) in documents {
759                    let project_state =
760                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
761                            state
762                        } else {
763                            return Err(anyhow!("project not added"));
764                        };
765                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
766                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
767                        ranges.push(byte_range);
768                    }
769                }
770
771                Ok(())
772            })?;
773
774            let buffers = futures::future::join_all(tasks).await;
775
776            Ok(buffers
777                .into_iter()
778                .zip(ranges)
779                .filter_map(|(buffer, range)| {
780                    let buffer = buffer.log_err()?;
781                    let range = buffer.read_with(&cx, |buffer, _| {
782                        buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
783                    });
784                    Some(SearchResult { buffer, range })
785                })
786                .collect::<Vec<_>>())
787        })
788    }
789}
790
791impl Entity for SemanticIndex {
792    type Event = ();
793}
794
795impl Drop for JobHandle {
796    fn drop(&mut self) {
797        if let Some(tx) = self.tx.upgrade() {
798            let mut tx = tx.lock();
799            *tx.borrow_mut() -= 1;
800        }
801    }
802}