semantic_index.rs

   1mod db;
   2mod embedding;
   3mod parsing;
   4pub mod semantic_index_settings;
   5
   6#[cfg(test)]
   7mod semantic_index_tests;
   8
   9use crate::semantic_index_settings::SemanticIndexSettings;
  10use anyhow::{anyhow, Result};
  11use db::VectorDatabase;
  12use embedding::{EmbeddingProvider, OpenAIEmbeddings};
  13use futures::{channel::oneshot, Future};
  14use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  15use language::{Anchor, Buffer, Language, LanguageRegistry};
  16use parking_lot::Mutex;
  17use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
  18use postage::watch;
  19use project::{
  20    search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, ProjectPath, Worktree, WorktreeId,
  21};
  22use smol::channel;
  23use std::{
  24    cmp::Ordering,
  25    collections::{BTreeMap, HashMap},
  26    mem,
  27    ops::Range,
  28    path::{Path, PathBuf},
  29    sync::{Arc, Weak},
  30    time::{Duration, Instant, SystemTime},
  31};
  32use util::{
  33    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
  34    http::HttpClient,
  35    paths::EMBEDDINGS_DIR,
  36    ResultExt,
  37};
  38use workspace::WorkspaceCreated;
  39
// Version stamp of the semantic index format; bumped on incompatible changes.
// NOTE(review): assumed to invalidate previously-built databases — confirm in `db`.
const SEMANTIC_INDEX_VERSION: usize = 7;
// Maximum number of documents grouped into one embedding-provider request.
const EMBEDDINGS_BATCH_SIZE: usize = 80;
// Debounce between observing a file change and re-indexing the changed paths.
const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(600);
  43
  44pub fn init(
  45    fs: Arc<dyn Fs>,
  46    http_client: Arc<dyn HttpClient>,
  47    language_registry: Arc<LanguageRegistry>,
  48    cx: &mut AppContext,
  49) {
  50    settings::register::<SemanticIndexSettings>(cx);
  51
  52    let db_file_path = EMBEDDINGS_DIR
  53        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
  54        .join("embeddings_db");
  55
  56    // This needs to be removed at some point before stable.
  57    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
  58        return;
  59    }
  60
  61    cx.subscribe_global::<WorkspaceCreated, _>({
  62        move |event, cx| {
  63            let Some(semantic_index) = SemanticIndex::global(cx) else {
  64                return;
  65            };
  66            let workspace = &event.0;
  67            if let Some(workspace) = workspace.upgrade(cx) {
  68                let project = workspace.read(cx).project().clone();
  69                if project.read(cx).is_local() {
  70                    semantic_index.update(cx, |index, cx| {
  71                        index.initialize_project(project, cx).detach_and_log_err(cx)
  72                    });
  73                }
  74            }
  75        }
  76    })
  77    .detach();
  78
  79    cx.spawn(move |mut cx| async move {
  80        let semantic_index = SemanticIndex::new(
  81            fs,
  82            db_file_path,
  83            // Arc::new(embedding::DummyEmbeddings {}),
  84            Arc::new(OpenAIEmbeddings {
  85                client: http_client,
  86                executor: cx.background(),
  87            }),
  88            language_registry,
  89            cx.clone(),
  90        )
  91        .await?;
  92
  93        cx.update(|cx| {
  94            cx.set_global(semantic_index.clone());
  95        });
  96
  97        anyhow::Ok(())
  98    })
  99    .detach();
 100}
 101
/// Coordinator for building and querying vector embeddings of project files.
/// Owns the background task pipeline: parsing files into documents, batching
/// them, computing embeddings, and writing results to the vector database.
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    // Path to the on-disk vector database.
    database_url: Arc<PathBuf>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    // Operations consumed sequentially by the single database task.
    db_update_tx: channel::Sender<DbOperation>,
    // Files waiting to be parsed into embeddable documents.
    parsing_files_tx: channel::Sender<PendingFile>,
    // Held only to keep the background tasks alive for this index's lifetime.
    _db_update_task: Task<()>,
    _embed_batch_tasks: Vec<Task<()>>,
    _batch_files_task: Task<()>,
    _parsing_files_tasks: Vec<Task<()>>,
    // Per-project indexing state, keyed by weak handle so dropped projects
    // don't keep state alive.
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
 115
/// Indexing state tracked for a single open project.
struct ProjectState {
    // Mapping from worktree ids to their row ids in the vector database.
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    // Keeps the project-event subscription alive while this state exists.
    _subscription: gpui::Subscription,
    // Watch pair counting jobs that have been queued but not yet completed.
    outstanding_job_count_rx: watch::Receiver<usize>,
    outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    // Paths changed since the last index pass, with change metadata.
    changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
}
 123
/// Metadata about a path that changed and may need (re)indexing.
struct ChangedPathInfo {
    // When the change was observed.
    changed_at: Instant,
    // Modification time reported for the entry.
    mtime: SystemTime,
    // True when the path was removed and its rows should be deleted.
    is_deleted: bool,
}
 129
/// Handle representing one outstanding indexing job; cloned freely as the job
/// moves through the pipeline.
#[derive(Clone)]
struct JobHandle {
    /// The outer Arc is here to count the clones of a JobHandle instance;
    /// when the last handle to a given job is dropped, we decrement a counter (just once).
    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
}
 136
 137impl JobHandle {
 138    fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
 139        *tx.lock().borrow_mut() += 1;
 140        Self {
 141            tx: Arc::new(Arc::downgrade(&tx)),
 142        }
 143    }
 144}
 145
 146impl ProjectState {
 147    fn new(
 148        subscription: gpui::Subscription,
 149        worktree_db_ids: Vec<(WorktreeId, i64)>,
 150        changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
 151    ) -> Self {
 152        let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0);
 153        let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx));
 154        Self {
 155            worktree_db_ids,
 156            outstanding_job_count_rx,
 157            outstanding_job_count_tx,
 158            changed_paths,
 159            _subscription: subscription,
 160        }
 161    }
 162
 163    pub fn get_outstanding_count(&self) -> usize {
 164        self.outstanding_job_count_rx.borrow().clone()
 165    }
 166
 167    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
 168        self.worktree_db_ids
 169            .iter()
 170            .find_map(|(worktree_id, db_id)| {
 171                if *worktree_id == id {
 172                    Some(*db_id)
 173                } else {
 174                    None
 175                }
 176            })
 177    }
 178
 179    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
 180        self.worktree_db_ids
 181            .iter()
 182            .find_map(|(worktree_id, db_id)| {
 183                if *db_id == id {
 184                    Some(*worktree_id)
 185                } else {
 186                    None
 187                }
 188            })
 189    }
 190}
 191
/// A file queued for parsing and embedding.
#[derive(Clone)]
pub struct PendingFile {
    // Database id of the worktree containing this file.
    worktree_db_id: i64,
    // Path relative to the worktree root; stored alongside database rows.
    relative_path: PathBuf,
    // Absolute path used to load the file's contents from disk.
    absolute_path: PathBuf,
    // Detected language, if any; files without one are skipped by the parser.
    language: Option<Arc<Language>>,
    // Modification time recorded with the database entry.
    modified_time: SystemTime,
    // Keeps the project's outstanding-job count accurate until processed.
    job_handle: JobHandle,
}
 201
/// A single semantic-search hit: a buffer and the matching range within it.
pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<Anchor>,
}
 206
/// Operations processed sequentially by the single database task.
enum DbOperation {
    /// Insert a file's documents (with embeddings) into the database.
    InsertFile {
        worktree_id: i64,
        documents: Vec<Document>,
        path: PathBuf,
        mtime: SystemTime,
        // Dropped once the insert completes, marking the job finished.
        job_handle: JobHandle,
    },
    /// Remove a file's rows from the database.
    Delete {
        worktree_id: i64,
        path: PathBuf,
    },
    /// Look up a worktree row by path, creating it if absent; replies with the id.
    FindOrCreateWorktree {
        path: PathBuf,
        sender: oneshot::Sender<Result<i64>>,
    },
    /// Fetch the stored modification time of every file in a worktree.
    FileMTimes {
        worktree_id: i64,
        sender: oneshot::Sender<Result<HashMap<PathBuf, SystemTime>>>,
    },
    /// Reply whether the worktree at `path` was indexed in a prior session.
    WorktreePreviouslyIndexed {
        path: Arc<Path>,
        sender: oneshot::Sender<Result<bool>>,
    },
}
 232
/// Messages for the batching task that groups documents before embedding.
enum EmbeddingJob {
    /// Queue a parsed file's documents for embedding.
    Enqueue {
        worktree_id: i64,
        path: PathBuf,
        mtime: SystemTime,
        documents: Vec<Document>,
        job_handle: JobHandle,
    },
    /// Send whatever is currently queued to the embedding provider.
    Flush,
}
 243
 244impl SemanticIndex {
 245    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
 246        if cx.has_global::<ModelHandle<Self>>() {
 247            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
 248        } else {
 249            None
 250        }
 251    }
 252
 253    pub fn enabled(cx: &AppContext) -> bool {
 254        settings::get::<SemanticIndexSettings>(cx).enabled
 255            && *RELEASE_CHANNEL != ReleaseChannel::Stable
 256    }
 257
    /// Builds the semantic index model: opens the vector database, then spawns
    /// the background task pipeline (file parsing -> document batching ->
    /// embedding -> database updates).
    async fn new(
        fs: Arc<dyn Fs>,
        database_url: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_url = Arc::new(database_url);

        let db = cx
            .background()
            .spawn(VectorDatabase::new(fs.clone(), database_url.clone()))
            .await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            // Perform database operations
            // (a single consumer task serializes all database access).
            let (db_update_tx, db_update_rx) = channel::unbounded();
            let _db_update_task = cx.background().spawn({
                async move {
                    while let Ok(job) = db_update_rx.recv().await {
                        Self::run_db_operation(&db, job)
                    }
                }
            });

            // Group documents into batches and send them to the embedding provider.
            // One worker per CPU so multiple batches can be embedded concurrently.
            let (embed_batch_tx, embed_batch_rx) =
                channel::unbounded::<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>();
            let mut _embed_batch_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let embed_batch_rx = embed_batch_rx.clone();
                _embed_batch_tasks.push(cx.background().spawn({
                    let db_update_tx = db_update_tx.clone();
                    let embedding_provider = embedding_provider.clone();
                    async move {
                        while let Ok(embeddings_queue) = embed_batch_rx.recv().await {
                            Self::compute_embeddings_for_batch(
                                embeddings_queue,
                                &embedding_provider,
                                &db_update_tx,
                            )
                            .await;
                        }
                    }
                }));
            }

            // Group documents into batches and send them to the embedding provider.
            let (batch_files_tx, batch_files_rx) = channel::unbounded::<EmbeddingJob>();
            let _batch_files_task = cx.background().spawn(async move {
                let mut queue_len = 0;
                let mut embeddings_queue = vec![];
                while let Ok(job) = batch_files_rx.recv().await {
                    Self::enqueue_documents_to_embed(
                        job,
                        &mut queue_len,
                        &mut embeddings_queue,
                        &embed_batch_tx,
                    );
                }
            });

            // Parse files into embeddable documents.
            // Also one worker per CPU, each with its own retriever.
            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let parsing_files_rx = parsing_files_rx.clone();
                let batch_files_tx = batch_files_tx.clone();
                let db_update_tx = db_update_tx.clone();
                let embedding_provider = embedding_provider.clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
                    while let Ok(pending_file) = parsing_files_rx.recv().await {
                        Self::parse_file(
                            &fs,
                            pending_file,
                            &mut retriever,
                            &batch_files_tx,
                            &parsing_files_rx,
                            &db_update_tx,
                        )
                        .await;
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                database_url,
                embedding_provider,
                language_registry,
                db_update_tx,
                parsing_files_tx,
                _db_update_task,
                _embed_batch_tasks,
                _batch_files_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }
 371
 372    fn run_db_operation(db: &VectorDatabase, job: DbOperation) {
 373        match job {
 374            DbOperation::InsertFile {
 375                worktree_id,
 376                documents,
 377                path,
 378                mtime,
 379                job_handle,
 380            } => {
 381                db.insert_file(worktree_id, path, mtime, documents)
 382                    .log_err();
 383                drop(job_handle)
 384            }
 385            DbOperation::Delete { worktree_id, path } => {
 386                db.delete_file(worktree_id, path).log_err();
 387            }
 388            DbOperation::FindOrCreateWorktree { path, sender } => {
 389                let id = db.find_or_create_worktree(&path);
 390                sender.send(id).ok();
 391            }
 392            DbOperation::FileMTimes {
 393                worktree_id: worktree_db_id,
 394                sender,
 395            } => {
 396                let file_mtimes = db.get_file_mtimes(worktree_db_id);
 397                sender.send(file_mtimes).ok();
 398            }
 399            DbOperation::WorktreePreviouslyIndexed { path, sender } => {
 400                let worktree_indexed = db.worktree_previously_indexed(path.as_ref());
 401                sender.send(worktree_indexed).ok();
 402            }
 403        }
 404    }
 405
    /// Embeds all documents in `embeddings_queue` with one provider call,
    /// writes the resulting embeddings back onto the documents, and queues
    /// each file for insertion into the database.
    async fn compute_embeddings_for_batch(
        mut embeddings_queue: Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
        embedding_provider: &Arc<dyn EmbeddingProvider>,
        db_update_tx: &channel::Sender<DbOperation>,
    ) {
        // Flatten every file's document contents into a single list so the
        // provider gets one batch request.
        let mut batch_documents = vec![];
        for (_, documents, _, _, _) in embeddings_queue.iter() {
            batch_documents.extend(documents.iter().map(|document| document.content.as_str()));
        }

        if let Ok(embeddings) = embedding_provider.embed_batch(batch_documents).await {
            log::trace!(
                "created {} embeddings for {} files",
                embeddings.len(),
                embeddings_queue.len(),
            );

            // Distribute the flat embedding list back onto the per-file
            // document lists: `i` indexes the file, `j` the document within it.
            let mut i = 0;
            let mut j = 0;

            for embedding in embeddings.iter() {
                // Advance past files whose documents are exhausted (including
                // files that produced zero documents).
                while embeddings_queue[i].1.len() == j {
                    i += 1;
                    j = 0;
                }

                embeddings_queue[i].1[j].embedding = embedding.to_owned();
                j += 1;
            }

            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
                db_update_tx
                    .send(DbOperation::InsertFile {
                        worktree_id,
                        documents,
                        path,
                        mtime,
                        job_handle,
                    })
                    .await
                    .unwrap();
            }
        } else {
            // Insert the file in spite of failure so that future attempts to index it do not take place (unless the file is changed).
            for (worktree_id, _, path, mtime, job_handle) in embeddings_queue.into_iter() {
                db_update_tx
                    .send(DbOperation::InsertFile {
                        worktree_id,
                        documents: vec![],
                        path,
                        mtime,
                        job_handle,
                    })
                    .await
                    .unwrap();
            }
        }
    }
 464
 465    fn enqueue_documents_to_embed(
 466        job: EmbeddingJob,
 467        queue_len: &mut usize,
 468        embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
 469        embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
 470    ) {
 471        // Handle edge case where individual file has more documents than max batch size
 472        let should_flush = match job {
 473            EmbeddingJob::Enqueue {
 474                documents,
 475                worktree_id,
 476                path,
 477                mtime,
 478                job_handle,
 479            } => {
 480                // If documents is greater than embeddings batch size, recursively batch existing rows.
 481                if &documents.len() > &EMBEDDINGS_BATCH_SIZE {
 482                    let first_job = EmbeddingJob::Enqueue {
 483                        documents: documents[..EMBEDDINGS_BATCH_SIZE].to_vec(),
 484                        worktree_id,
 485                        path: path.clone(),
 486                        mtime,
 487                        job_handle: job_handle.clone(),
 488                    };
 489
 490                    Self::enqueue_documents_to_embed(
 491                        first_job,
 492                        queue_len,
 493                        embeddings_queue,
 494                        embed_batch_tx,
 495                    );
 496
 497                    let second_job = EmbeddingJob::Enqueue {
 498                        documents: documents[EMBEDDINGS_BATCH_SIZE..].to_vec(),
 499                        worktree_id,
 500                        path: path.clone(),
 501                        mtime,
 502                        job_handle: job_handle.clone(),
 503                    };
 504
 505                    Self::enqueue_documents_to_embed(
 506                        second_job,
 507                        queue_len,
 508                        embeddings_queue,
 509                        embed_batch_tx,
 510                    );
 511                    return;
 512                } else {
 513                    *queue_len += &documents.len();
 514                    embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
 515                    *queue_len >= EMBEDDINGS_BATCH_SIZE
 516                }
 517            }
 518            EmbeddingJob::Flush => true,
 519        };
 520
 521        if should_flush {
 522            embed_batch_tx
 523                .try_send(mem::take(embeddings_queue))
 524                .unwrap();
 525            *queue_len = 0;
 526        }
 527    }
 528
 529    async fn parse_file(
 530        fs: &Arc<dyn Fs>,
 531        pending_file: PendingFile,
 532        retriever: &mut CodeContextRetriever,
 533        batch_files_tx: &channel::Sender<EmbeddingJob>,
 534        parsing_files_rx: &channel::Receiver<PendingFile>,
 535        db_update_tx: &channel::Sender<DbOperation>,
 536    ) {
 537        let Some(language) = pending_file.language else {
 538            return;
 539        };
 540
 541        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
 542            if let Some(documents) = retriever
 543                .parse_file_with_template(&pending_file.relative_path, &content, language)
 544                .log_err()
 545            {
 546                log::trace!(
 547                    "parsed path {:?}: {} documents",
 548                    pending_file.relative_path,
 549                    documents.len()
 550                );
 551
 552                if documents.len() == 0 {
 553                    db_update_tx
 554                        .send(DbOperation::InsertFile {
 555                            worktree_id: pending_file.worktree_db_id,
 556                            documents,
 557                            path: pending_file.relative_path,
 558                            mtime: pending_file.modified_time,
 559                            job_handle: pending_file.job_handle,
 560                        })
 561                        .await
 562                        .unwrap();
 563                } else {
 564                    batch_files_tx
 565                        .try_send(EmbeddingJob::Enqueue {
 566                            worktree_id: pending_file.worktree_db_id,
 567                            path: pending_file.relative_path,
 568                            mtime: pending_file.modified_time,
 569                            job_handle: pending_file.job_handle,
 570                            documents,
 571                        })
 572                        .unwrap();
 573                }
 574            }
 575        }
 576
 577        if parsing_files_rx.len() == 0 {
 578            batch_files_tx.try_send(EmbeddingJob::Flush).unwrap();
 579        }
 580    }
 581
 582    fn find_or_create_worktree(&self, path: PathBuf) -> impl Future<Output = Result<i64>> {
 583        let (tx, rx) = oneshot::channel();
 584        self.db_update_tx
 585            .try_send(DbOperation::FindOrCreateWorktree { path, sender: tx })
 586            .unwrap();
 587        async move { rx.await? }
 588    }
 589
 590    fn get_file_mtimes(
 591        &self,
 592        worktree_id: i64,
 593    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
 594        let (tx, rx) = oneshot::channel();
 595        self.db_update_tx
 596            .try_send(DbOperation::FileMTimes {
 597                worktree_id,
 598                sender: tx,
 599            })
 600            .unwrap();
 601        async move { rx.await? }
 602    }
 603
 604    fn worktree_previously_indexed(&self, path: Arc<Path>) -> impl Future<Output = Result<bool>> {
 605        let (tx, rx) = oneshot::channel();
 606        self.db_update_tx
 607            .try_send(DbOperation::WorktreePreviouslyIndexed { path, sender: tx })
 608            .unwrap();
 609        async move { rx.await? }
 610    }
 611
 612    pub fn project_previously_indexed(
 613        &mut self,
 614        project: ModelHandle<Project>,
 615        cx: &mut ModelContext<Self>,
 616    ) -> Task<Result<bool>> {
 617        let worktrees_indexed_previously = project
 618            .read(cx)
 619            .worktrees(cx)
 620            .map(|worktree| self.worktree_previously_indexed(worktree.read(cx).abs_path()))
 621            .collect::<Vec<_>>();
 622        cx.spawn(|_, _cx| async move {
 623            let worktree_indexed_previously =
 624                futures::future::join_all(worktrees_indexed_previously).await;
 625
 626            Ok(worktree_indexed_previously
 627                .iter()
 628                .filter(|worktree| worktree.is_ok())
 629                .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
 630        })
 631    }
 632
    /// Records each changed worktree entry in the project's `changed_paths`
    /// map, then schedules a delayed background reindex of those paths.
    fn project_entries_changed(
        &mut self,
        project: ModelHandle<Project>,
        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
        cx: &mut ModelContext<'_, SemanticIndex>,
        worktree_id: &WorktreeId,
    ) {
        let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
            return;
        };
        let project = project.downgrade();
        let Some(project_state) = self.projects.get_mut(&project) else {
            return;
        };

        let worktree = worktree.read(cx);
        let change_time = Instant::now();
        for (path, entry_id, change) in changes.iter() {
            let Some(entry) = worktree.entry_for_id(*entry_id) else {
                continue;
            };
            // Skip entries the index never covers.
            if entry.is_ignored || entry.is_symlink || entry.is_external {
                continue;
            }
            let project_path = ProjectPath {
                worktree_id: *worktree_id,
                path: path.clone(),
            };
            project_state.changed_paths.insert(
                project_path,
                ChangedPathInfo {
                    changed_at: change_time,
                    mtime: entry.mtime,
                    is_deleted: *change == PathChange::Removed,
                },
            );
        }

        // Debounce: wait before reindexing so rapid successive edits coalesce;
        // reindex_changed_paths uses `change_time` to pick up these entries.
        cx.spawn_weak(|this, mut cx| async move {
            cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
            if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
                Self::reindex_changed_paths(this, project, Some(change_time), &mut cx).await;
            }
        })
        .detach();
    }
 679
    /// Prepares `project` for indexing: waits for worktree scans to finish,
    /// resolves a database id for each worktree, diffs stored file mtimes
    /// against the current tree to compute the set of changed paths, and
    /// records the resulting [`ProjectState`].
    pub fn initialize_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        log::trace!("Initializing Project for Semantic Index");
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
                async move {
                    scan_complete.await;
                }
            })
            .collect::<Vec<_>>();

        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
            })
            .collect::<Vec<_>>();

        // Track future file changes so they can be re-indexed after a delay.
        let _subscription = cx.subscribe(&project, |this, project, event, cx| {
            if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
                this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id);
            };
        });

        let language_registry = self.language_registry.clone();

        cx.spawn(|this, mut cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
            let worktrees = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(cx)
                    .map(|worktree| worktree.read(cx).snapshot())
                    .collect::<Vec<_>>()
            });

            let mut worktree_file_mtimes = HashMap::new();
            let mut db_ids_by_worktree_id = HashMap::new();

            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                let db_id = db_id?;
                db_ids_by_worktree_id.insert(worktree.id(), db_id);
                worktree_file_mtimes.insert(
                    worktree.id(),
                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
                        .await?,
                );
            }

            let worktree_db_ids = db_ids_by_worktree_id
                .iter()
                .map(|(a, b)| (*a, *b))
                .collect();

            // Diff the live tree against stored mtimes on a background thread
            // to find files needing (re)indexing or deletion.
            let changed_paths = cx
                .background()
                .spawn(async move {
                    let mut changed_paths = BTreeMap::new();
                    let now = Instant::now();
                    for worktree in worktrees.into_iter() {
                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                        for file in worktree.files(false, 0) {
                            let absolute_path = worktree.absolutize(&file.path);

                            if file.is_external || file.is_ignored || file.is_symlink {
                                continue;
                            }

                            if let Ok(language) = language_registry
                                .language_for_file(&absolute_path, None)
                                .await
                            {
                                // Test if file is valid parseable file
                                // (whole-file types, Markdown, or a grammar
                                // with an embedding config).
                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                    && &language.name().as_ref() != &"Markdown"
                                    && language
                                        .grammar()
                                        .and_then(|grammar| grammar.embedding_config.as_ref())
                                        .is_none()
                                {
                                    continue;
                                }

                                // A file is up to date only if its stored mtime
                                // matches the current one exactly.
                                let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                let already_stored = stored_mtime
                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);

                                if !already_stored {
                                    changed_paths.insert(
                                        ProjectPath {
                                            worktree_id: worktree.id(),
                                            path: file.path.clone(),
                                        },
                                        ChangedPathInfo {
                                            changed_at: now,
                                            mtime: file.mtime,
                                            is_deleted: false,
                                        },
                                    );
                                }
                            }
                        }

                        // Clean up entries from database that are no longer in the worktree.
                        for (path, mtime) in file_mtimes {
                            changed_paths.insert(
                                ProjectPath {
                                    worktree_id: worktree.id(),
                                    path: path.into(),
                                },
                                ChangedPathInfo {
                                    changed_at: now,
                                    mtime,
                                    is_deleted: true,
                                },
                            );
                        }
                    }

                    anyhow::Ok(changed_paths)
                })
                .await?;

            this.update(&mut cx, |this, _| {
                this.projects.insert(
                    project.downgrade(),
                    ProjectState::new(_subscription, worktree_db_ids, changed_paths),
                );
            });
            Result::<(), _>::Ok(())
        })
    }
 820
 821    pub fn index_project(
 822        &mut self,
 823        project: ModelHandle<Project>,
 824        cx: &mut ModelContext<Self>,
 825    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
 826        cx.spawn(|this, mut cx| async move {
 827            Self::reindex_changed_paths(this.clone(), project.clone(), None, &mut cx).await;
 828
 829            this.update(&mut cx, |this, _cx| {
 830                let Some(state) = this.projects.get(&project.downgrade()) else {
 831                    return Err(anyhow!("Project not yet initialized"));
 832                };
 833                let job_count_rx = state.outstanding_job_count_rx.clone();
 834                let count = state.get_outstanding_count();
 835                Ok((count, job_count_rx))
 836            })
 837        })
 838    }
 839
 840    pub fn outstanding_job_count_rx(
 841        &self,
 842        project: &ModelHandle<Project>,
 843    ) -> Option<watch::Receiver<usize>> {
 844        Some(
 845            self.projects
 846                .get(&project.downgrade())?
 847                .outstanding_job_count_rx
 848                .clone(),
 849        )
 850    }
 851
 852    pub fn search_project(
 853        &mut self,
 854        project: ModelHandle<Project>,
 855        phrase: String,
 856        limit: usize,
 857        includes: Vec<PathMatcher>,
 858        excludes: Vec<PathMatcher>,
 859        cx: &mut ModelContext<Self>,
 860    ) -> Task<Result<Vec<SearchResult>>> {
 861        let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
 862            state
 863        } else {
 864            return Task::ready(Err(anyhow!("project not added")));
 865        };
 866
 867        let worktree_db_ids = project
 868            .read(cx)
 869            .worktrees(cx)
 870            .filter_map(|worktree| {
 871                let worktree_id = worktree.read(cx).id();
 872                project_state.db_id_for_worktree_id(worktree_id)
 873            })
 874            .collect::<Vec<_>>();
 875
 876        let embedding_provider = self.embedding_provider.clone();
 877        let database_url = self.database_url.clone();
 878        let fs = self.fs.clone();
 879        cx.spawn(|this, mut cx| async move {
 880            let t0 = Instant::now();
 881            let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;
 882
 883            let phrase_embedding = embedding_provider
 884                .embed_batch(vec![&phrase])
 885                .await?
 886                .into_iter()
 887                .next()
 888                .unwrap();
 889
 890            log::trace!(
 891                "Embedding search phrase took: {:?} milliseconds",
 892                t0.elapsed().as_millis()
 893            );
 894
 895            let file_ids =
 896                database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?;
 897
 898            let batch_n = cx.background().num_cpus();
 899            let ids_len = file_ids.clone().len();
 900            let batch_size = if ids_len <= batch_n {
 901                ids_len
 902            } else {
 903                ids_len / batch_n
 904            };
 905
 906            let mut result_tasks = Vec::new();
 907            for batch in file_ids.chunks(batch_size) {
 908                let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
 909                let limit = limit.clone();
 910                let fs = fs.clone();
 911                let database_url = database_url.clone();
 912                let phrase_embedding = phrase_embedding.clone();
 913                let task = cx.background().spawn(async move {
 914                    let database = VectorDatabase::new(fs, database_url).await.log_err();
 915                    if database.is_none() {
 916                        return Err(anyhow!("failed to acquire database connection"));
 917                    } else {
 918                        database
 919                            .unwrap()
 920                            .top_k_search(&phrase_embedding, limit, batch.as_slice())
 921                    }
 922                });
 923                result_tasks.push(task);
 924            }
 925
 926            let batch_results = futures::future::join_all(result_tasks).await;
 927
 928            let mut results = Vec::new();
 929            for batch_result in batch_results {
 930                if batch_result.is_ok() {
 931                    for (id, similarity) in batch_result.unwrap() {
 932                        let ix = match results.binary_search_by(|(_, s)| {
 933                            similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
 934                        }) {
 935                            Ok(ix) => ix,
 936                            Err(ix) => ix,
 937                        };
 938                        results.insert(ix, (id, similarity));
 939                        results.truncate(limit);
 940                    }
 941                }
 942            }
 943
 944            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
 945            let documents = database.get_documents_by_ids(ids.as_slice())?;
 946
 947            let mut tasks = Vec::new();
 948            let mut ranges = Vec::new();
 949            let weak_project = project.downgrade();
 950            project.update(&mut cx, |project, cx| {
 951                for (worktree_db_id, file_path, byte_range) in documents {
 952                    let project_state =
 953                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
 954                            state
 955                        } else {
 956                            return Err(anyhow!("project not added"));
 957                        };
 958                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
 959                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
 960                        ranges.push(byte_range);
 961                    }
 962                }
 963
 964                Ok(())
 965            })?;
 966
 967            let buffers = futures::future::join_all(tasks).await;
 968
 969            log::trace!(
 970                "Semantic Searching took: {:?} milliseconds in total",
 971                t0.elapsed().as_millis()
 972            );
 973
 974            Ok(buffers
 975                .into_iter()
 976                .zip(ranges)
 977                .filter_map(|(buffer, range)| {
 978                    let buffer = buffer.log_err()?;
 979                    let range = buffer.read_with(&cx, |buffer, _| {
 980                        buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
 981                    });
 982                    Some(SearchResult { buffer, range })
 983                })
 984                .collect::<Vec<_>>())
 985        })
 986    }
 987
    /// Drains the project's `changed_paths` map and enqueues the appropriate
    /// work for each entry: deletions are sent straight to the database
    /// update channel, while created/modified files are resolved to pending
    /// parse jobs and sent to `parsing_files_tx`.
    ///
    /// When `last_changed_before` is `Some`, only entries whose change was
    /// recorded before that instant are processed; newer entries are retained
    /// for a later pass (this is how background debouncing works).
    async fn reindex_changed_paths(
        this: ModelHandle<SemanticIndex>,
        project: ModelHandle<Project>,
        last_changed_before: Option<Instant>,
        cx: &mut AsyncAppContext,
    ) {
        let mut pending_files = Vec::new();
        let (language_registry, parsing_files_tx) = this.update(cx, |this, cx| {
            if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
                let outstanding_job_count_tx = &project_state.outstanding_job_count_tx;
                let db_ids = &project_state.worktree_db_ids;
                // Cache of the most recently resolved worktree handle;
                // entries for the same worktree tend to be adjacent, so this
                // avoids a lookup per path.
                let mut worktree: Option<ModelHandle<Worktree>> = None;

                // NOTE: `retain` is used for its side effects as much as for
                // filtering — returning `false` removes the entry from
                // `changed_paths` once its work has been enqueued.
                project_state.changed_paths.retain(|path, info| {
                    // Too recent to process yet: keep it for the next pass.
                    if let Some(last_changed_before) = last_changed_before {
                        if info.changed_at > last_changed_before {
                            return true;
                        }
                    }

                    // Re-resolve the worktree handle only when this path
                    // belongs to a different worktree than the cached one.
                    if worktree
                        .as_ref()
                        .map_or(true, |tree| tree.read(cx).id() != path.worktree_id)
                    {
                        worktree = project.read(cx).worktree_for_id(path.worktree_id, cx);
                    }
                    let Some(worktree) = &worktree else {
                        // Worktree no longer exists; drop the entry.
                        return false;
                    };

                    let Some(worktree_db_id) = db_ids
                        .iter()
                        .find_map(|entry| (entry.0 == path.worktree_id).then_some(entry.1))
                    else {
                        // No database id registered for this worktree; drop.
                        return false;
                    };

                    if info.is_deleted {
                        // Deletions go straight to the DB writer; failure to
                        // send (channel full/closed) is deliberately ignored.
                        this.db_update_tx
                            .try_send(DbOperation::Delete {
                                worktree_id: worktree_db_id,
                                path: path.path.to_path_buf(),
                            })
                            .ok();
                    } else {
                        let absolute_path = worktree.read(cx).absolutize(&path.path);
                        // JobHandle increments the outstanding-job counter;
                        // dropping the last clone decrements it again.
                        let job_handle = JobHandle::new(&outstanding_job_count_tx);
                        pending_files.push(PendingFile {
                            absolute_path,
                            relative_path: path.path.to_path_buf(),
                            language: None,
                            job_handle,
                            modified_time: info.mtime,
                            worktree_db_id,
                        });
                    }

                    // Work enqueued — remove this entry from changed_paths.
                    false
                });
            }

            (
                this.language_registry.clone(),
                this.parsing_files_tx.clone(),
            )
        });

        // Resolve each pending file's language (outside the update closure,
        // since language detection is async) and forward it to the parser.
        for mut pending_file in pending_files {
            if let Ok(language) = language_registry
                .language_for_file(&pending_file.relative_path, None)
                .await
            {
                // Skip languages we can't meaningfully index: not in the
                // parse-whole-file list, not Markdown, and lacking an
                // embedding grammar config.
                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                    && &language.name().as_ref() != &"Markdown"
                    && language
                        .grammar()
                        .and_then(|grammar| grammar.embedding_config.as_ref())
                        .is_none()
                {
                    continue;
                }
                pending_file.language = Some(language);
            }
            // Send failure (channel full/closed) is deliberately ignored.
            parsing_files_tx.try_send(pending_file).ok();
        }
    }
1074}
1075
/// `SemanticIndex` participates in gpui's entity system but emits no events.
impl Entity for SemanticIndex {
    type Event = ();
}
1079
impl Drop for JobHandle {
    fn drop(&mut self) {
        // `Arc::get_mut` only returns `Some` when this is the sole reference
        // to the inner value, i.e. this is the last live instance of the
        // JobHandle (regardless of its origin — original or clone). That is
        // what makes a handle cloned N times still count as a single job.
        if let Some(inner) = Arc::get_mut(&mut self.tx) {
            // The weak sender may have died if the owning project state was
            // dropped first; in that case there is no counter to decrement.
            if let Some(tx) = inner.upgrade() {
                let mut tx = tx.lock();
                *tx.borrow_mut() -= 1;
            }
        }
    }
}
1091
#[cfg(test)]
mod tests {

    use super::*;
    /// Verifies JobHandle's clone-aware counting: creating a handle
    /// increments the outstanding-job count once, clones do not increment it
    /// further, and the count is decremented only when the LAST clone drops.
    #[test]
    fn test_job_handle() {
        let (job_count_tx, job_count_rx) = watch::channel_with(0);
        let tx = Arc::new(Mutex::new(job_count_tx));
        let job_handle = JobHandle::new(&tx);

        // One job outstanding after creating the handle.
        assert_eq!(1, *job_count_rx.borrow());
        let new_job_handle = job_handle.clone();
        // Cloning must not bump the count — still one logical job.
        assert_eq!(1, *job_count_rx.borrow());
        drop(job_handle);
        // Dropping only one of two clones leaves the job outstanding.
        assert_eq!(1, *job_count_rx.borrow());
        drop(new_job_handle);
        // Last clone gone — the count finally returns to zero.
        assert_eq!(0, *job_count_rx.borrow());
    }
}