semantic_index.rs

  1mod db;
  2mod embedding;
  3mod embedding_queue;
  4mod parsing;
  5pub mod semantic_index_settings;
  6
  7#[cfg(test)]
  8mod semantic_index_tests;
  9
 10use crate::semantic_index_settings::SemanticIndexSettings;
 11use anyhow::{anyhow, Result};
 12use db::VectorDatabase;
 13use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings};
 14use embedding_queue::{EmbeddingQueue, FileToEmbed};
 15use futures::{FutureExt, StreamExt};
 16use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 17use language::{Anchor, Buffer, Language, LanguageRegistry};
 18use parking_lot::Mutex;
 19use parsing::{CodeContextRetriever, DocumentDigest, PARSEABLE_ENTIRE_FILE_TYPES};
 20use postage::watch;
 21use project::{
 22    search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, ProjectPath, Worktree, WorktreeId,
 23};
 24use smol::channel;
 25use std::{
 26    cmp::Ordering,
 27    collections::{BTreeMap, HashMap},
 28    ops::Range,
 29    path::{Path, PathBuf},
 30    sync::{Arc, Weak},
 31    time::{Duration, Instant, SystemTime},
 32};
 33use util::{
 34    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
 35    http::HttpClient,
 36    paths::EMBEDDINGS_DIR,
 37    ResultExt,
 38};
 39use workspace::WorkspaceCreated;
 40
 41const SEMANTIC_INDEX_VERSION: usize = 9;
 42const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60);
 43const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
 44
 45pub fn init(
 46    fs: Arc<dyn Fs>,
 47    http_client: Arc<dyn HttpClient>,
 48    language_registry: Arc<LanguageRegistry>,
 49    cx: &mut AppContext,
 50) {
 51    settings::register::<SemanticIndexSettings>(cx);
 52
 53    let db_file_path = EMBEDDINGS_DIR
 54        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
 55        .join("embeddings_db");
 56
 57    // This needs to be removed at some point before stable.
 58    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
 59        return;
 60    }
 61
 62    cx.subscribe_global::<WorkspaceCreated, _>({
 63        move |event, cx| {
 64            let Some(semantic_index) = SemanticIndex::global(cx) else {
 65                return;
 66            };
 67            let workspace = &event.0;
 68            if let Some(workspace) = workspace.upgrade(cx) {
 69                let project = workspace.read(cx).project().clone();
 70                if project.read(cx).is_local() {
 71                    semantic_index.update(cx, |index, cx| {
 72                        index.initialize_project(project, cx).detach_and_log_err(cx)
 73                    });
 74                }
 75            }
 76        }
 77    })
 78    .detach();
 79
 80    cx.spawn(move |mut cx| async move {
 81        let semantic_index = SemanticIndex::new(
 82            fs,
 83            db_file_path,
 84            Arc::new(OpenAIEmbeddings {
 85                client: http_client,
 86                executor: cx.background(),
 87            }),
 88            language_registry,
 89            cx.clone(),
 90        )
 91        .await?;
 92
 93        cx.update(|cx| {
 94            cx.set_global(semantic_index.clone());
 95        });
 96
 97        anyhow::Ok(())
 98    })
 99    .detach();
100}
101
/// Global model that indexes local projects for semantic search.
///
/// Files are parsed into documents by a pool of background workers, passed
/// through an `EmbeddingQueue`, and stored in a [`VectorDatabase`].
pub struct SemanticIndex {
    /// Filesystem used to load file contents and to open the database.
    fs: Arc<dyn Fs>,
    /// Database holding worktrees, files, and their embedded documents.
    db: VectorDatabase,
    /// Provider used to embed search phrases; also handed to the embedding
    /// queue and the parsers.
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Registry used to resolve the language of each file.
    language_registry: Arc<LanguageRegistry>,
    /// Sends files to the parsing workers, together with stored embeddings keyed
    /// by document digest so that unchanged documents can reuse them.
    parsing_files_tx: channel::Sender<(Arc<HashMap<DocumentDigest, Embedding>>, PendingFile)>,
    /// Background task writing embedded files into `db`; held for as long as
    /// the index exists.
    _embedding_task: Task<()>,
    /// One file-parsing worker per CPU; held for as long as the index exists.
    _parsing_files_tasks: Vec<Task<()>>,
    /// Indexing state of every initialized project, keyed by weak handle.
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
112
/// Indexing state of a single project.
struct ProjectState {
    /// Database id assigned to each of the project's worktrees.
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    /// Subscription to the project's worktree-entry updates; held for as long
    /// as this state exists (presumably dropping it unsubscribes).
    _subscription: gpui::Subscription,
    /// Observes the number of outstanding indexing jobs.
    outstanding_job_count_rx: watch::Receiver<usize>,
    /// Shared sender for the outstanding job count; each `JobHandle` keeps a
    /// weak reference to it.
    outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    /// Paths waiting to be reindexed or deleted from the database.
    changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
}
120
/// A change recorded for a project path, awaiting reindexing.
struct ChangedPathInfo {
    /// `true` if the path should be removed from the index rather than reindexed.
    is_deleted: bool,
    /// Modification time associated with the change; only read when the path
    /// is reindexed, never for deletions.
    mtime: SystemTime,
    /// When the change was recorded; a reindex with a cutoff only processes
    /// changes recorded at or before that cutoff.
    changed_at: Instant,
}
126
/// Handle representing one outstanding indexing job.
///
/// Creating a handle increments the project's outstanding job counter; cloning
/// it does not. The counter is decremented when the last clone is dropped.
#[derive(Clone)]
pub struct JobHandle {
    /// The outer Arc is here to count the clones of a JobHandle instance;
    /// when the last handle to a given job is dropped, we decrement a counter (just once).
    /// The inner `Weak` does not keep the counter alive: if the owning project
    /// state is gone, dropping the handle changes nothing.
    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
}
133
134impl JobHandle {
135    fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
136        *tx.lock().borrow_mut() += 1;
137        Self {
138            tx: Arc::new(Arc::downgrade(&tx)),
139        }
140    }
141}
142
143impl ProjectState {
144    fn new(
145        subscription: gpui::Subscription,
146        worktree_db_ids: Vec<(WorktreeId, i64)>,
147        changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
148    ) -> Self {
149        let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0);
150        let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx));
151        Self {
152            worktree_db_ids,
153            outstanding_job_count_rx,
154            outstanding_job_count_tx,
155            changed_paths,
156            _subscription: subscription,
157        }
158    }
159
160    pub fn get_outstanding_count(&self) -> usize {
161        self.outstanding_job_count_rx.borrow().clone()
162    }
163
164    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
165        self.worktree_db_ids
166            .iter()
167            .find_map(|(worktree_id, db_id)| {
168                if *worktree_id == id {
169                    Some(*db_id)
170                } else {
171                    None
172                }
173            })
174    }
175
176    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
177        self.worktree_db_ids
178            .iter()
179            .find_map(|(worktree_id, db_id)| {
180                if *db_id == id {
181                    Some(*worktree_id)
182                } else {
183                    None
184                }
185            })
186    }
187}
188
189#[derive(Clone)]
190pub struct PendingFile {
191    worktree_db_id: i64,
192    relative_path: PathBuf,
193    absolute_path: PathBuf,
194    language: Option<Arc<Language>>,
195    modified_time: SystemTime,
196    job_handle: JobHandle,
197}
198
199pub struct SearchResult {
200    pub buffer: ModelHandle<Buffer>,
201    pub range: Range<Anchor>,
202}
203
204impl SemanticIndex {
205    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
206        if cx.has_global::<ModelHandle<Self>>() {
207            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
208        } else {
209            None
210        }
211    }
212
213    pub fn enabled(cx: &AppContext) -> bool {
214        settings::get::<SemanticIndexSettings>(cx).enabled
215            && *RELEASE_CHANNEL != ReleaseChannel::Stable
216    }
217
    /// Opens the vector database at `database_path` and creates the index model.
    ///
    /// Creating the model also spawns, on the background executor:
    /// - one task that writes every file finished by the embedding queue into
    ///   the database, and
    /// - one file-parsing worker per CPU. All workers read from a shared channel,
    ///   parse files into documents, and push them onto the embedding queue.
    ///
    /// # Errors
    /// Fails if the database cannot be opened.
    async fn new(
        fs: Arc<dyn Fs>,
        database_path: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_path = Arc::from(database_path);
        let db = VectorDatabase::new(fs.clone(), database_path, cx.background()).await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            let embedding_queue =
                EmbeddingQueue::new(embedding_provider.clone(), cx.background().clone());
            // Persist every file the embedding queue finishes; the loop runs until
            // `recv` returns an error (presumably when the channel closes).
            let _embedding_task = cx.background().spawn({
                let embedded_files = embedding_queue.finished_files();
                let db = db.clone();
                async move {
                    while let Ok(file) = embedded_files.recv().await {
                        db.insert_file(file.worktree_id, file.path, file.mtime, file.documents)
                            .await
                            .log_err();
                    }
                }
            });

            // Parse files into embeddable documents.
            let (parsing_files_tx, parsing_files_rx) =
                channel::unbounded::<(Arc<HashMap<DocumentDigest, Embedding>>, PendingFile)>();
            let embedding_queue = Arc::new(Mutex::new(embedding_queue));
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let mut parsing_files_rx = parsing_files_rx.clone();
                let embedding_provider = embedding_provider.clone();
                let embedding_queue = embedding_queue.clone();
                let background = cx.background().clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
                    loop {
                        // A fresh timer is created on every iteration, and
                        // `select_biased!` polls the file branch first. The queue is
                        // therefore flushed only after EMBEDDING_QUEUE_FLUSH_TIMEOUT
                        // passes without a new file arriving.
                        let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse();
                        let mut next_file_to_parse = parsing_files_rx.next().fuse();
                        futures::select_biased! {
                            next_file_to_parse = next_file_to_parse => {
                                if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse {
                                    Self::parse_file(
                                        &fs,
                                        pending_file,
                                        &mut retriever,
                                        &embedding_queue,
                                        &embeddings_for_digest,
                                    )
                                    .await
                                } else {
                                    // The channel stream has ended: stop this worker.
                                    break;
                                }
                            },
                            _ = timer => {
                                embedding_queue.lock().flush();
                            }
                        }
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                db,
                embedding_provider,
                language_registry,
                parsing_files_tx,
                _embedding_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }
305
306    async fn parse_file(
307        fs: &Arc<dyn Fs>,
308        pending_file: PendingFile,
309        retriever: &mut CodeContextRetriever,
310        embedding_queue: &Arc<Mutex<EmbeddingQueue>>,
311        embeddings_for_digest: &HashMap<DocumentDigest, Embedding>,
312    ) {
313        let Some(language) = pending_file.language else {
314            return;
315        };
316
317        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
318            if let Some(mut documents) = retriever
319                .parse_file_with_template(&pending_file.relative_path, &content, language)
320                .log_err()
321            {
322                log::trace!(
323                    "parsed path {:?}: {} documents",
324                    pending_file.relative_path,
325                    documents.len()
326                );
327
328                for document in documents.iter_mut() {
329                    if let Some(embedding) = embeddings_for_digest.get(&document.digest) {
330                        document.embedding = Some(embedding.to_owned());
331                    }
332                }
333
334                embedding_queue.lock().push(FileToEmbed {
335                    worktree_id: pending_file.worktree_db_id,
336                    path: pending_file.relative_path,
337                    mtime: pending_file.modified_time,
338                    job_handle: pending_file.job_handle,
339                    documents,
340                });
341            }
342        }
343    }
344
345    pub fn project_previously_indexed(
346        &mut self,
347        project: ModelHandle<Project>,
348        cx: &mut ModelContext<Self>,
349    ) -> Task<Result<bool>> {
350        let worktrees_indexed_previously = project
351            .read(cx)
352            .worktrees(cx)
353            .map(|worktree| {
354                self.db
355                    .worktree_previously_indexed(&worktree.read(cx).abs_path())
356            })
357            .collect::<Vec<_>>();
358        cx.spawn(|_, _cx| async move {
359            let worktree_indexed_previously =
360                futures::future::join_all(worktrees_indexed_previously).await;
361
362            Ok(worktree_indexed_previously
363                .iter()
364                .filter(|worktree| worktree.is_ok())
365                .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
366        })
367    }
368
369    fn project_entries_changed(
370        &mut self,
371        project: ModelHandle<Project>,
372        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
373        cx: &mut ModelContext<'_, SemanticIndex>,
374        worktree_id: &WorktreeId,
375    ) {
376        let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
377            return;
378        };
379        let project = project.downgrade();
380        let Some(project_state) = self.projects.get_mut(&project) else {
381            return;
382        };
383
384        let embeddings_for_digest = {
385            let mut worktree_id_file_paths = HashMap::new();
386            for (path, _) in &project_state.changed_paths {
387                if let Some(worktree_db_id) = project_state.db_id_for_worktree_id(path.worktree_id)
388                {
389                    worktree_id_file_paths
390                        .entry(worktree_db_id)
391                        .or_insert(Vec::new())
392                        .push(path.path.clone());
393                }
394            }
395            self.db.embeddings_for_files(worktree_id_file_paths)
396        };
397
398        let worktree = worktree.read(cx);
399        let change_time = Instant::now();
400        for (path, entry_id, change) in changes.iter() {
401            let Some(entry) = worktree.entry_for_id(*entry_id) else {
402                continue;
403            };
404            if entry.is_ignored || entry.is_symlink || entry.is_external {
405                continue;
406            }
407            let project_path = ProjectPath {
408                worktree_id: *worktree_id,
409                path: path.clone(),
410            };
411            project_state.changed_paths.insert(
412                project_path,
413                ChangedPathInfo {
414                    changed_at: change_time,
415                    mtime: entry.mtime,
416                    is_deleted: *change == PathChange::Removed,
417                },
418            );
419        }
420
421        cx.spawn_weak(|this, mut cx| async move {
422            let embeddings_for_digest = embeddings_for_digest.await.log_err().unwrap_or_default();
423
424            cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
425            if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
426                Self::reindex_changed_paths(
427                    this,
428                    project,
429                    Some(change_time),
430                    &mut cx,
431                    Arc::new(embeddings_for_digest),
432                )
433                .await;
434            }
435        })
436        .detach();
437    }
438
439    pub fn initialize_project(
440        &mut self,
441        project: ModelHandle<Project>,
442        cx: &mut ModelContext<Self>,
443    ) -> Task<Result<()>> {
444        log::trace!("Initializing Project for Semantic Index");
445        let worktree_scans_complete = project
446            .read(cx)
447            .worktrees(cx)
448            .map(|worktree| {
449                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
450                async move {
451                    scan_complete.await;
452                }
453            })
454            .collect::<Vec<_>>();
455
456        let worktree_db_ids = project
457            .read(cx)
458            .worktrees(cx)
459            .map(|worktree| {
460                self.db
461                    .find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
462            })
463            .collect::<Vec<_>>();
464
465        let _subscription = cx.subscribe(&project, |this, project, event, cx| {
466            if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
467                this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id);
468            };
469        });
470
471        let language_registry = self.language_registry.clone();
472
473        cx.spawn(|this, mut cx| async move {
474            futures::future::join_all(worktree_scans_complete).await;
475
476            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
477            let worktrees = project.read_with(&cx, |project, cx| {
478                project
479                    .worktrees(cx)
480                    .map(|worktree| worktree.read(cx).snapshot())
481                    .collect::<Vec<_>>()
482            });
483
484            let mut worktree_file_mtimes = HashMap::new();
485            let mut db_ids_by_worktree_id = HashMap::new();
486
487            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
488                let db_id = db_id?;
489                db_ids_by_worktree_id.insert(worktree.id(), db_id);
490                worktree_file_mtimes.insert(
491                    worktree.id(),
492                    this.read_with(&cx, |this, _| this.db.get_file_mtimes(db_id))
493                        .await?,
494                );
495            }
496
497            let worktree_db_ids = db_ids_by_worktree_id
498                .iter()
499                .map(|(a, b)| (*a, *b))
500                .collect();
501
502            let changed_paths = cx
503                .background()
504                .spawn(async move {
505                    let mut changed_paths = BTreeMap::new();
506                    let now = Instant::now();
507                    for worktree in worktrees.into_iter() {
508                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
509                        for file in worktree.files(false, 0) {
510                            let absolute_path = worktree.absolutize(&file.path);
511
512                            if file.is_external || file.is_ignored || file.is_symlink {
513                                continue;
514                            }
515
516                            if let Ok(language) = language_registry
517                                .language_for_file(&absolute_path, None)
518                                .await
519                            {
520                                // Test if file is valid parseable file
521                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
522                                    && &language.name().as_ref() != &"Markdown"
523                                    && language
524                                        .grammar()
525                                        .and_then(|grammar| grammar.embedding_config.as_ref())
526                                        .is_none()
527                                {
528                                    continue;
529                                }
530
531                                let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
532                                let already_stored = stored_mtime
533                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);
534
535                                if !already_stored {
536                                    changed_paths.insert(
537                                        ProjectPath {
538                                            worktree_id: worktree.id(),
539                                            path: file.path.clone(),
540                                        },
541                                        ChangedPathInfo {
542                                            changed_at: now,
543                                            mtime: file.mtime,
544                                            is_deleted: false,
545                                        },
546                                    );
547                                }
548                            }
549                        }
550
551                        // Clean up entries from database that are no longer in the worktree.
552                        for (path, mtime) in file_mtimes {
553                            changed_paths.insert(
554                                ProjectPath {
555                                    worktree_id: worktree.id(),
556                                    path: path.into(),
557                                },
558                                ChangedPathInfo {
559                                    changed_at: now,
560                                    mtime,
561                                    is_deleted: true,
562                                },
563                            );
564                        }
565                    }
566
567                    anyhow::Ok(changed_paths)
568                })
569                .await?;
570
571            this.update(&mut cx, |this, _| {
572                this.projects.insert(
573                    project.downgrade(),
574                    ProjectState::new(_subscription, worktree_db_ids, changed_paths),
575                );
576            });
577            Result::<(), _>::Ok(())
578        })
579    }
580
581    pub fn index_project(
582        &mut self,
583        project: ModelHandle<Project>,
584        cx: &mut ModelContext<Self>,
585    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
586        cx.spawn(|this, mut cx| async move {
587            let embeddings_for_digest = this.read_with(&cx, |this, _| {
588                if let Some(state) = this.projects.get(&project.downgrade()) {
589                    let mut worktree_id_file_paths = HashMap::default();
590                    for (path, _) in &state.changed_paths {
591                        if let Some(worktree_db_id) = state.db_id_for_worktree_id(path.worktree_id)
592                        {
593                            worktree_id_file_paths
594                                .entry(worktree_db_id)
595                                .or_insert(Vec::new())
596                                .push(path.path.clone());
597                        }
598                    }
599
600                    Ok(this.db.embeddings_for_files(worktree_id_file_paths))
601                } else {
602                    Err(anyhow!("Project not yet initialized"))
603                }
604            })?;
605
606            let embeddings_for_digest = Arc::new(embeddings_for_digest.await?);
607
608            Self::reindex_changed_paths(
609                this.clone(),
610                project.clone(),
611                None,
612                &mut cx,
613                embeddings_for_digest,
614            )
615            .await;
616
617            this.update(&mut cx, |this, _cx| {
618                let Some(state) = this.projects.get(&project.downgrade()) else {
619                    return Err(anyhow!("Project not yet initialized"));
620                };
621                let job_count_rx = state.outstanding_job_count_rx.clone();
622                let count = state.get_outstanding_count();
623                Ok((count, job_count_rx))
624            })
625        })
626    }
627
628    pub fn outstanding_job_count_rx(
629        &self,
630        project: &ModelHandle<Project>,
631    ) -> Option<watch::Receiver<usize>> {
632        Some(
633            self.projects
634                .get(&project.downgrade())?
635                .outstanding_job_count_rx
636                .clone(),
637        )
638    }
639
640    pub fn search_project(
641        &mut self,
642        project: ModelHandle<Project>,
643        phrase: String,
644        limit: usize,
645        includes: Vec<PathMatcher>,
646        excludes: Vec<PathMatcher>,
647        cx: &mut ModelContext<Self>,
648    ) -> Task<Result<Vec<SearchResult>>> {
649        let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
650            state
651        } else {
652            return Task::ready(Err(anyhow!("project not added")));
653        };
654
655        let worktree_db_ids = project
656            .read(cx)
657            .worktrees(cx)
658            .filter_map(|worktree| {
659                let worktree_id = worktree.read(cx).id();
660                project_state.db_id_for_worktree_id(worktree_id)
661            })
662            .collect::<Vec<_>>();
663
664        let embedding_provider = self.embedding_provider.clone();
665        let db_path = self.db.path().clone();
666        let fs = self.fs.clone();
667        cx.spawn(|this, mut cx| async move {
668            let t0 = Instant::now();
669            let database =
670                VectorDatabase::new(fs.clone(), db_path.clone(), cx.background()).await?;
671
672            let phrase_embedding = embedding_provider
673                .embed_batch(vec![phrase])
674                .await?
675                .into_iter()
676                .next()
677                .unwrap();
678
679            log::trace!(
680                "Embedding search phrase took: {:?} milliseconds",
681                t0.elapsed().as_millis()
682            );
683
684            let file_ids = database
685                .retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)
686                .await?;
687
688            let batch_n = cx.background().num_cpus();
689            let ids_len = file_ids.clone().len();
690            let batch_size = if ids_len <= batch_n {
691                ids_len
692            } else {
693                ids_len / batch_n
694            };
695
696            let mut batch_results = Vec::new();
697            for batch in file_ids.chunks(batch_size) {
698                let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
699                let limit = limit.clone();
700                let fs = fs.clone();
701                let db_path = db_path.clone();
702                let phrase_embedding = phrase_embedding.clone();
703                if let Some(db) = VectorDatabase::new(fs, db_path.clone(), cx.background())
704                    .await
705                    .log_err()
706                {
707                    batch_results.push(async move {
708                        db.top_k_search(&phrase_embedding, limit, batch.as_slice())
709                            .await
710                    });
711                }
712            }
713            let batch_results = futures::future::join_all(batch_results).await;
714
715            let mut results = Vec::new();
716            for batch_result in batch_results {
717                if batch_result.is_ok() {
718                    for (id, similarity) in batch_result.unwrap() {
719                        let ix = match results.binary_search_by(|(_, s)| {
720                            similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
721                        }) {
722                            Ok(ix) => ix,
723                            Err(ix) => ix,
724                        };
725                        results.insert(ix, (id, similarity));
726                        results.truncate(limit);
727                    }
728                }
729            }
730
731            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
732            let documents = database.get_documents_by_ids(ids.as_slice()).await?;
733
734            let mut tasks = Vec::new();
735            let mut ranges = Vec::new();
736            let weak_project = project.downgrade();
737            project.update(&mut cx, |project, cx| {
738                for (worktree_db_id, file_path, byte_range) in documents {
739                    let project_state =
740                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
741                            state
742                        } else {
743                            return Err(anyhow!("project not added"));
744                        };
745                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
746                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
747                        ranges.push(byte_range);
748                    }
749                }
750
751                Ok(())
752            })?;
753
754            let buffers = futures::future::join_all(tasks).await;
755
756            log::trace!(
757                "Semantic Searching took: {:?} milliseconds in total",
758                t0.elapsed().as_millis()
759            );
760
761            Ok(buffers
762                .into_iter()
763                .zip(ranges)
764                .filter_map(|(buffer, range)| {
765                    let buffer = buffer.log_err()?;
766                    let range = buffer.read_with(&cx, |buffer, _| {
767                        buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
768                    });
769                    Some(SearchResult { buffer, range })
770                })
771                .collect::<Vec<_>>())
772        })
773    }
774
    /// Drains the recorded changed paths of `project` and dispatches them.
    /// Deleted paths are removed from the database. The remaining paths are
    /// sent to the parsing workers together with `embeddings_for_digest`.
    ///
    /// When `last_changed_before` is `Some`, only changes recorded at or before
    /// that instant are processed; newer ones stay recorded for a later call.
    /// Paths whose worktree, or worktree database id, can no longer be found are
    /// dropped without touching the database. Files whose language resolves but
    /// cannot be parsed for embeddings are skipped. Files whose language cannot be
    /// resolved are still sent, with no language.
    async fn reindex_changed_paths(
        this: ModelHandle<SemanticIndex>,
        project: ModelHandle<Project>,
        last_changed_before: Option<Instant>,
        cx: &mut AsyncAppContext,
        embeddings_for_digest: Arc<HashMap<DocumentDigest, Embedding>>,
    ) {
        let mut pending_files = Vec::new();
        let mut files_to_delete = Vec::new();
        let (db, language_registry, parsing_files_tx) = this.update(cx, |this, cx| {
            if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
                let outstanding_job_count_tx = &project_state.outstanding_job_count_tx;
                let db_ids = &project_state.worktree_db_ids;
                // Caches the most recently resolved worktree, so consecutive paths
                // from the same worktree don't look it up again.
                let mut worktree: Option<ModelHandle<Worktree>> = None;

                // Returning `false` removes the path from `changed_paths`; every path
                // handled (or found unresolvable) here is removed.
                project_state.changed_paths.retain(|path, info| {
                    if let Some(last_changed_before) = last_changed_before {
                        // Keep changes newer than the cutoff for a later reindex.
                        if info.changed_at > last_changed_before {
                            return true;
                        }
                    }

                    if worktree
                        .as_ref()
                        .map_or(true, |tree| tree.read(cx).id() != path.worktree_id)
                    {
                        worktree = project.read(cx).worktree_for_id(path.worktree_id, cx);
                    }
                    let Some(worktree) = &worktree else {
                        return false;
                    };

                    let Some(worktree_db_id) = db_ids
                        .iter()
                        .find_map(|entry| (entry.0 == path.worktree_id).then_some(entry.1))
                    else {
                        return false;
                    };

                    if info.is_deleted {
                        files_to_delete.push((worktree_db_id, path.path.to_path_buf()));
                    } else {
                        let absolute_path = worktree.read(cx).absolutize(&path.path);
                        // Each file to reindex counts as one outstanding job.
                        let job_handle = JobHandle::new(&outstanding_job_count_tx);
                        pending_files.push(PendingFile {
                            absolute_path,
                            relative_path: path.path.to_path_buf(),
                            language: None,
                            job_handle,
                            modified_time: info.mtime,
                            worktree_db_id,
                        });
                    }

                    false
                });
            }

            (
                this.db.clone(),
                this.language_registry.clone(),
                this.parsing_files_tx.clone(),
            )
        });

        for (worktree_db_id, path) in files_to_delete {
            db.delete_file(worktree_db_id, path).await.log_err();
        }

        // Language lookup is awaited, so it happens here, outside the synchronous
        // model update above.
        for mut pending_file in pending_files {
            if let Ok(language) = language_registry
                .language_for_file(&pending_file.relative_path, None)
                .await
            {
                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                    && &language.name().as_ref() != &"Markdown"
                    && language
                        .grammar()
                        .and_then(|grammar| grammar.embedding_config.as_ref())
                        .is_none()
                {
                    continue;
                }
                pending_file.language = Some(language);
            }
            // The channel is unbounded, so `try_send` presumably fails only once
            // the workers are gone; such failures are ignored.
            parsing_files_tx
                .try_send((embeddings_for_digest.clone(), pending_file))
                .ok();
        }
    }
865}
866
/// Makes `SemanticIndex` a gpui model. Its event type is `()`; no events are
/// emitted in this file.
impl Entity for SemanticIndex {
    type Event = ();
}
870
871impl Drop for JobHandle {
872    fn drop(&mut self) {
873        if let Some(inner) = Arc::get_mut(&mut self.tx) {
874            // This is the last instance of the JobHandle (regardless of it's origin - whether it was cloned or not)
875            if let Some(tx) = inner.upgrade() {
876                let mut tx = tx.lock();
877                *tx.borrow_mut() -= 1;
878            }
879        }
880    }
881}
882
#[cfg(test)]
mod tests {
    use super::*;

    /// A job stays outstanding until the last clone of its handle is dropped.
    #[test]
    fn test_job_handle() {
        let (count_tx, count_rx) = watch::channel_with(0);
        let count_tx = Arc::new(Mutex::new(count_tx));

        let original = JobHandle::new(&count_tx);
        assert_eq!(1, *count_rx.borrow());

        // Cloning shares the job rather than creating a new one.
        let duplicate = original.clone();
        assert_eq!(1, *count_rx.borrow());

        drop(original);
        assert_eq!(1, *count_rx.borrow());

        drop(duplicate);
        assert_eq!(0, *count_rx.borrow());
    }
}