semantic_index.rs

   1mod db;
   2mod embedding;
   3mod embedding_queue;
   4mod parsing;
   5pub mod semantic_index_settings;
   6
   7#[cfg(test)]
   8mod semantic_index_tests;
   9
  10use crate::semantic_index_settings::SemanticIndexSettings;
  11use anyhow::{anyhow, Result};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use db::VectorDatabase;
  14use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings};
  15use embedding_queue::{EmbeddingQueue, FileToEmbed};
  16use futures::{future, FutureExt, StreamExt};
  17use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  18use language::{Anchor, Bias, Buffer, Language, LanguageRegistry};
  19use parking_lot::Mutex;
  20use parsing::{CodeContextRetriever, SpanDigest, PARSEABLE_ENTIRE_FILE_TYPES};
  21use postage::watch;
  22use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, Worktree, WorktreeId};
  23use smol::channel;
  24use std::{
  25    cmp::Ordering,
  26    future::Future,
  27    ops::Range,
  28    path::{Path, PathBuf},
  29    sync::{Arc, Weak},
  30    time::{Duration, Instant, SystemTime},
  31};
  32use util::{
  33    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
  34    http::HttpClient,
  35    paths::EMBEDDINGS_DIR,
  36    ResultExt,
  37};
  38use workspace::WorkspaceCreated;
  39
  40const SEMANTIC_INDEX_VERSION: usize = 10;
  41const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60);
  42const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
  43
  44pub fn init(
  45    fs: Arc<dyn Fs>,
  46    http_client: Arc<dyn HttpClient>,
  47    language_registry: Arc<LanguageRegistry>,
  48    cx: &mut AppContext,
  49) {
  50    settings::register::<SemanticIndexSettings>(cx);
  51
  52    let db_file_path = EMBEDDINGS_DIR
  53        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
  54        .join("embeddings_db");
  55
  56    // This needs to be removed at some point before stable.
  57    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
  58        return;
  59    }
  60
  61    cx.subscribe_global::<WorkspaceCreated, _>({
  62        move |event, cx| {
  63            let Some(semantic_index) = SemanticIndex::global(cx) else {
  64                return;
  65            };
  66            let workspace = &event.0;
  67            if let Some(workspace) = workspace.upgrade(cx) {
  68                let project = workspace.read(cx).project().clone();
  69                if project.read(cx).is_local() {
  70                    cx.spawn(|mut cx| async move {
  71                        let previously_indexed = semantic_index
  72                            .update(&mut cx, |index, cx| {
  73                                index.project_previously_indexed(&project, cx)
  74                            })
  75                            .await?;
  76                        if previously_indexed {
  77                            semantic_index
  78                                .update(&mut cx, |index, cx| index.index_project(project, cx))
  79                                .await?;
  80                        }
  81                        anyhow::Ok(())
  82                    })
  83                    .detach_and_log_err(cx);
  84                }
  85            }
  86        }
  87    })
  88    .detach();
  89
  90    cx.spawn(move |mut cx| async move {
  91        let semantic_index = SemanticIndex::new(
  92            fs,
  93            db_file_path,
  94            Arc::new(OpenAIEmbeddings::new(http_client, cx.background())),
  95            language_registry,
  96            cx.clone(),
  97        )
  98        .await?;
  99
 100        cx.update(|cx| {
 101            cx.set_global(semantic_index.clone());
 102        });
 103
 104        anyhow::Ok(())
 105    })
 106    .detach();
 107}
 108
 109#[derive(Copy, Clone, Debug)]
 110pub enum SemanticIndexStatus {
 111    NotIndexed,
 112    Indexed,
 113    Indexing {
 114        remaining_files: usize,
 115        rate_limit_expiry: Option<Instant>,
 116    },
 117}
 118
 119pub struct SemanticIndex {
 120    fs: Arc<dyn Fs>,
 121    db: VectorDatabase,
 122    embedding_provider: Arc<dyn EmbeddingProvider>,
 123    language_registry: Arc<LanguageRegistry>,
 124    parsing_files_tx: channel::Sender<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>,
 125    _embedding_task: Task<()>,
 126    _parsing_files_tasks: Vec<Task<()>>,
 127    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
 128}
 129
 130struct ProjectState {
 131    worktrees: HashMap<WorktreeId, WorktreeState>,
 132    pending_file_count_rx: watch::Receiver<usize>,
 133    pending_file_count_tx: Arc<Mutex<watch::Sender<usize>>>,
 134    pending_index: usize,
 135    _subscription: gpui::Subscription,
 136    _observe_pending_file_count: Task<()>,
 137}
 138
 139enum WorktreeState {
 140    Registering(RegisteringWorktreeState),
 141    Registered(RegisteredWorktreeState),
 142}
 143
 144impl WorktreeState {
 145    fn is_registered(&self) -> bool {
 146        matches!(self, Self::Registered(_))
 147    }
 148
 149    fn paths_changed(
 150        &mut self,
 151        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
 152        worktree: &Worktree,
 153    ) {
 154        let changed_paths = match self {
 155            Self::Registering(state) => &mut state.changed_paths,
 156            Self::Registered(state) => &mut state.changed_paths,
 157        };
 158
 159        for (path, entry_id, change) in changes.iter() {
 160            let Some(entry) = worktree.entry_for_id(*entry_id) else {
 161                continue;
 162            };
 163            if entry.is_ignored || entry.is_symlink || entry.is_external || entry.is_dir() {
 164                continue;
 165            }
 166            changed_paths.insert(
 167                path.clone(),
 168                ChangedPathInfo {
 169                    mtime: entry.mtime,
 170                    is_deleted: *change == PathChange::Removed,
 171                },
 172            );
 173        }
 174    }
 175}
 176
 177struct RegisteringWorktreeState {
 178    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
 179    done_rx: watch::Receiver<Option<()>>,
 180    _registration: Task<()>,
 181}
 182
 183impl RegisteringWorktreeState {
 184    fn done(&self) -> impl Future<Output = ()> {
 185        let mut done_rx = self.done_rx.clone();
 186        async move {
 187            while let Some(result) = done_rx.next().await {
 188                if result.is_some() {
 189                    break;
 190                }
 191            }
 192        }
 193    }
 194}
 195
 196struct RegisteredWorktreeState {
 197    db_id: i64,
 198    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
 199}
 200
 201struct ChangedPathInfo {
 202    mtime: SystemTime,
 203    is_deleted: bool,
 204}
 205
 206#[derive(Clone)]
 207pub struct JobHandle {
 208    /// The outer Arc is here to count the clones of a JobHandle instance;
 209    /// when the last handle to a given job is dropped, we decrement a counter (just once).
 210    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
 211}
 212
 213impl JobHandle {
 214    fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
 215        *tx.lock().borrow_mut() += 1;
 216        Self {
 217            tx: Arc::new(Arc::downgrade(&tx)),
 218        }
 219    }
 220}
 221
 222impl ProjectState {
 223    fn new(subscription: gpui::Subscription, cx: &mut ModelContext<SemanticIndex>) -> Self {
 224        let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0);
 225        let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx));
 226        Self {
 227            worktrees: Default::default(),
 228            pending_file_count_rx: pending_file_count_rx.clone(),
 229            pending_file_count_tx,
 230            pending_index: 0,
 231            _subscription: subscription,
 232            _observe_pending_file_count: cx.spawn_weak({
 233                let mut pending_file_count_rx = pending_file_count_rx.clone();
 234                |this, mut cx| async move {
 235                    while let Some(_) = pending_file_count_rx.next().await {
 236                        if let Some(this) = this.upgrade(&cx) {
 237                            this.update(&mut cx, |_, cx| cx.notify());
 238                        }
 239                    }
 240                }
 241            }),
 242        }
 243    }
 244
 245    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
 246        self.worktrees
 247            .iter()
 248            .find_map(|(worktree_id, worktree_state)| match worktree_state {
 249                WorktreeState::Registered(state) if state.db_id == id => Some(*worktree_id),
 250                _ => None,
 251            })
 252    }
 253}
 254
 255#[derive(Clone)]
 256pub struct PendingFile {
 257    worktree_db_id: i64,
 258    relative_path: Arc<Path>,
 259    absolute_path: PathBuf,
 260    language: Option<Arc<Language>>,
 261    modified_time: SystemTime,
 262    job_handle: JobHandle,
 263}
 264
 265pub struct SearchResult {
 266    pub buffer: ModelHandle<Buffer>,
 267    pub range: Range<Anchor>,
 268}
 269
 270impl SemanticIndex {
 271    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
 272        if cx.has_global::<ModelHandle<Self>>() {
 273            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
 274        } else {
 275            None
 276        }
 277    }
 278
 279    pub fn enabled(cx: &AppContext) -> bool {
 280        settings::get::<SemanticIndexSettings>(cx).enabled
 281            && *RELEASE_CHANNEL != ReleaseChannel::Stable
 282    }
 283
 284    pub fn status(&self, project: &ModelHandle<Project>) -> SemanticIndexStatus {
 285        if let Some(project_state) = self.projects.get(&project.downgrade()) {
 286            if project_state
 287                .worktrees
 288                .values()
 289                .all(|worktree| worktree.is_registered())
 290                && project_state.pending_index == 0
 291            {
 292                SemanticIndexStatus::Indexed
 293            } else {
 294                SemanticIndexStatus::Indexing {
 295                    remaining_files: project_state.pending_file_count_rx.borrow().clone(),
 296                    rate_limit_expiry: self.embedding_provider.rate_limit_expiration(),
 297                }
 298            }
 299        } else {
 300            SemanticIndexStatus::NotIndexed
 301        }
 302    }
 303
 304    async fn new(
 305        fs: Arc<dyn Fs>,
 306        database_path: PathBuf,
 307        embedding_provider: Arc<dyn EmbeddingProvider>,
 308        language_registry: Arc<LanguageRegistry>,
 309        mut cx: AsyncAppContext,
 310    ) -> Result<ModelHandle<Self>> {
 311        let t0 = Instant::now();
 312        let database_path = Arc::from(database_path);
 313        let db = VectorDatabase::new(fs.clone(), database_path, cx.background()).await?;
 314
 315        log::trace!(
 316            "db initialization took {:?} milliseconds",
 317            t0.elapsed().as_millis()
 318        );
 319
 320        Ok(cx.add_model(|cx| {
 321            let t0 = Instant::now();
 322            let embedding_queue =
 323                EmbeddingQueue::new(embedding_provider.clone(), cx.background().clone());
 324            let _embedding_task = cx.background().spawn({
 325                let embedded_files = embedding_queue.finished_files();
 326                let db = db.clone();
 327                async move {
 328                    while let Ok(file) = embedded_files.recv().await {
 329                        db.insert_file(file.worktree_id, file.path, file.mtime, file.spans)
 330                            .await
 331                            .log_err();
 332                    }
 333                }
 334            });
 335
 336            // Parse files into embeddable spans.
 337            let (parsing_files_tx, parsing_files_rx) =
 338                channel::unbounded::<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>();
 339            let embedding_queue = Arc::new(Mutex::new(embedding_queue));
 340            let mut _parsing_files_tasks = Vec::new();
 341            for _ in 0..cx.background().num_cpus() {
 342                let fs = fs.clone();
 343                let mut parsing_files_rx = parsing_files_rx.clone();
 344                let embedding_provider = embedding_provider.clone();
 345                let embedding_queue = embedding_queue.clone();
 346                let background = cx.background().clone();
 347                _parsing_files_tasks.push(cx.background().spawn(async move {
 348                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
 349                    loop {
 350                        let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse();
 351                        let mut next_file_to_parse = parsing_files_rx.next().fuse();
 352                        futures::select_biased! {
 353                            next_file_to_parse = next_file_to_parse => {
 354                                if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse {
 355                                    Self::parse_file(
 356                                        &fs,
 357                                        pending_file,
 358                                        &mut retriever,
 359                                        &embedding_queue,
 360                                        &embeddings_for_digest,
 361                                    )
 362                                    .await
 363                                } else {
 364                                    break;
 365                                }
 366                            },
 367                            _ = timer => {
 368                                embedding_queue.lock().flush();
 369                            }
 370                        }
 371                    }
 372                }));
 373            }
 374
 375            log::trace!(
 376                "semantic index task initialization took {:?} milliseconds",
 377                t0.elapsed().as_millis()
 378            );
 379            Self {
 380                fs,
 381                db,
 382                embedding_provider,
 383                language_registry,
 384                parsing_files_tx,
 385                _embedding_task,
 386                _parsing_files_tasks,
 387                projects: Default::default(),
 388            }
 389        }))
 390    }
 391
 392    async fn parse_file(
 393        fs: &Arc<dyn Fs>,
 394        pending_file: PendingFile,
 395        retriever: &mut CodeContextRetriever,
 396        embedding_queue: &Arc<Mutex<EmbeddingQueue>>,
 397        embeddings_for_digest: &HashMap<SpanDigest, Embedding>,
 398    ) {
 399        let Some(language) = pending_file.language else {
 400            return;
 401        };
 402
 403        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
 404            if let Some(mut spans) = retriever
 405                .parse_file_with_template(&pending_file.relative_path, &content, language)
 406                .log_err()
 407            {
 408                log::trace!(
 409                    "parsed path {:?}: {} spans",
 410                    pending_file.relative_path,
 411                    spans.len()
 412                );
 413
 414                for span in &mut spans {
 415                    if let Some(embedding) = embeddings_for_digest.get(&span.digest) {
 416                        span.embedding = Some(embedding.to_owned());
 417                    }
 418                }
 419
 420                embedding_queue.lock().push(FileToEmbed {
 421                    worktree_id: pending_file.worktree_db_id,
 422                    path: pending_file.relative_path,
 423                    mtime: pending_file.modified_time,
 424                    job_handle: pending_file.job_handle,
 425                    spans: spans,
 426                });
 427            }
 428        }
 429    }
 430
 431    pub fn project_previously_indexed(
 432        &mut self,
 433        project: &ModelHandle<Project>,
 434        cx: &mut ModelContext<Self>,
 435    ) -> Task<Result<bool>> {
 436        let worktrees_indexed_previously = project
 437            .read(cx)
 438            .worktrees(cx)
 439            .map(|worktree| {
 440                self.db
 441                    .worktree_previously_indexed(&worktree.read(cx).abs_path())
 442            })
 443            .collect::<Vec<_>>();
 444        cx.spawn(|_, _cx| async move {
 445            let worktree_indexed_previously =
 446                futures::future::join_all(worktrees_indexed_previously).await;
 447
 448            Ok(worktree_indexed_previously
 449                .iter()
 450                .filter(|worktree| worktree.is_ok())
 451                .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
 452        })
 453    }
 454
 455    fn project_entries_changed(
 456        &mut self,
 457        project: ModelHandle<Project>,
 458        worktree_id: WorktreeId,
 459        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
 460        cx: &mut ModelContext<Self>,
 461    ) {
 462        let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
 463            return;
 464        };
 465        let project = project.downgrade();
 466        let Some(project_state) = self.projects.get_mut(&project) else {
 467            return;
 468        };
 469
 470        let worktree = worktree.read(cx);
 471        let worktree_state =
 472            if let Some(worktree_state) = project_state.worktrees.get_mut(&worktree_id) {
 473                worktree_state
 474            } else {
 475                return;
 476            };
 477        worktree_state.paths_changed(changes, worktree);
 478        if let WorktreeState::Registered(_) = worktree_state {
 479            cx.spawn_weak(|this, mut cx| async move {
 480                cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
 481                if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
 482                    this.update(&mut cx, |this, cx| {
 483                        this.index_project(project, cx).detach_and_log_err(cx)
 484                    });
 485                }
 486            })
 487            .detach();
 488        }
 489    }
 490
 491    fn register_worktree(
 492        &mut self,
 493        project: ModelHandle<Project>,
 494        worktree: ModelHandle<Worktree>,
 495        cx: &mut ModelContext<Self>,
 496    ) {
 497        let project = project.downgrade();
 498        let project_state = if let Some(project_state) = self.projects.get_mut(&project) {
 499            project_state
 500        } else {
 501            return;
 502        };
 503        let worktree = if let Some(worktree) = worktree.read(cx).as_local() {
 504            worktree
 505        } else {
 506            return;
 507        };
 508        let worktree_abs_path = worktree.abs_path().clone();
 509        let scan_complete = worktree.scan_complete();
 510        let worktree_id = worktree.id();
 511        let db = self.db.clone();
 512        let language_registry = self.language_registry.clone();
 513        let (mut done_tx, done_rx) = watch::channel();
 514        let registration = cx.spawn(|this, mut cx| {
 515            async move {
 516                let register = async {
 517                    scan_complete.await;
 518                    let db_id = db.find_or_create_worktree(worktree_abs_path).await?;
 519                    let mut file_mtimes = db.get_file_mtimes(db_id).await?;
 520                    let worktree = if let Some(project) = project.upgrade(&cx) {
 521                        project
 522                            .read_with(&cx, |project, cx| project.worktree_for_id(worktree_id, cx))
 523                            .ok_or_else(|| anyhow!("worktree not found"))?
 524                    } else {
 525                        return anyhow::Ok(());
 526                    };
 527                    let worktree = worktree.read_with(&cx, |worktree, _| worktree.snapshot());
 528                    let mut changed_paths = cx
 529                        .background()
 530                        .spawn(async move {
 531                            let mut changed_paths = BTreeMap::new();
 532                            for file in worktree.files(false, 0) {
 533                                let absolute_path = worktree.absolutize(&file.path);
 534
 535                                if file.is_external || file.is_ignored || file.is_symlink {
 536                                    continue;
 537                                }
 538
 539                                if let Ok(language) = language_registry
 540                                    .language_for_file(&absolute_path, None)
 541                                    .await
 542                                {
 543                                    // Test if file is valid parseable file
 544                                    if !PARSEABLE_ENTIRE_FILE_TYPES
 545                                        .contains(&language.name().as_ref())
 546                                        && &language.name().as_ref() != &"Markdown"
 547                                        && language
 548                                            .grammar()
 549                                            .and_then(|grammar| grammar.embedding_config.as_ref())
 550                                            .is_none()
 551                                    {
 552                                        continue;
 553                                    }
 554
 555                                    let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
 556                                    let already_stored = stored_mtime
 557                                        .map_or(false, |existing_mtime| {
 558                                            existing_mtime == file.mtime
 559                                        });
 560
 561                                    if !already_stored {
 562                                        changed_paths.insert(
 563                                            file.path.clone(),
 564                                            ChangedPathInfo {
 565                                                mtime: file.mtime,
 566                                                is_deleted: false,
 567                                            },
 568                                        );
 569                                    }
 570                                }
 571                            }
 572
 573                            // Clean up entries from database that are no longer in the worktree.
 574                            for (path, mtime) in file_mtimes {
 575                                changed_paths.insert(
 576                                    path.into(),
 577                                    ChangedPathInfo {
 578                                        mtime,
 579                                        is_deleted: true,
 580                                    },
 581                                );
 582                            }
 583
 584                            anyhow::Ok(changed_paths)
 585                        })
 586                        .await?;
 587                    this.update(&mut cx, |this, cx| {
 588                        let project_state = this
 589                            .projects
 590                            .get_mut(&project)
 591                            .ok_or_else(|| anyhow!("project not registered"))?;
 592                        let project = project
 593                            .upgrade(cx)
 594                            .ok_or_else(|| anyhow!("project was dropped"))?;
 595
 596                        if let Some(WorktreeState::Registering(state)) =
 597                            project_state.worktrees.remove(&worktree_id)
 598                        {
 599                            changed_paths.extend(state.changed_paths);
 600                        }
 601                        project_state.worktrees.insert(
 602                            worktree_id,
 603                            WorktreeState::Registered(RegisteredWorktreeState {
 604                                db_id,
 605                                changed_paths,
 606                            }),
 607                        );
 608                        this.index_project(project, cx).detach_and_log_err(cx);
 609
 610                        anyhow::Ok(())
 611                    })?;
 612
 613                    anyhow::Ok(())
 614                };
 615
 616                if register.await.log_err().is_none() {
 617                    // Stop tracking this worktree if the registration failed.
 618                    this.update(&mut cx, |this, _| {
 619                        this.projects.get_mut(&project).map(|project_state| {
 620                            project_state.worktrees.remove(&worktree_id);
 621                        });
 622                    })
 623                }
 624
 625                *done_tx.borrow_mut() = Some(());
 626            }
 627        });
 628        project_state.worktrees.insert(
 629            worktree_id,
 630            WorktreeState::Registering(RegisteringWorktreeState {
 631                changed_paths: Default::default(),
 632                done_rx,
 633                _registration: registration,
 634            }),
 635        );
 636    }
 637
 638    fn project_worktrees_changed(
 639        &mut self,
 640        project: ModelHandle<Project>,
 641        cx: &mut ModelContext<Self>,
 642    ) {
 643        let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade())
 644        {
 645            project_state
 646        } else {
 647            return;
 648        };
 649
 650        let mut worktrees = project
 651            .read(cx)
 652            .worktrees(cx)
 653            .filter(|worktree| worktree.read(cx).is_local())
 654            .collect::<Vec<_>>();
 655        let worktree_ids = worktrees
 656            .iter()
 657            .map(|worktree| worktree.read(cx).id())
 658            .collect::<HashSet<_>>();
 659
 660        // Remove worktrees that are no longer present
 661        project_state
 662            .worktrees
 663            .retain(|worktree_id, _| worktree_ids.contains(worktree_id));
 664
 665        // Register new worktrees
 666        worktrees.retain(|worktree| {
 667            let worktree_id = worktree.read(cx).id();
 668            !project_state.worktrees.contains_key(&worktree_id)
 669        });
 670        for worktree in worktrees {
 671            self.register_worktree(project.clone(), worktree, cx);
 672        }
 673    }
 674
 675    pub fn pending_file_count(
 676        &self,
 677        project: &ModelHandle<Project>,
 678    ) -> Option<watch::Receiver<usize>> {
 679        Some(
 680            self.projects
 681                .get(&project.downgrade())?
 682                .pending_file_count_rx
 683                .clone(),
 684        )
 685    }
 686
 687    pub fn search_project(
 688        &mut self,
 689        project: ModelHandle<Project>,
 690        phrase: String,
 691        limit: usize,
 692        includes: Vec<PathMatcher>,
 693        excludes: Vec<PathMatcher>,
 694        cx: &mut ModelContext<Self>,
 695    ) -> Task<Result<Vec<SearchResult>>> {
 696        let index = self.index_project(project.clone(), cx);
 697        let embedding_provider = self.embedding_provider.clone();
 698        let db_path = self.db.path().clone();
 699        let fs = self.fs.clone();
 700        cx.spawn(|this, mut cx| async move {
 701            index.await?;
 702
 703            let t0 = Instant::now();
 704            let database =
 705                VectorDatabase::new(fs.clone(), db_path.clone(), cx.background()).await?;
 706
 707            if phrase.len() == 0 {
 708                return Ok(Vec::new());
 709            }
 710
 711            let phrase_embedding = embedding_provider
 712                .embed_batch(vec![phrase])
 713                .await?
 714                .into_iter()
 715                .next()
 716                .unwrap();
 717
 718            log::trace!(
 719                "Embedding search phrase took: {:?} milliseconds",
 720                t0.elapsed().as_millis()
 721            );
 722
 723            let worktree_db_ids = this.read_with(&cx, |this, _| {
 724                let project_state = this
 725                    .projects
 726                    .get(&project.downgrade())
 727                    .ok_or_else(|| anyhow!("project was not indexed"))?;
 728                let worktree_db_ids = project_state
 729                    .worktrees
 730                    .values()
 731                    .filter_map(|worktree| {
 732                        if let WorktreeState::Registered(worktree) = worktree {
 733                            Some(worktree.db_id)
 734                        } else {
 735                            None
 736                        }
 737                    })
 738                    .collect::<Vec<i64>>();
 739                anyhow::Ok(worktree_db_ids)
 740            })?;
 741            let file_ids = database
 742                .retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)
 743                .await?;
 744
 745            let batch_n = cx.background().num_cpus();
 746            let ids_len = file_ids.clone().len();
 747            let batch_size = if ids_len <= batch_n {
 748                ids_len
 749            } else {
 750                ids_len / batch_n
 751            };
 752
 753            let mut batch_results = Vec::new();
 754            for batch in file_ids.chunks(batch_size) {
 755                let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
 756                let limit = limit.clone();
 757                let fs = fs.clone();
 758                let db_path = db_path.clone();
 759                let phrase_embedding = phrase_embedding.clone();
 760                if let Some(db) = VectorDatabase::new(fs, db_path.clone(), cx.background())
 761                    .await
 762                    .log_err()
 763                {
 764                    batch_results.push(async move {
 765                        db.top_k_search(&phrase_embedding, limit, batch.as_slice())
 766                            .await
 767                    });
 768                }
 769            }
 770            let batch_results = futures::future::join_all(batch_results).await;
 771
 772            let mut results = Vec::new();
 773            for batch_result in batch_results {
 774                if batch_result.is_ok() {
 775                    for (id, similarity) in batch_result.unwrap() {
 776                        let ix = match results.binary_search_by(|(_, s)| {
 777                            similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
 778                        }) {
 779                            Ok(ix) => ix,
 780                            Err(ix) => ix,
 781                        };
 782                        results.insert(ix, (id, similarity));
 783                        results.truncate(limit);
 784                    }
 785                }
 786            }
 787
 788            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
 789            let spans = database.spans_for_ids(ids.as_slice()).await?;
 790
 791            let mut tasks = Vec::new();
 792            let mut ranges = Vec::new();
 793            let weak_project = project.downgrade();
 794            project.update(&mut cx, |project, cx| {
 795                for (worktree_db_id, file_path, byte_range) in spans {
 796                    let project_state =
 797                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
 798                            state
 799                        } else {
 800                            return Err(anyhow!("project not added"));
 801                        };
 802                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
 803                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
 804                        ranges.push(byte_range);
 805                    }
 806                }
 807
 808                Ok(())
 809            })?;
 810
 811            let buffers = futures::future::join_all(tasks).await;
 812
 813            log::trace!(
 814                "Semantic Searching took: {:?} milliseconds in total",
 815                t0.elapsed().as_millis()
 816            );
 817
 818            Ok(buffers
 819                .into_iter()
 820                .zip(ranges)
 821                .filter_map(|(buffer, range)| {
 822                    let buffer = buffer.log_err()?;
 823                    let range = buffer.read_with(&cx, |buffer, _| {
 824                        let start = buffer.clip_offset(range.start, Bias::Left);
 825                        let end = buffer.clip_offset(range.end, Bias::Right);
 826                        buffer.anchor_before(start)..buffer.anchor_after(end)
 827                    });
 828                    Some(SearchResult { buffer, range })
 829                })
 830                .collect::<Vec<_>>())
 831        })
 832    }
 833
 834    pub fn index_project(
 835        &mut self,
 836        project: ModelHandle<Project>,
 837        cx: &mut ModelContext<Self>,
 838    ) -> Task<Result<()>> {
 839        if !self.projects.contains_key(&project.downgrade()) {
 840            log::trace!("Registering Project for Semantic Index");
 841
 842            let subscription = cx.subscribe(&project, |this, project, event, cx| match event {
 843                project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => {
 844                    this.project_worktrees_changed(project.clone(), cx);
 845                }
 846                project::Event::WorktreeUpdatedEntries(worktree_id, changes) => {
 847                    this.project_entries_changed(project, *worktree_id, changes.clone(), cx);
 848                }
 849                _ => {}
 850            });
 851            let project_state = ProjectState::new(subscription, cx);
 852            self.projects.insert(project.downgrade(), project_state);
 853            self.project_worktrees_changed(project.clone(), cx);
 854        }
 855        let project_state = self.projects.get_mut(&project.downgrade()).unwrap();
 856        project_state.pending_index += 1;
 857        cx.notify();
 858
 859        let mut pending_file_count_rx = project_state.pending_file_count_rx.clone();
 860        let db = self.db.clone();
 861        let language_registry = self.language_registry.clone();
 862        let parsing_files_tx = self.parsing_files_tx.clone();
 863        let worktree_registration = self.wait_for_worktree_registration(&project, cx);
 864
 865        cx.spawn(|this, mut cx| async move {
 866            worktree_registration.await?;
 867
 868            let mut pending_files = Vec::new();
 869            let mut files_to_delete = Vec::new();
 870            this.update(&mut cx, |this, cx| {
 871                let project_state = this
 872                    .projects
 873                    .get_mut(&project.downgrade())
 874                    .ok_or_else(|| anyhow!("project was dropped"))?;
 875                let pending_file_count_tx = &project_state.pending_file_count_tx;
 876
 877                project_state
 878                    .worktrees
 879                    .retain(|worktree_id, worktree_state| {
 880                        let worktree = if let Some(worktree) =
 881                            project.read(cx).worktree_for_id(*worktree_id, cx)
 882                        {
 883                            worktree
 884                        } else {
 885                            return false;
 886                        };
 887                        let worktree_state =
 888                            if let WorktreeState::Registered(worktree_state) = worktree_state {
 889                                worktree_state
 890                            } else {
 891                                return true;
 892                            };
 893
 894                        worktree_state.changed_paths.retain(|path, info| {
 895                            if info.is_deleted {
 896                                files_to_delete.push((worktree_state.db_id, path.clone()));
 897                            } else {
 898                                let absolute_path = worktree.read(cx).absolutize(path);
 899                                let job_handle = JobHandle::new(pending_file_count_tx);
 900                                pending_files.push(PendingFile {
 901                                    absolute_path,
 902                                    relative_path: path.clone(),
 903                                    language: None,
 904                                    job_handle,
 905                                    modified_time: info.mtime,
 906                                    worktree_db_id: worktree_state.db_id,
 907                                });
 908                            }
 909
 910                            false
 911                        });
 912                        true
 913                    });
 914
 915                anyhow::Ok(())
 916            })?;
 917
 918            cx.background()
 919                .spawn(async move {
 920                    for (worktree_db_id, path) in files_to_delete {
 921                        db.delete_file(worktree_db_id, path).await.log_err();
 922                    }
 923
 924                    let embeddings_for_digest = {
 925                        let mut files = HashMap::default();
 926                        for pending_file in &pending_files {
 927                            files
 928                                .entry(pending_file.worktree_db_id)
 929                                .or_insert(Vec::new())
 930                                .push(pending_file.relative_path.clone());
 931                        }
 932                        Arc::new(
 933                            db.embeddings_for_files(files)
 934                                .await
 935                                .log_err()
 936                                .unwrap_or_default(),
 937                        )
 938                    };
 939
 940                    for mut pending_file in pending_files {
 941                        if let Ok(language) = language_registry
 942                            .language_for_file(&pending_file.relative_path, None)
 943                            .await
 944                        {
 945                            if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
 946                                && &language.name().as_ref() != &"Markdown"
 947                                && language
 948                                    .grammar()
 949                                    .and_then(|grammar| grammar.embedding_config.as_ref())
 950                                    .is_none()
 951                            {
 952                                continue;
 953                            }
 954                            pending_file.language = Some(language);
 955                        }
 956                        parsing_files_tx
 957                            .try_send((embeddings_for_digest.clone(), pending_file))
 958                            .ok();
 959                    }
 960
 961                    // Wait until we're done indexing.
 962                    while let Some(count) = pending_file_count_rx.next().await {
 963                        if count == 0 {
 964                            break;
 965                        }
 966                    }
 967                })
 968                .await;
 969
 970            this.update(&mut cx, |this, cx| {
 971                let project_state = this
 972                    .projects
 973                    .get_mut(&project.downgrade())
 974                    .ok_or_else(|| anyhow!("project was dropped"))?;
 975                project_state.pending_index -= 1;
 976                cx.notify();
 977                anyhow::Ok(())
 978            })?;
 979
 980            Ok(())
 981        })
 982    }
 983
 984    fn wait_for_worktree_registration(
 985        &self,
 986        project: &ModelHandle<Project>,
 987        cx: &mut ModelContext<Self>,
 988    ) -> Task<Result<()>> {
 989        let project = project.downgrade();
 990        cx.spawn_weak(|this, cx| async move {
 991            loop {
 992                let mut pending_worktrees = Vec::new();
 993                this.upgrade(&cx)
 994                    .ok_or_else(|| anyhow!("semantic index dropped"))?
 995                    .read_with(&cx, |this, _| {
 996                        if let Some(project) = this.projects.get(&project) {
 997                            for worktree in project.worktrees.values() {
 998                                if let WorktreeState::Registering(worktree) = worktree {
 999                                    pending_worktrees.push(worktree.done());
1000                                }
1001                            }
1002                        }
1003                    });
1004
1005                if pending_worktrees.is_empty() {
1006                    break;
1007                } else {
1008                    future::join_all(pending_worktrees).await;
1009                }
1010            }
1011            Ok(())
1012        })
1013    }
1014}
1015
/// Makes `SemanticIndex` a gpui model entity. It uses the unit type as its event type;
/// state changes such as `pending_index` updates are surfaced via `cx.notify()`.
impl Entity for SemanticIndex {
    type Event = ();
}
1019
1020impl Drop for JobHandle {
1021    fn drop(&mut self) {
1022        if let Some(inner) = Arc::get_mut(&mut self.tx) {
1023            // This is the last instance of the JobHandle (regardless of it's origin - whether it was cloned or not)
1024            if let Some(tx) = inner.upgrade() {
1025                let mut tx = tx.lock();
1026                *tx.borrow_mut() -= 1;
1027            }
1028        }
1029    }
1030}
1031
#[cfg(test)]
mod tests {

    use super::*;

    /// Cloning a handle does not change the count; only dropping the last clone decrements.
    #[test]
    fn test_job_handle() {
        let (job_count_tx, job_count_rx) = watch::channel_with(0);
        let tx = Arc::new(Mutex::new(job_count_tx));
        let job_handle = JobHandle::new(&tx);

        assert_eq!(1, *job_count_rx.borrow());
        let new_job_handle = job_handle.clone();
        assert_eq!(1, *job_count_rx.borrow());
        drop(job_handle);
        assert_eq!(1, *job_count_rx.borrow());
        drop(new_job_handle);
        assert_eq!(0, *job_count_rx.borrow());
    }

    /// Handles created independently each hold their own unit of the count.
    #[test]
    fn test_independent_job_handles() {
        let (job_count_tx, job_count_rx) = watch::channel_with(0);
        let tx = Arc::new(Mutex::new(job_count_tx));
        let first = JobHandle::new(&tx);
        let second = JobHandle::new(&tx);

        assert_eq!(2, *job_count_rx.borrow());
        drop(first);
        assert_eq!(1, *job_count_rx.borrow());
        drop(second);
        assert_eq!(0, *job_count_rx.borrow());
    }

    /// Dropping the last handle after the counter itself is gone must be a harmless no-op.
    #[test]
    fn test_job_handle_outlives_counter() {
        let (job_count_tx, job_count_rx) = watch::channel_with(0);
        let tx = Arc::new(Mutex::new(job_count_tx));
        let job_handle = JobHandle::new(&tx);

        assert_eq!(1, *job_count_rx.borrow());
        drop(tx);
        drop(job_handle);
    }
}