semantic_index.rs

   1mod db;
   2mod embedding;
   3mod embedding_queue;
   4mod parsing;
   5pub mod semantic_index_settings;
   6
   7#[cfg(test)]
   8mod semantic_index_tests;
   9
  10use crate::semantic_index_settings::SemanticIndexSettings;
  11use anyhow::{anyhow, Result};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use db::VectorDatabase;
  14use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings};
  15use embedding_queue::{EmbeddingQueue, FileToEmbed};
  16use futures::{future, FutureExt, StreamExt};
  17use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  18use language::{Anchor, Bias, Buffer, Language, LanguageRegistry};
  19use parking_lot::Mutex;
  20use parsing::{CodeContextRetriever, SpanDigest, PARSEABLE_ENTIRE_FILE_TYPES};
  21use postage::watch;
  22use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, Worktree, WorktreeId};
  23use smol::channel;
  24use std::{
  25    cmp::Ordering,
  26    future::Future,
  27    ops::Range,
  28    path::{Path, PathBuf},
  29    sync::{Arc, Weak},
  30    time::{Duration, Instant, SystemTime},
  31};
  32use util::{
  33    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
  34    http::HttpClient,
  35    paths::EMBEDDINGS_DIR,
  36    ResultExt,
  37};
  38use workspace::WorkspaceCreated;
  39
  40const SEMANTIC_INDEX_VERSION: usize = 10;
  41const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60);
  42const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
  43
  44pub fn init(
  45    fs: Arc<dyn Fs>,
  46    http_client: Arc<dyn HttpClient>,
  47    language_registry: Arc<LanguageRegistry>,
  48    cx: &mut AppContext,
  49) {
  50    settings::register::<SemanticIndexSettings>(cx);
  51
  52    let db_file_path = EMBEDDINGS_DIR
  53        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
  54        .join("embeddings_db");
  55
  56    // This needs to be removed at some point before stable.
  57    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
  58        return;
  59    }
  60
  61    cx.subscribe_global::<WorkspaceCreated, _>({
  62        move |event, cx| {
  63            let Some(semantic_index) = SemanticIndex::global(cx) else {
  64                return;
  65            };
  66            let workspace = &event.0;
  67            if let Some(workspace) = workspace.upgrade(cx) {
  68                let project = workspace.read(cx).project().clone();
  69                if project.read(cx).is_local() {
  70                    cx.spawn(|mut cx| async move {
  71                        let previously_indexed = semantic_index
  72                            .update(&mut cx, |index, cx| {
  73                                index.project_previously_indexed(&project, cx)
  74                            })
  75                            .await?;
  76                        if previously_indexed {
  77                            semantic_index
  78                                .update(&mut cx, |index, cx| index.index_project(project, cx))
  79                                .await?;
  80                        }
  81                        anyhow::Ok(())
  82                    })
  83                    .detach_and_log_err(cx);
  84                }
  85            }
  86        }
  87    })
  88    .detach();
  89
  90    cx.spawn(move |mut cx| async move {
  91        let semantic_index = SemanticIndex::new(
  92            fs,
  93            db_file_path,
  94            Arc::new(OpenAIEmbeddings {
  95                client: http_client,
  96                executor: cx.background(),
  97            }),
  98            language_registry,
  99            cx.clone(),
 100        )
 101        .await?;
 102
 103        cx.update(|cx| {
 104            cx.set_global(semantic_index.clone());
 105        });
 106
 107        anyhow::Ok(())
 108    })
 109    .detach();
 110}
 111
 112#[derive(Copy, Clone, Debug)]
 113pub enum SemanticIndexStatus {
 114    NotIndexed,
 115    Indexed,
 116    Indexing { remaining_files: usize },
 117}
 118
/// Model owning the embeddings database, the background parse/embed/persist pipeline, and
/// the indexing state of every tracked project.
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    db: VectorDatabase,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    /// Feeds the parsing workers spawned in `SemanticIndex::new`. Each message pairs a file to
    /// parse with embeddings already known for some span digests; `parse_file` attaches those
    /// to matching spans (presumably so the queue can skip re-embedding them — confirm).
    parsing_files_tx: channel::Sender<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>,
    /// Background task that writes files finished by the embedding queue into `db`.
    _embedding_task: Task<()>,
    /// Parsing workers, one per CPU.
    _parsing_files_tasks: Vec<Task<()>>,
    /// Per-project state, keyed by weak handle so the index does not keep projects alive.
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
 129
/// Semantic-index bookkeeping for a single project.
struct ProjectState {
    /// State of each local worktree of the project.
    worktrees: HashMap<WorktreeId, WorktreeState>,
    /// Current number of pending files; read by `SemanticIndex::status` and handed out by
    /// `SemanticIndex::pending_file_count`.
    pending_file_count_rx: watch::Receiver<usize>,
    /// Shared with `JobHandle`s, which increment the count when created.
    pending_file_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    /// `SemanticIndex::status` reports the project as still indexing while this is non-zero.
    /// NOTE(review): presumably counts in-flight `index_project` runs — confirm.
    pending_index: usize,
    _subscription: gpui::Subscription,
    /// Calls `cx.notify()` on the index model whenever the pending count changes.
    _observe_pending_file_count: Task<()>,
}
 138
/// Lifecycle of a worktree inside the semantic index.
enum WorktreeState {
    /// The registration task started by `SemanticIndex::register_worktree` is still running.
    Registering(RegisteringWorktreeState),
    /// Registration finished; the worktree has a database id.
    Registered(RegisteredWorktreeState),
}
 143
 144impl WorktreeState {
 145    fn is_registered(&self) -> bool {
 146        matches!(self, Self::Registered(_))
 147    }
 148
 149    fn paths_changed(
 150        &mut self,
 151        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
 152        worktree: &Worktree,
 153    ) {
 154        let changed_paths = match self {
 155            Self::Registering(state) => &mut state.changed_paths,
 156            Self::Registered(state) => &mut state.changed_paths,
 157        };
 158
 159        for (path, entry_id, change) in changes.iter() {
 160            let Some(entry) = worktree.entry_for_id(*entry_id) else {
 161                continue;
 162            };
 163            if entry.is_ignored || entry.is_symlink || entry.is_external || entry.is_dir() {
 164                continue;
 165            }
 166            changed_paths.insert(
 167                path.clone(),
 168                ChangedPathInfo {
 169                    mtime: entry.mtime,
 170                    is_deleted: *change == PathChange::Removed,
 171                },
 172            );
 173        }
 174    }
 175}
 176
/// State of a worktree whose registration task has not finished yet.
struct RegisteringWorktreeState {
    /// Changes reported while registering; merged into the registered state when the
    /// registration task completes.
    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
    /// Set to `Some(())` by the registration task when it ends, whether or not it succeeded.
    done_rx: watch::Receiver<Option<()>>,
    /// Handle to the registration task spawned in `register_worktree`.
    _registration: Task<()>,
}
 182
 183impl RegisteringWorktreeState {
 184    fn done(&self) -> impl Future<Output = ()> {
 185        let mut done_rx = self.done_rx.clone();
 186        async move {
 187            while let Some(result) = done_rx.next().await {
 188                if result.is_some() {
 189                    break;
 190                }
 191            }
 192        }
 193    }
 194}
 195
/// State of a worktree that finished registration.
struct RegisteredWorktreeState {
    /// Id of the worktree's row in the vector database.
    db_id: i64,
    /// Paths found changed, new, or deleted that have not been indexed yet
    /// (presumably drained by `index_project` — confirm).
    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
}
 200
/// What is known about a path that must be (re)indexed or removed from the index.
struct ChangedPathInfo {
    /// Modification time from the worktree entry, or the stored database mtime for stale
    /// entries found during registration.
    mtime: SystemTime,
    /// `true` when the path was removed, or when its stored data no longer matches an
    /// indexable file in the worktree.
    is_deleted: bool,
}
 205
/// Accounts for one pending file in a project's pending-file counter.
///
/// Creating a handle with `JobHandle::new` increments the counter. Only a weak reference to
/// the counter's sender is held.
#[derive(Clone)]
pub struct JobHandle {
    /// The outer Arc is here to count the clones of a JobHandle instance;
    /// when the last handle to a given job is dropped, we decrement a counter (just once).
    /// NOTE(review): the decrement lives in a `Drop` impl not shown in this view — confirm.
    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
}
 212
 213impl JobHandle {
 214    fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
 215        *tx.lock().borrow_mut() += 1;
 216        Self {
 217            tx: Arc::new(Arc::downgrade(&tx)),
 218        }
 219    }
 220}
 221
 222impl ProjectState {
 223    fn new(subscription: gpui::Subscription, cx: &mut ModelContext<SemanticIndex>) -> Self {
 224        let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0);
 225        let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx));
 226        Self {
 227            worktrees: Default::default(),
 228            pending_file_count_rx: pending_file_count_rx.clone(),
 229            pending_file_count_tx,
 230            pending_index: 0,
 231            _subscription: subscription,
 232            _observe_pending_file_count: cx.spawn_weak({
 233                let mut pending_file_count_rx = pending_file_count_rx.clone();
 234                |this, mut cx| async move {
 235                    while let Some(_) = pending_file_count_rx.next().await {
 236                        if let Some(this) = this.upgrade(&cx) {
 237                            this.update(&mut cx, |_, cx| cx.notify());
 238                        }
 239                    }
 240                }
 241            }),
 242        }
 243    }
 244
 245    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
 246        self.worktrees
 247            .iter()
 248            .find_map(|(worktree_id, worktree_state)| match worktree_state {
 249                WorktreeState::Registered(state) if state.db_id == id => Some(*worktree_id),
 250                _ => None,
 251            })
 252    }
 253}
 254
/// A file queued for parsing and embedding.
#[derive(Clone)]
pub struct PendingFile {
    /// Database id of the worktree containing the file.
    worktree_db_id: i64,
    /// Path relative to the worktree root; stored alongside the file's spans.
    relative_path: Arc<Path>,
    /// Path the contents are loaded from.
    absolute_path: PathBuf,
    /// Language used for parsing; `parse_file` skips files without one.
    language: Option<Arc<Language>>,
    /// Modification time stored with the embedded file.
    modified_time: SystemTime,
    /// Passed on to the embedding queue together with the file's spans.
    job_handle: JobHandle,
}
 264
/// A match returned by `SemanticIndex::search_project`.
pub struct SearchResult {
    /// Buffer containing the match.
    pub buffer: ModelHandle<Buffer>,
    /// Matched range, clipped to the buffer's contents when the result was built.
    pub range: Range<Anchor>,
}
 269
 270impl SemanticIndex {
 271    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
 272        if cx.has_global::<ModelHandle<Self>>() {
 273            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
 274        } else {
 275            None
 276        }
 277    }
 278
 279    pub fn enabled(cx: &AppContext) -> bool {
 280        settings::get::<SemanticIndexSettings>(cx).enabled
 281            && *RELEASE_CHANNEL != ReleaseChannel::Stable
 282    }
 283
 284    pub fn status(&self, project: &ModelHandle<Project>) -> SemanticIndexStatus {
 285        if let Some(project_state) = self.projects.get(&project.downgrade()) {
 286            if project_state
 287                .worktrees
 288                .values()
 289                .all(|worktree| worktree.is_registered())
 290                && project_state.pending_index == 0
 291            {
 292                SemanticIndexStatus::Indexed
 293            } else {
 294                SemanticIndexStatus::Indexing {
 295                    remaining_files: project_state.pending_file_count_rx.borrow().clone(),
 296                }
 297            }
 298        } else {
 299            SemanticIndexStatus::NotIndexed
 300        }
 301    }
 302
    /// Opens the vector database at `database_path` and builds the semantic index model along
    /// with its background pipeline:
    ///
    /// * one parsing worker per CPU. Each worker receives files from `parsing_files_tx`, parses
    ///   them into spans, and pushes them onto a shared `EmbeddingQueue`. A worker flushes the
    ///   queue when no file arrives within `EMBEDDING_QUEUE_FLUSH_TIMEOUT`.
    /// * one task that writes every file finished by the embedding queue into the database.
    ///
    /// # Errors
    /// Returns an error if `VectorDatabase::new` fails.
    async fn new(
        fs: Arc<dyn Fs>,
        database_path: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_path = Arc::from(database_path);
        let db = VectorDatabase::new(fs.clone(), database_path, cx.background()).await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            let embedding_queue =
                EmbeddingQueue::new(embedding_provider.clone(), cx.background().clone());
            // Persist each file the embedding queue finishes, until `recv` fails.
            // Insertion errors are logged and do not stop the task.
            let _embedding_task = cx.background().spawn({
                let embedded_files = embedding_queue.finished_files();
                let db = db.clone();
                async move {
                    while let Ok(file) = embedded_files.recv().await {
                        db.insert_file(file.worktree_id, file.path, file.mtime, file.spans)
                            .await
                            .log_err();
                    }
                }
            });

            // Parse files into embeddable spans.
            let (parsing_files_tx, parsing_files_rx) =
                channel::unbounded::<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>();
            // The queue is shared by all parsing workers.
            let embedding_queue = Arc::new(Mutex::new(embedding_queue));
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let mut parsing_files_rx = parsing_files_rx.clone();
                let embedding_provider = embedding_provider.clone();
                let embedding_queue = embedding_queue.clone();
                let background = cx.background().clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
                    loop {
                        // The timer is recreated on every iteration, so the queue is flushed
                        // only after this worker has waited the full timeout without a file.
                        let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse();
                        let mut next_file_to_parse = parsing_files_rx.next().fuse();
                        // `select_biased!` checks the file branch first, so ready files win
                        // over flushing.
                        futures::select_biased! {
                            next_file_to_parse = next_file_to_parse => {
                                if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse {
                                    Self::parse_file(
                                        &fs,
                                        pending_file,
                                        &mut retriever,
                                        &embedding_queue,
                                        &embeddings_for_digest,
                                    )
                                    .await
                                } else {
                                    // The stream is exhausted: stop this worker.
                                    break;
                                }
                            },
                            _ = timer => {
                                embedding_queue.lock().flush();
                            }
                        }
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                db,
                embedding_provider,
                language_registry,
                parsing_files_tx,
                _embedding_task,
                _parsing_files_tasks,
                projects: Default::default(),
            }
        }))
    }
 390
 391    async fn parse_file(
 392        fs: &Arc<dyn Fs>,
 393        pending_file: PendingFile,
 394        retriever: &mut CodeContextRetriever,
 395        embedding_queue: &Arc<Mutex<EmbeddingQueue>>,
 396        embeddings_for_digest: &HashMap<SpanDigest, Embedding>,
 397    ) {
 398        let Some(language) = pending_file.language else {
 399            return;
 400        };
 401
 402        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
 403            if let Some(mut spans) = retriever
 404                .parse_file_with_template(&pending_file.relative_path, &content, language)
 405                .log_err()
 406            {
 407                log::trace!(
 408                    "parsed path {:?}: {} spans",
 409                    pending_file.relative_path,
 410                    spans.len()
 411                );
 412
 413                for span in &mut spans {
 414                    if let Some(embedding) = embeddings_for_digest.get(&span.digest) {
 415                        span.embedding = Some(embedding.to_owned());
 416                    }
 417                }
 418
 419                embedding_queue.lock().push(FileToEmbed {
 420                    worktree_id: pending_file.worktree_db_id,
 421                    path: pending_file.relative_path,
 422                    mtime: pending_file.modified_time,
 423                    job_handle: pending_file.job_handle,
 424                    spans: spans,
 425                });
 426            }
 427        }
 428    }
 429
 430    pub fn project_previously_indexed(
 431        &mut self,
 432        project: &ModelHandle<Project>,
 433        cx: &mut ModelContext<Self>,
 434    ) -> Task<Result<bool>> {
 435        let worktrees_indexed_previously = project
 436            .read(cx)
 437            .worktrees(cx)
 438            .map(|worktree| {
 439                self.db
 440                    .worktree_previously_indexed(&worktree.read(cx).abs_path())
 441            })
 442            .collect::<Vec<_>>();
 443        cx.spawn(|_, _cx| async move {
 444            let worktree_indexed_previously =
 445                futures::future::join_all(worktrees_indexed_previously).await;
 446
 447            Ok(worktree_indexed_previously
 448                .iter()
 449                .filter(|worktree| worktree.is_ok())
 450                .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
 451        })
 452    }
 453
 454    fn project_entries_changed(
 455        &mut self,
 456        project: ModelHandle<Project>,
 457        worktree_id: WorktreeId,
 458        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
 459        cx: &mut ModelContext<Self>,
 460    ) {
 461        let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
 462            return;
 463        };
 464        let project = project.downgrade();
 465        let Some(project_state) = self.projects.get_mut(&project) else {
 466            return;
 467        };
 468
 469        let worktree = worktree.read(cx);
 470        let worktree_state =
 471            if let Some(worktree_state) = project_state.worktrees.get_mut(&worktree_id) {
 472                worktree_state
 473            } else {
 474                return;
 475            };
 476        worktree_state.paths_changed(changes, worktree);
 477        if let WorktreeState::Registered(_) = worktree_state {
 478            cx.spawn_weak(|this, mut cx| async move {
 479                cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
 480                if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
 481                    this.update(&mut cx, |this, cx| {
 482                        this.index_project(project, cx).detach_and_log_err(cx)
 483                    });
 484                }
 485            })
 486            .detach();
 487        }
 488    }
 489
    /// Starts tracking a local `worktree` of an already-tracked `project`.
    ///
    /// The worktree is inserted right away as `WorktreeState::Registering`. A spawned task then:
    /// 1. waits for the initial scan;
    /// 2. finds or creates the worktree's database row;
    /// 3. compares the worktree's parseable files with the mtimes stored in the database.
    ///
    /// New or modified files are recorded as changed. Stored paths that do not match a
    /// parseable worktree file are recorded as deleted. The worktree then becomes
    /// `WorktreeState::Registered`, and `index_project` is started.
    ///
    /// If registration fails, the error is logged and the worktree is removed from the project
    /// state. In every case `done_rx` is signalled at the end.
    ///
    /// Does nothing if the project is not tracked or the worktree is not local.
    fn register_worktree(
        &mut self,
        project: ModelHandle<Project>,
        worktree: ModelHandle<Worktree>,
        cx: &mut ModelContext<Self>,
    ) {
        let project = project.downgrade();
        let project_state = if let Some(project_state) = self.projects.get_mut(&project) {
            project_state
        } else {
            return;
        };
        let worktree = if let Some(worktree) = worktree.read(cx).as_local() {
            worktree
        } else {
            return;
        };
        let worktree_abs_path = worktree.abs_path().clone();
        let scan_complete = worktree.scan_complete();
        let worktree_id = worktree.id();
        let db = self.db.clone();
        let language_registry = self.language_registry.clone();
        let (mut done_tx, done_rx) = watch::channel();
        let registration = cx.spawn(|this, mut cx| {
            async move {
                let register = async {
                    scan_complete.await;
                    let db_id = db.find_or_create_worktree(worktree_abs_path).await?;
                    let mut file_mtimes = db.get_file_mtimes(db_id).await?;
                    // If the project was dropped in the meantime, there is nothing to register.
                    let worktree = if let Some(project) = project.upgrade(&cx) {
                        project
                            .read_with(&cx, |project, cx| project.worktree_for_id(worktree_id, cx))
                            .ok_or_else(|| anyhow!("worktree not found"))?
                    } else {
                        return anyhow::Ok(());
                    };
                    let worktree = worktree.read_with(&cx, |worktree, _| worktree.snapshot());
                    // Compare the snapshot with the stored mtimes on the background executor.
                    let mut changed_paths = cx
                        .background()
                        .spawn(async move {
                            let mut changed_paths = BTreeMap::new();
                            for file in worktree.files(false, 0) {
                                let absolute_path = worktree.absolutize(&file.path);

                                if file.is_external || file.is_ignored || file.is_symlink {
                                    continue;
                                }

                                if let Ok(language) = language_registry
                                    .language_for_file(&absolute_path, None)
                                    .await
                                {
                                    // Test if file is valid parseable file
                                    if !PARSEABLE_ENTIRE_FILE_TYPES
                                        .contains(&language.name().as_ref())
                                        && &language.name().as_ref() != &"Markdown"
                                        && language
                                            .grammar()
                                            .and_then(|grammar| grammar.embedding_config.as_ref())
                                            .is_none()
                                    {
                                        continue;
                                    }

                                    // Removing the path marks it as still present. Whatever is
                                    // left in `file_mtimes` after this loop is stale.
                                    let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                    let already_stored = stored_mtime
                                        .map_or(false, |existing_mtime| {
                                            existing_mtime == file.mtime
                                        });

                                    if !already_stored {
                                        changed_paths.insert(
                                            file.path.clone(),
                                            ChangedPathInfo {
                                                mtime: file.mtime,
                                                is_deleted: false,
                                            },
                                        );
                                    }
                                }
                            }

                            // Clean up entries from database that are no longer in the worktree.
                            for (path, mtime) in file_mtimes {
                                changed_paths.insert(
                                    path.into(),
                                    ChangedPathInfo {
                                        mtime,
                                        is_deleted: true,
                                    },
                                );
                            }

                            anyhow::Ok(changed_paths)
                        })
                        .await?;
                    this.update(&mut cx, |this, cx| {
                        let project_state = this
                            .projects
                            .get_mut(&project)
                            .ok_or_else(|| anyhow!("project not registered"))?;
                        let project = project
                            .upgrade(cx)
                            .ok_or_else(|| anyhow!("project was dropped"))?;

                        // Merge in changes that were reported while the worktree was registering.
                        if let Some(WorktreeState::Registering(state)) =
                            project_state.worktrees.remove(&worktree_id)
                        {
                            changed_paths.extend(state.changed_paths);
                        }
                        project_state.worktrees.insert(
                            worktree_id,
                            WorktreeState::Registered(RegisteredWorktreeState {
                                db_id,
                                changed_paths,
                            }),
                        );
                        this.index_project(project, cx).detach_and_log_err(cx);

                        anyhow::Ok(())
                    })?;

                    anyhow::Ok(())
                };

                if register.await.log_err().is_none() {
                    // Stop tracking this worktree if the registration failed.
                    this.update(&mut cx, |this, _| {
                        this.projects.get_mut(&project).map(|project_state| {
                            project_state.worktrees.remove(&worktree_id);
                        });
                    })
                }

                // Wake anyone waiting on `RegisteringWorktreeState::done`.
                *done_tx.borrow_mut() = Some(());
            }
        });
        project_state.worktrees.insert(
            worktree_id,
            WorktreeState::Registering(RegisteringWorktreeState {
                changed_paths: Default::default(),
                done_rx,
                _registration: registration,
            }),
        );
    }
 636
 637    fn project_worktrees_changed(
 638        &mut self,
 639        project: ModelHandle<Project>,
 640        cx: &mut ModelContext<Self>,
 641    ) {
 642        let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade())
 643        {
 644            project_state
 645        } else {
 646            return;
 647        };
 648
 649        let mut worktrees = project
 650            .read(cx)
 651            .worktrees(cx)
 652            .filter(|worktree| worktree.read(cx).is_local())
 653            .collect::<Vec<_>>();
 654        let worktree_ids = worktrees
 655            .iter()
 656            .map(|worktree| worktree.read(cx).id())
 657            .collect::<HashSet<_>>();
 658
 659        // Remove worktrees that are no longer present
 660        project_state
 661            .worktrees
 662            .retain(|worktree_id, _| worktree_ids.contains(worktree_id));
 663
 664        // Register new worktrees
 665        worktrees.retain(|worktree| {
 666            let worktree_id = worktree.read(cx).id();
 667            !project_state.worktrees.contains_key(&worktree_id)
 668        });
 669        for worktree in worktrees {
 670            self.register_worktree(project.clone(), worktree, cx);
 671        }
 672    }
 673
 674    pub fn pending_file_count(
 675        &self,
 676        project: &ModelHandle<Project>,
 677    ) -> Option<watch::Receiver<usize>> {
 678        Some(
 679            self.projects
 680                .get(&project.downgrade())?
 681                .pending_file_count_rx
 682                .clone(),
 683        )
 684    }
 685
 686    pub fn search_project(
 687        &mut self,
 688        project: ModelHandle<Project>,
 689        phrase: String,
 690        limit: usize,
 691        includes: Vec<PathMatcher>,
 692        excludes: Vec<PathMatcher>,
 693        cx: &mut ModelContext<Self>,
 694    ) -> Task<Result<Vec<SearchResult>>> {
 695        let index = self.index_project(project.clone(), cx);
 696        let embedding_provider = self.embedding_provider.clone();
 697        let db_path = self.db.path().clone();
 698        let fs = self.fs.clone();
 699        cx.spawn(|this, mut cx| async move {
 700            index.await?;
 701
 702            let t0 = Instant::now();
 703            let database =
 704                VectorDatabase::new(fs.clone(), db_path.clone(), cx.background()).await?;
 705
 706            if phrase.len() == 0 {
 707                return Ok(Vec::new());
 708            }
 709
 710            let phrase_embedding = embedding_provider
 711                .embed_batch(vec![phrase])
 712                .await?
 713                .into_iter()
 714                .next()
 715                .unwrap();
 716
 717            log::trace!(
 718                "Embedding search phrase took: {:?} milliseconds",
 719                t0.elapsed().as_millis()
 720            );
 721
 722            let worktree_db_ids = this.read_with(&cx, |this, _| {
 723                let project_state = this
 724                    .projects
 725                    .get(&project.downgrade())
 726                    .ok_or_else(|| anyhow!("project was not indexed"))?;
 727                let worktree_db_ids = project_state
 728                    .worktrees
 729                    .values()
 730                    .filter_map(|worktree| {
 731                        if let WorktreeState::Registered(worktree) = worktree {
 732                            Some(worktree.db_id)
 733                        } else {
 734                            None
 735                        }
 736                    })
 737                    .collect::<Vec<i64>>();
 738                anyhow::Ok(worktree_db_ids)
 739            })?;
 740            let file_ids = database
 741                .retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)
 742                .await?;
 743
 744            let batch_n = cx.background().num_cpus();
 745            let ids_len = file_ids.clone().len();
 746            let batch_size = if ids_len <= batch_n {
 747                ids_len
 748            } else {
 749                ids_len / batch_n
 750            };
 751
 752            let mut batch_results = Vec::new();
 753            for batch in file_ids.chunks(batch_size) {
 754                let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
 755                let limit = limit.clone();
 756                let fs = fs.clone();
 757                let db_path = db_path.clone();
 758                let phrase_embedding = phrase_embedding.clone();
 759                if let Some(db) = VectorDatabase::new(fs, db_path.clone(), cx.background())
 760                    .await
 761                    .log_err()
 762                {
 763                    batch_results.push(async move {
 764                        db.top_k_search(&phrase_embedding, limit, batch.as_slice())
 765                            .await
 766                    });
 767                }
 768            }
 769            let batch_results = futures::future::join_all(batch_results).await;
 770
 771            let mut results = Vec::new();
 772            for batch_result in batch_results {
 773                if batch_result.is_ok() {
 774                    for (id, similarity) in batch_result.unwrap() {
 775                        let ix = match results.binary_search_by(|(_, s)| {
 776                            similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
 777                        }) {
 778                            Ok(ix) => ix,
 779                            Err(ix) => ix,
 780                        };
 781                        results.insert(ix, (id, similarity));
 782                        results.truncate(limit);
 783                    }
 784                }
 785            }
 786
 787            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
 788            let spans = database.spans_for_ids(ids.as_slice()).await?;
 789
 790            let mut tasks = Vec::new();
 791            let mut ranges = Vec::new();
 792            let weak_project = project.downgrade();
 793            project.update(&mut cx, |project, cx| {
 794                for (worktree_db_id, file_path, byte_range) in spans {
 795                    let project_state =
 796                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
 797                            state
 798                        } else {
 799                            return Err(anyhow!("project not added"));
 800                        };
 801                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
 802                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
 803                        ranges.push(byte_range);
 804                    }
 805                }
 806
 807                Ok(())
 808            })?;
 809
 810            let buffers = futures::future::join_all(tasks).await;
 811
 812            log::trace!(
 813                "Semantic Searching took: {:?} milliseconds in total",
 814                t0.elapsed().as_millis()
 815            );
 816
 817            Ok(buffers
 818                .into_iter()
 819                .zip(ranges)
 820                .filter_map(|(buffer, range)| {
 821                    let buffer = buffer.log_err()?;
 822                    let range = buffer.read_with(&cx, |buffer, _| {
 823                        let start = buffer.clip_offset(range.start, Bias::Left);
 824                        let end = buffer.clip_offset(range.end, Bias::Right);
 825                        buffer.anchor_before(start)..buffer.anchor_after(end)
 826                    });
 827                    Some(SearchResult { buffer, range })
 828                })
 829                .collect::<Vec<_>>())
 830        })
 831    }
 832
 833    pub fn index_project(
 834        &mut self,
 835        project: ModelHandle<Project>,
 836        cx: &mut ModelContext<Self>,
 837    ) -> Task<Result<()>> {
 838        if !self.projects.contains_key(&project.downgrade()) {
 839            log::trace!("Registering Project for Semantic Index");
 840
 841            let subscription = cx.subscribe(&project, |this, project, event, cx| match event {
 842                project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => {
 843                    this.project_worktrees_changed(project.clone(), cx);
 844                }
 845                project::Event::WorktreeUpdatedEntries(worktree_id, changes) => {
 846                    this.project_entries_changed(project, *worktree_id, changes.clone(), cx);
 847                }
 848                _ => {}
 849            });
 850            let project_state = ProjectState::new(subscription, cx);
 851            self.projects.insert(project.downgrade(), project_state);
 852            self.project_worktrees_changed(project.clone(), cx);
 853        }
 854        let project_state = self.projects.get_mut(&project.downgrade()).unwrap();
 855        project_state.pending_index += 1;
 856        cx.notify();
 857
 858        let mut pending_file_count_rx = project_state.pending_file_count_rx.clone();
 859        let db = self.db.clone();
 860        let language_registry = self.language_registry.clone();
 861        let parsing_files_tx = self.parsing_files_tx.clone();
 862        let worktree_registration = self.wait_for_worktree_registration(&project, cx);
 863
 864        cx.spawn(|this, mut cx| async move {
 865            worktree_registration.await?;
 866
 867            let mut pending_files = Vec::new();
 868            let mut files_to_delete = Vec::new();
 869            this.update(&mut cx, |this, cx| {
 870                let project_state = this
 871                    .projects
 872                    .get_mut(&project.downgrade())
 873                    .ok_or_else(|| anyhow!("project was dropped"))?;
 874                let pending_file_count_tx = &project_state.pending_file_count_tx;
 875
 876                project_state
 877                    .worktrees
 878                    .retain(|worktree_id, worktree_state| {
 879                        let worktree = if let Some(worktree) =
 880                            project.read(cx).worktree_for_id(*worktree_id, cx)
 881                        {
 882                            worktree
 883                        } else {
 884                            return false;
 885                        };
 886                        let worktree_state =
 887                            if let WorktreeState::Registered(worktree_state) = worktree_state {
 888                                worktree_state
 889                            } else {
 890                                return true;
 891                            };
 892
 893                        worktree_state.changed_paths.retain(|path, info| {
 894                            if info.is_deleted {
 895                                files_to_delete.push((worktree_state.db_id, path.clone()));
 896                            } else {
 897                                let absolute_path = worktree.read(cx).absolutize(path);
 898                                let job_handle = JobHandle::new(pending_file_count_tx);
 899                                pending_files.push(PendingFile {
 900                                    absolute_path,
 901                                    relative_path: path.clone(),
 902                                    language: None,
 903                                    job_handle,
 904                                    modified_time: info.mtime,
 905                                    worktree_db_id: worktree_state.db_id,
 906                                });
 907                            }
 908
 909                            false
 910                        });
 911                        true
 912                    });
 913
 914                anyhow::Ok(())
 915            })?;
 916
 917            cx.background()
 918                .spawn(async move {
 919                    for (worktree_db_id, path) in files_to_delete {
 920                        db.delete_file(worktree_db_id, path).await.log_err();
 921                    }
 922
 923                    let embeddings_for_digest = {
 924                        let mut files = HashMap::default();
 925                        for pending_file in &pending_files {
 926                            files
 927                                .entry(pending_file.worktree_db_id)
 928                                .or_insert(Vec::new())
 929                                .push(pending_file.relative_path.clone());
 930                        }
 931                        Arc::new(
 932                            db.embeddings_for_files(files)
 933                                .await
 934                                .log_err()
 935                                .unwrap_or_default(),
 936                        )
 937                    };
 938
 939                    for mut pending_file in pending_files {
 940                        if let Ok(language) = language_registry
 941                            .language_for_file(&pending_file.relative_path, None)
 942                            .await
 943                        {
 944                            if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
 945                                && &language.name().as_ref() != &"Markdown"
 946                                && language
 947                                    .grammar()
 948                                    .and_then(|grammar| grammar.embedding_config.as_ref())
 949                                    .is_none()
 950                            {
 951                                continue;
 952                            }
 953                            pending_file.language = Some(language);
 954                        }
 955                        parsing_files_tx
 956                            .try_send((embeddings_for_digest.clone(), pending_file))
 957                            .ok();
 958                    }
 959
 960                    // Wait until we're done indexing.
 961                    while let Some(count) = pending_file_count_rx.next().await {
 962                        if count == 0 {
 963                            break;
 964                        }
 965                    }
 966                })
 967                .await;
 968
 969            this.update(&mut cx, |this, cx| {
 970                let project_state = this
 971                    .projects
 972                    .get_mut(&project.downgrade())
 973                    .ok_or_else(|| anyhow!("project was dropped"))?;
 974                project_state.pending_index -= 1;
 975                cx.notify();
 976                anyhow::Ok(())
 977            })?;
 978
 979            Ok(())
 980        })
 981    }
 982
 983    fn wait_for_worktree_registration(
 984        &self,
 985        project: &ModelHandle<Project>,
 986        cx: &mut ModelContext<Self>,
 987    ) -> Task<Result<()>> {
 988        let project = project.downgrade();
 989        cx.spawn_weak(|this, cx| async move {
 990            loop {
 991                let mut pending_worktrees = Vec::new();
 992                this.upgrade(&cx)
 993                    .ok_or_else(|| anyhow!("semantic index dropped"))?
 994                    .read_with(&cx, |this, _| {
 995                        if let Some(project) = this.projects.get(&project) {
 996                            for worktree in project.worktrees.values() {
 997                                if let WorktreeState::Registering(worktree) = worktree {
 998                                    pending_worktrees.push(worktree.done());
 999                                }
1000                            }
1001                        }
1002                    });
1003
1004                if pending_worktrees.is_empty() {
1005                    break;
1006                } else {
1007                    future::join_all(pending_worktrees).await;
1008                }
1009            }
1010            Ok(())
1011        })
1012    }
1013}
1014
/// Makes `SemanticIndex` a gpui model entity. Its event type is `()`, so it
/// declares no meaningful events for subscribers.
impl Entity for SemanticIndex {
    type Event = ();
}
1018
1019impl Drop for JobHandle {
1020    fn drop(&mut self) {
1021        if let Some(inner) = Arc::get_mut(&mut self.tx) {
1022            // This is the last instance of the JobHandle (regardless of it's origin - whether it was cloned or not)
1023            if let Some(tx) = inner.upgrade() {
1024                let mut tx = tx.lock();
1025                *tx.borrow_mut() -= 1;
1026            }
1027        }
1028    }
1029}
1030
#[cfg(test)]
mod tests {
    use super::*;

    /// A `JobHandle` raises the pending-job counter once when created; clones
    /// share that single increment, which is undone only when the last clone
    /// is dropped.
    #[test]
    fn test_job_handle() {
        let (count_tx, count_rx) = watch::channel_with(0);
        let shared_tx = Arc::new(Mutex::new(count_tx));
        let observed = || *count_rx.borrow();

        let first = JobHandle::new(&shared_tx);
        assert_eq!(1, observed());

        let second = first.clone();
        assert_eq!(1, observed());

        drop(first);
        assert_eq!(1, observed());

        drop(second);
        assert_eq!(0, observed());
    }
}