Detailed changes
@@ -2351,19 +2351,6 @@ dependencies = [
"digest",
]

-[[package]]
-name = "blake3"
-version = "1.8.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
-dependencies = [
- "arrayref",
- "arrayvec",
- "cc",
- "cfg-if",
- "constant_time_eq 0.3.1",
-]
-
[[package]]
name = "block"
version = "0.1.6"
@@ -3578,12 +3565,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
-[[package]]
-name = "constant_time_eq"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
-
[[package]]
name = "context_server"
version = "0.1.0"
@@ -6164,17 +6145,6 @@ dependencies = [
"futures-util",
]

-[[package]]
-name = "futures-batch"
-version = "0.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f444c45a1cb86f2a7e301469fd50a82084a60dadc25d94529a8312276ecb71a"
-dependencies = [
- "futures 0.3.31",
- "futures-timer",
- "pin-utils",
-]
-
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -6270,12 +6240,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
-[[package]]
-name = "futures-timer"
-version = "3.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
-
[[package]]
name = "futures-util"
version = "0.3.31"
@@ -14688,49 +14652,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
-[[package]]
-name = "semantic_index"
-version = "0.1.0"
-dependencies = [
- "anyhow",
- "arrayvec",
- "blake3",
- "client",
- "clock",
- "collections",
- "feature_flags",
- "fs",
- "futures 0.3.31",
- "futures-batch",
- "gpui",
- "heed",
- "http_client",
- "language",
- "language_model",
- "languages",
- "log",
- "open_ai",
- "parking_lot",
- "project",
- "reqwest_client",
- "serde",
- "serde_json",
- "settings",
- "sha2",
- "smol",
- "streaming-iterator",
- "tempfile",
- "theme",
- "tree-sitter",
- "ui",
- "unindent",
- "util",
- "workspace",
- "workspace-hack",
- "worktree",
- "zlog",
-]
-
[[package]]
name = "semantic_version"
version = "0.1.0"
@@ -20915,7 +20836,7 @@ dependencies = [
"aes",
"byteorder",
"bzip2",
- "constant_time_eq 0.1.5",
+ "constant_time_eq",
"crc32fast",
"crossbeam-utils",
"flate2",
@@ -143,7 +143,6 @@ members = [
"crates/rules_library",
"crates/schema_generator",
"crates/search",
- "crates/semantic_index",
"crates/semantic_version",
"crates/session",
"crates/settings",
@@ -373,7 +372,6 @@ rope = { path = "crates/rope" }
rpc = { path = "crates/rpc" }
rules_library = { path = "crates/rules_library" }
search = { path = "crates/search" }
-semantic_index = { path = "crates/semantic_index" }
semantic_version = { path = "crates/semantic_version" }
session = { path = "crates/session" }
settings = { path = "crates/settings" }
@@ -1,69 +0,0 @@
-[package]
-name = "semantic_index"
-description = "Process, chunk, and embed text as vectors for semantic search."
-version = "0.1.0"
-edition.workspace = true
-publish.workspace = true
-license = "GPL-3.0-or-later"
-
-[lints]
-workspace = true
-
-[lib]
-path = "src/semantic_index.rs"
-
-[[example]]
-name = "index"
-path = "examples/index.rs"
-crate-type = ["bin"]
-
-[dependencies]
-anyhow.workspace = true
-arrayvec.workspace = true
-blake3.workspace = true
-client.workspace = true
-clock.workspace = true
-collections.workspace = true
-feature_flags.workspace = true
-fs.workspace = true
-futures-batch.workspace = true
-futures.workspace = true
-gpui.workspace = true
-heed.workspace = true
-http_client.workspace = true
-language.workspace = true
-language_model.workspace = true
-log.workspace = true
-open_ai.workspace = true
-parking_lot.workspace = true
-project.workspace = true
-serde.workspace = true
-serde_json.workspace = true
-settings.workspace = true
-sha2.workspace = true
-smol.workspace = true
-streaming-iterator.workspace = true
-theme.workspace = true
-tree-sitter.workspace = true
-ui.workspace = true
-unindent.workspace = true
-util.workspace = true
-workspace.workspace = true
-worktree.workspace = true
-workspace-hack.workspace = true
-
-[dev-dependencies]
-client = { workspace = true, features = ["test-support"] }
-fs = { workspace = true, features = ["test-support"] }
-futures.workspace = true
-gpui = { workspace = true, features = ["test-support"] }
-http_client = { workspace = true, features = ["test-support"] }
-language = { workspace = true, features = ["test-support"] }
-languages.workspace = true
-project = { workspace = true, features = ["test-support"] }
-tempfile.workspace = true
-reqwest_client.workspace = true
-util = { workspace = true, features = ["test-support"] }
-workspace = { workspace = true, features = ["test-support"] }
-worktree = { workspace = true, features = ["test-support"] }
-zlog.workspace = true
@@ -1 +0,0 @@
-../../LICENSE-GPL
@@ -1,140 +0,0 @@
-use client::Client;
-use futures::channel::oneshot;
-use gpui::Application;
-use http_client::HttpClientWithUrl;
-use language::language_settings::AllLanguageSettings;
-use project::Project;
-use semantic_index::{OpenAiEmbeddingModel, OpenAiEmbeddingProvider, SemanticDb};
-use settings::SettingsStore;
-use std::{
- path::{Path, PathBuf},
- sync::Arc,
-};
-
-fn main() {
- zlog::init();
-
- use clock::FakeSystemClock;
-
- Application::new().run(|cx| {
- let store = SettingsStore::test(cx);
- cx.set_global(store);
- language::init(cx);
- Project::init_settings(cx);
- SettingsStore::update(cx, |store, cx| {
- store.update_user_settings::<AllLanguageSettings>(cx, |_| {});
- });
-
- let clock = Arc::new(FakeSystemClock::new());
-
- let http = Arc::new(HttpClientWithUrl::new(
- Arc::new(
- reqwest_client::ReqwestClient::user_agent("Zed semantic index example").unwrap(),
- ),
- "http://localhost:11434",
- None,
- ));
- let client = client::Client::new(clock, http.clone(), cx);
- Client::set_global(client, cx);
-
- let args: Vec<String> = std::env::args().collect();
- if args.len() < 2 {
- eprintln!("Usage: cargo run --example index -p semantic_index -- <project_path>");
- cx.quit();
- return;
- }
-
- // let embedding_provider = semantic_index::FakeEmbeddingProvider;
-
- let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY not set");
-
- let embedding_provider = Arc::new(OpenAiEmbeddingProvider::new(
- http,
- OpenAiEmbeddingModel::TextEmbedding3Small,
- open_ai::OPEN_AI_API_URL.to_string(),
- api_key,
- ));
-
- cx.spawn(async move |cx| {
- let semantic_index = SemanticDb::new(
- PathBuf::from("/tmp/semantic-index-db.mdb"),
- embedding_provider,
- cx,
- );
-
- let mut semantic_index = semantic_index.await.unwrap();
-
- let project_path = Path::new(&args[1]);
-
- let project = Project::example([project_path], cx).await;
-
- cx.update(|cx| {
- let language_registry = project.read(cx).languages().clone();
- let node_runtime = project.read(cx).node_runtime().unwrap().clone();
- languages::init(language_registry, node_runtime, cx);
- })
- .unwrap();
-
- let project_index = cx
- .update(|cx| semantic_index.project_index(project.clone(), cx))
- .unwrap()
- .unwrap();
-
- let (tx, rx) = oneshot::channel();
- let mut tx = Some(tx);
- let subscription = cx.update(|cx| {
- cx.subscribe(&project_index, move |_, event, _| {
- if let Some(tx) = tx.take() {
- _ = tx.send(*event);
- }
- })
- });
-
- let index_start = std::time::Instant::now();
- rx.await.expect("no event emitted");
- drop(subscription);
- println!("Index time: {:?}", index_start.elapsed());
-
- let results = cx
- .update(|cx| {
- let project_index = project_index.read(cx);
- let query = "converting an anchor to a point";
- project_index.search(vec![query.into()], 4, cx)
- })
- .unwrap()
- .await
- .unwrap();
-
- for search_result in results {
- let path = search_result.path.clone();
-
- let content = cx
- .update(|cx| {
- let worktree = search_result.worktree.read(cx);
- let entry_abs_path = worktree.abs_path().join(search_result.path.clone());
- let fs = project.read(cx).fs().clone();
- cx.spawn(async move |_| fs.load(&entry_abs_path).await.unwrap())
- })
- .unwrap()
- .await;
-
- let range = search_result.range.clone();
- let content = content[search_result.range].to_owned();
-
- println!(
- "✄✄✄✄✄✄✄✄✄✄✄✄✄✄ {:?} @ {} ✄✄✄✄✄✄✄✄✄✄✄✄✄✄",
- path, search_result.score
- );
- println!("{:?}:{:?}:{:?}", path, range.start, range.end);
- println!("{}", content);
- }
-
- cx.background_executor()
- .timer(std::time::Duration::from_secs(100000))
- .await;
-
- cx.update(|cx| cx.quit()).unwrap();
- })
- .detach();
- });
-}
@@ -1,3 +0,0 @@
-fn main() {
- println!("Hello Indexer!");
-}
@@ -1,43 +0,0 @@
-# Searching for a needle in a haystack
-
-When you have a large amount of text, it can be useful to search for a specific word or phrase. This is often referred to as "finding a needle in a haystack." In this markdown document, we're "hiding" a key phrase for our text search to find. Can you find it?
-
-## Instructions
-
-1. Use the search functionality in your text editor or markdown viewer to find the hidden phrase in this document.
-
-2. Once you've found the **phrase**, write it down and proceed to the next step.
-
-Honestly, I just want to fill up plenty of characters so that we chunk this markdown into several chunks.
-
-## Tips
-
-- Relax
-- Take a deep breath
-- Focus on the task at hand
-- Don't get distracted by other text
-- Use the search functionality to your advantage
-
-## Example code
-
-```python
-def search_for_needle(haystack, needle):
- if needle in haystack:
- return True
- else:
- return False
-```
-
-```javascript
-function searchForNeedle(haystack, needle) {
- return haystack.includes(needle);
-}
-```
-
-## Background
-
-When creating an index for a book or searching for a specific term in a large document, the ability to quickly find a specific word or phrase is essential. This is where search functionality comes in handy. However, one should _remember_ that the search is only as good as the index that was built. As they say, garbage in, garbage out!
-
-## Conclusion
-
-Searching for a needle in a haystack can be a challenging task, but with the right tools and techniques, it becomes much easier. Whether you're looking for a specific word in a document or trying to find a key piece of information in a large dataset, the ability to search efficiently is a valuable skill to have.
@@ -1,415 +0,0 @@
-use language::{Language, with_parser, with_query_cursor};
-use serde::{Deserialize, Serialize};
-use sha2::{Digest, Sha256};
-use std::{
- cmp::{self, Reverse},
- ops::Range,
- path::Path,
- sync::Arc,
-};
-use streaming_iterator::StreamingIterator;
-use tree_sitter::QueryCapture;
-use util::ResultExt as _;
-
-#[derive(Copy, Clone)]
-struct ChunkSizeRange {
- min: usize,
- max: usize,
-}
-
-const CHUNK_SIZE_RANGE: ChunkSizeRange = ChunkSizeRange {
- min: 1024,
- max: 8192,
-};
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Chunk {
- pub range: Range<usize>,
- pub digest: [u8; 32],
-}
-
-pub fn chunk_text(text: &str, language: Option<&Arc<Language>>, path: &Path) -> Vec<Chunk> {
- chunk_text_with_size_range(text, language, path, CHUNK_SIZE_RANGE)
-}
-
-fn chunk_text_with_size_range(
- text: &str,
- language: Option<&Arc<Language>>,
- path: &Path,
- size_config: ChunkSizeRange,
-) -> Vec<Chunk> {
- let ranges = syntactic_ranges(text, language, path).unwrap_or_default();
- chunk_text_with_syntactic_ranges(text, &ranges, size_config)
-}
-
-fn syntactic_ranges(
- text: &str,
- language: Option<&Arc<Language>>,
- path: &Path,
-) -> Option<Vec<Range<usize>>> {
- let language = language?;
- let grammar = language.grammar()?;
- let outline = grammar.outline_config.as_ref()?;
- let tree = with_parser(|parser| {
- parser.set_language(&grammar.ts_language).log_err()?;
- parser.parse(text, None)
- });
-
- let Some(tree) = tree else {
- log::error!("failed to parse file {path:?} for chunking");
- return None;
- };
-
- struct RowInfo {
- offset: usize,
- is_comment: bool,
- }
-
- let scope = language.default_scope();
- let line_comment_prefixes = scope.line_comment_prefixes();
- let row_infos = text
- .split('\n')
- .map({
- let mut offset = 0;
- move |line| {
- let line = line.trim_start();
- let is_comment = line_comment_prefixes
- .iter()
- .any(|prefix| line.starts_with(prefix.as_ref()));
- let result = RowInfo { offset, is_comment };
- offset += line.len() + 1;
- result
- }
- })
- .collect::<Vec<_>>();
-
- // Retrieve a list of ranges of outline items (types, functions, etc) in the document.
- // Omit single-line outline items (e.g. struct fields, constant declarations), because
- // we'll already be attempting to split on lines.
- let mut ranges = with_query_cursor(|cursor| {
- cursor
- .matches(&outline.query, tree.root_node(), text.as_bytes())
- .filter_map_deref(|mat| {
- mat.captures
- .iter()
- .find_map(|QueryCapture { node, index }| {
- if *index == outline.item_capture_ix {
- let mut start_offset = node.start_byte();
- let mut start_row = node.start_position().row;
- let end_offset = node.end_byte();
- let end_row = node.end_position().row;
-
- // Expand the range to include any preceding comments.
- while start_row > 0 && row_infos[start_row - 1].is_comment {
- start_offset = row_infos[start_row - 1].offset;
- start_row -= 1;
- }
-
- if end_row > start_row {
- return Some(start_offset..end_offset);
- }
- }
- None
- })
- })
- .collect::<Vec<_>>()
- });
-
- ranges.sort_unstable_by_key(|range| (range.start, Reverse(range.end)));
- Some(ranges)
-}
-
-fn chunk_text_with_syntactic_ranges(
- text: &str,
- mut syntactic_ranges: &[Range<usize>],
- size_config: ChunkSizeRange,
-) -> Vec<Chunk> {
- let mut chunks = Vec::new();
- let mut range = 0..0;
- let mut range_end_nesting_depth = 0;
-
- // Try to split the text at line boundaries.
- let mut line_ixs = text
- .match_indices('\n')
- .map(|(ix, _)| ix + 1)
- .chain(if text.ends_with('\n') {
- None
- } else {
- Some(text.len())
- })
- .peekable();
-
- while let Some(&line_ix) = line_ixs.peek() {
- // If the current position is beyond the maximum chunk size, then
- // start a new chunk.
- if line_ix - range.start > size_config.max {
- if range.is_empty() {
- range.end = cmp::min(range.start + size_config.max, line_ix);
- while !text.is_char_boundary(range.end) {
- range.end -= 1;
- }
- }
-
- chunks.push(Chunk {
- range: range.clone(),
- digest: Sha256::digest(&text[range.clone()]).into(),
- });
- range_end_nesting_depth = 0;
- range.start = range.end;
- continue;
- }
-
- // Discard any syntactic ranges that end before the current position.
- while let Some(first_item) = syntactic_ranges.first() {
- if first_item.end < line_ix {
- syntactic_ranges = &syntactic_ranges[1..];
- continue;
- } else {
- break;
- }
- }
-
- // Count how many syntactic ranges contain the current position.
- let mut nesting_depth = 0;
- for range in syntactic_ranges {
- if range.start > line_ix {
- break;
- }
- if range.start < line_ix && range.end > line_ix {
- nesting_depth += 1;
- }
- }
-
- // Extend the current range to this position, unless an earlier candidate
- // end position was less nested syntactically.
- if range.len() < size_config.min || nesting_depth <= range_end_nesting_depth {
- range.end = line_ix;
- range_end_nesting_depth = nesting_depth;
- }
-
- line_ixs.next();
- }
-
- if !range.is_empty() {
- chunks.push(Chunk {
- range: range.clone(),
- digest: Sha256::digest(&text[range]).into(),
- });
- }
-
- chunks
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use language::{Language, LanguageConfig, LanguageMatcher, tree_sitter_rust};
- use unindent::Unindent as _;
-
- #[test]
- fn test_chunk_text_with_syntax() {
- let language = rust_language();
-
- let text = "
- struct Person {
- first_name: String,
- last_name: String,
- age: u32,
- }
-
- impl Person {
- fn new(first_name: String, last_name: String, age: u32) -> Self {
- Self { first_name, last_name, age }
- }
-
- /// Returns the first name
- /// something something something
- fn first_name(&self) -> &str {
- &self.first_name
- }
-
- fn last_name(&self) -> &str {
- &self.last_name
- }
-
- fn age(&self) -> u32 {
- self.age
- }
- }
- "
- .unindent();
-
- let chunks = chunk_text_with_size_range(
- &text,
- Some(&language),
- Path::new("lib.rs"),
- ChunkSizeRange {
- min: text.find('}').unwrap(),
- max: text.find("Self {").unwrap(),
- },
- );
-
- // The entire impl cannot fit in a chunk, so it is split.
- // Within the impl, two methods can fit in a chunk.
- assert_chunks(
- &text,
- &chunks,
- &[
- "struct Person {", // ...
- "impl Person {",
- " /// Returns the first name",
- " fn last_name",
- ],
- );
-
- let text = "
- struct T {}
- struct U {}
- struct V {}
- struct W {
- a: T,
- b: U,
- }
- "
- .unindent();
-
- let chunks = chunk_text_with_size_range(
- &text,
- Some(&language),
- Path::new("lib.rs"),
- ChunkSizeRange {
- min: text.find('{').unwrap(),
- max: text.find('V').unwrap(),
- },
- );
-
- // Two single-line structs can fit in a chunk.
- // The last struct cannot fit in a chunk together
- // with the previous single-line struct.
- assert_chunks(
- &text,
- &chunks,
- &[
- "struct T", // ...
- "struct V", // ...
- "struct W", // ...
- "}",
- ],
- );
- }
-
- #[test]
- fn test_chunk_with_long_lines() {
- let language = rust_language();
-
- let text = "
- struct S { a: u32 }
- struct T { a: u64 }
- struct U { a: u64, b: u64, c: u64, d: u64, e: u64, f: u64, g: u64, h: u64, i: u64, j: u64 }
- struct W { a: u64, b: u64, c: u64, d: u64, e: u64, f: u64, g: u64, h: u64, i: u64, j: u64 }
- "
- .unindent();
-
- let chunks = chunk_text_with_size_range(
- &text,
- Some(&language),
- Path::new("lib.rs"),
- ChunkSizeRange { min: 32, max: 64 },
- );
-
- // The line is too long to fit in one chunk
- assert_chunks(
- &text,
- &chunks,
- &[
- "struct S {", // ...
- "struct U",
- "4, h: u64, i: u64", // ...
- "struct W",
- "4, h: u64, i: u64", // ...
- ],
- );
- }
-
- #[track_caller]
- fn assert_chunks(text: &str, chunks: &[Chunk], expected_chunk_text_prefixes: &[&str]) {
- check_chunk_invariants(text, chunks);
-
- assert_eq!(
- chunks.len(),
- expected_chunk_text_prefixes.len(),
- "unexpected number of chunks: {chunks:?}",
- );
-
- let mut prev_chunk_end = 0;
- for (ix, chunk) in chunks.iter().enumerate() {
- let expected_prefix = expected_chunk_text_prefixes[ix];
- let chunk_text = &text[chunk.range.clone()];
- if !chunk_text.starts_with(expected_prefix) {
- let chunk_prefix_offset = text[prev_chunk_end..].find(expected_prefix);
- if let Some(chunk_prefix_offset) = chunk_prefix_offset {
- panic!(
- "chunk {ix} starts at unexpected offset {}. expected {}",
- chunk.range.start,
- chunk_prefix_offset + prev_chunk_end
- );
- } else {
- panic!("invalid expected chunk prefix {ix}: {expected_prefix:?}");
- }
- }
- prev_chunk_end = chunk.range.end;
- }
- }
-
- #[track_caller]
- fn check_chunk_invariants(text: &str, chunks: &[Chunk]) {
- for (ix, chunk) in chunks.iter().enumerate() {
- if ix > 0 && chunk.range.start != chunks[ix - 1].range.end {
- panic!("chunk ranges are not contiguous: {:?}", chunks);
- }
- }
-
- if text.is_empty() {
- assert!(chunks.is_empty())
- } else if chunks.first().unwrap().range.start != 0
- || chunks.last().unwrap().range.end != text.len()
- {
- panic!("chunks don't cover entire text {:?}", chunks);
- }
- }
-
- #[test]
- fn test_chunk_text() {
- let text = "a\n".repeat(1000);
- let chunks = chunk_text(&text, None, Path::new("lib.rs"));
- assert_eq!(
- chunks.len(),
- ((2000_f64) / (CHUNK_SIZE_RANGE.max as f64)).ceil() as usize
- );
- }
-
- fn rust_language() -> Arc<Language> {
- Arc::new(
- Language::new(
- LanguageConfig {
- name: "Rust".into(),
- matcher: LanguageMatcher {
- path_suffixes: vec!["rs".to_string()],
- ..Default::default()
- },
- ..Default::default()
- },
- Some(tree_sitter_rust::LANGUAGE.into()),
- )
- .with_outline_query(
- "
- (function_item name: (_) @name) @item
- (impl_item type: (_) @name) @item
- (struct_item name: (_) @name) @item
- (field_declaration name: (_) @name) @item
- ",
- )
- .unwrap(),
- )
- }
-}
@@ -1,134 +0,0 @@
-mod lmstudio;
-mod ollama;
-mod open_ai;
-
-pub use lmstudio::*;
-pub use ollama::*;
-pub use open_ai::*;
-use sha2::{Digest, Sha256};
-
-use anyhow::Result;
-use futures::{FutureExt, future::BoxFuture};
-use serde::{Deserialize, Serialize};
-use std::{fmt, future};
-
-/// Trait for embedding providers. Texts in, vectors out.
-pub trait EmbeddingProvider: Sync + Send {
- fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result<Vec<Embedding>>>;
- fn batch_size(&self) -> usize;
-}
-
-#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
-pub struct Embedding(Vec<f32>);
-
-impl Embedding {
- pub fn new(mut embedding: Vec<f32>) -> Self {
- let len = embedding.len();
- let mut norm = 0f32;
-
- for i in 0..len {
- norm += embedding[i] * embedding[i];
- }
-
- norm = norm.sqrt();
- for dimension in &mut embedding {
- *dimension /= norm;
- }
-
- Self(embedding)
- }
-
- fn len(&self) -> usize {
- self.0.len()
- }
-
- pub fn similarity(&self, others: &[Embedding]) -> (f32, usize) {
- debug_assert!(others.iter().all(|other| self.0.len() == other.0.len()));
- others
- .iter()
- .enumerate()
- .map(|(index, other)| {
- let dot_product: f32 = self
- .0
- .iter()
- .copied()
- .zip(other.0.iter().copied())
- .map(|(a, b)| a * b)
- .sum();
- (dot_product, index)
- })
- .max_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal))
- .unwrap_or((0.0, 0))
- }
-}
-
-impl fmt::Display for Embedding {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let digits_to_display = 3;
-
- // Start the Embedding display format
- write!(f, "Embedding(sized: {}; values: [", self.len())?;
-
- for (index, value) in self.0.iter().enumerate().take(digits_to_display) {
- // Lead with comma if not the first element
- if index != 0 {
- write!(f, ", ")?;
- }
- write!(f, "{:.3}", value)?;
- }
- if self.len() > digits_to_display {
- write!(f, "...")?;
- }
- write!(f, "])")
- }
-}
-
-#[derive(Debug)]
-pub struct TextToEmbed<'a> {
- pub text: &'a str,
- pub digest: [u8; 32],
-}
-
-impl<'a> TextToEmbed<'a> {
- pub fn new(text: &'a str) -> Self {
- let digest = Sha256::digest(text.as_bytes());
- Self {
- text,
- digest: digest.into(),
- }
- }
-}
-
-pub struct FakeEmbeddingProvider;
-
-impl EmbeddingProvider for FakeEmbeddingProvider {
- fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result<Vec<Embedding>>> {
- let embeddings = texts
- .iter()
- .map(|_text| {
- let mut embedding = vec![0f32; 1536];
- for i in 0..embedding.len() {
- embedding[i] = i as f32;
- }
- Embedding::new(embedding)
- })
- .collect();
- future::ready(Ok(embeddings)).boxed()
- }
-
- fn batch_size(&self) -> usize {
- 16
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::*;
-
- #[gpui::test]
- fn test_normalize_embedding() {
- let normalized = Embedding::new(vec![1.0, 1.0, 1.0]);
- let value: f32 = 1.0 / 3.0_f32.sqrt();
- assert_eq!(normalized, Embedding(vec![value; 3]));
- }
-}
@@ -1,70 +0,0 @@
-use anyhow::{Context as _, Result};
-use futures::{AsyncReadExt as _, FutureExt, future::BoxFuture};
-use http_client::HttpClient;
-use serde::{Deserialize, Serialize};
-use std::sync::Arc;
-
-use crate::{Embedding, EmbeddingProvider, TextToEmbed};
-
-pub enum LmStudioEmbeddingModel {
- NomicEmbedText,
-}
-
-pub struct LmStudioEmbeddingProvider {
- client: Arc<dyn HttpClient>,
- model: LmStudioEmbeddingModel,
-}
-
-#[derive(Serialize)]
-struct LmStudioEmbeddingRequest {
- model: String,
- prompt: String,
-}
-
-#[derive(Deserialize)]
-struct LmStudioEmbeddingResponse {
- embedding: Vec<f32>,
-}
-
-impl LmStudioEmbeddingProvider {
- pub fn new(client: Arc<dyn HttpClient>, model: LmStudioEmbeddingModel) -> Self {
- Self { client, model }
- }
-}
-
-impl EmbeddingProvider for LmStudioEmbeddingProvider {
- fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result<Vec<Embedding>>> {
- let model = match self.model {
- LmStudioEmbeddingModel::NomicEmbedText => "nomic-embed-text",
- };
-
- futures::future::try_join_all(texts.iter().map(|to_embed| {
- let request = LmStudioEmbeddingRequest {
- model: model.to_string(),
- prompt: to_embed.text.to_string(),
- };
-
- let request = serde_json::to_string(&request).unwrap();
-
- async {
- let response = self
- .client
- .post_json("http://localhost:1234/api/v0/embeddings", request.into())
- .await?;
-
- let mut body = String::new();
- response.into_body().read_to_string(&mut body).await?;
-
- let response: LmStudioEmbeddingResponse =
- serde_json::from_str(&body).context("Unable to parse response")?;
-
- Ok(Embedding::new(response.embedding))
- }
- }))
- .boxed()
- }
-
- fn batch_size(&self) -> usize {
- 256
- }
-}
@@ -1,74 +0,0 @@
-use anyhow::{Context as _, Result};
-use futures::{AsyncReadExt as _, FutureExt, future::BoxFuture};
-use http_client::HttpClient;
-use serde::{Deserialize, Serialize};
-use std::sync::Arc;
-
-use crate::{Embedding, EmbeddingProvider, TextToEmbed};
-
-pub enum OllamaEmbeddingModel {
- NomicEmbedText,
- MxbaiEmbedLarge,
-}
-
-pub struct OllamaEmbeddingProvider {
- client: Arc<dyn HttpClient>,
- model: OllamaEmbeddingModel,
-}
-
-#[derive(Serialize)]
-struct OllamaEmbeddingRequest {
- model: String,
- prompt: String,
-}
-
-#[derive(Deserialize)]
-struct OllamaEmbeddingResponse {
- embedding: Vec<f32>,
-}
-
-impl OllamaEmbeddingProvider {
- pub fn new(client: Arc<dyn HttpClient>, model: OllamaEmbeddingModel) -> Self {
- Self { client, model }
- }
-}
-
-impl EmbeddingProvider for OllamaEmbeddingProvider {
- fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result<Vec<Embedding>>> {
- //
- let model = match self.model {
- OllamaEmbeddingModel::NomicEmbedText => "nomic-embed-text",
- OllamaEmbeddingModel::MxbaiEmbedLarge => "mxbai-embed-large",
- };
-
- futures::future::try_join_all(texts.iter().map(|to_embed| {
- let request = OllamaEmbeddingRequest {
- model: model.to_string(),
- prompt: to_embed.text.to_string(),
- };
-
- let request = serde_json::to_string(&request).unwrap();
-
- async {
- let response = self
- .client
- .post_json("http://localhost:11434/api/embeddings", request.into())
- .await?;
-
- let mut body = String::new();
- response.into_body().read_to_string(&mut body).await?;
-
- let response: OllamaEmbeddingResponse =
- serde_json::from_str(&body).context("Unable to pull response")?;
-
- Ok(Embedding::new(response.embedding))
- }
- }))
- .boxed()
- }
-
- fn batch_size(&self) -> usize {
- // TODO: Figure out decent value
- 10
- }
-}
@@ -1,55 +0,0 @@
-use crate::{Embedding, EmbeddingProvider, TextToEmbed};
-use anyhow::Result;
-use futures::{FutureExt, future::BoxFuture};
-use http_client::HttpClient;
-pub use open_ai::OpenAiEmbeddingModel;
-use std::sync::Arc;
-
-pub struct OpenAiEmbeddingProvider {
- client: Arc<dyn HttpClient>,
- model: OpenAiEmbeddingModel,
- api_url: String,
- api_key: String,
-}
-
-impl OpenAiEmbeddingProvider {
- pub fn new(
- client: Arc<dyn HttpClient>,
- model: OpenAiEmbeddingModel,
- api_url: String,
- api_key: String,
- ) -> Self {
- Self {
- client,
- model,
- api_url,
- api_key,
- }
- }
-}
-
-impl EmbeddingProvider for OpenAiEmbeddingProvider {
- fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result<Vec<Embedding>>> {
- let embed = open_ai::embed(
- self.client.as_ref(),
- &self.api_url,
- &self.api_key,
- self.model,
- texts.iter().map(|to_embed| to_embed.text),
- );
- async move {
- let response = embed.await?;
- Ok(response
- .data
- .into_iter()
- .map(|data| Embedding::new(data.embedding))
- .collect())
- }
- .boxed()
- }
-
- fn batch_size(&self) -> usize {
- // From https://platform.openai.com/docs/api-reference/embeddings/create
- 2048
- }
-}
@@ -1,470 +0,0 @@
-use crate::{
- chunking::{self, Chunk},
- embedding::{Embedding, EmbeddingProvider, TextToEmbed},
- indexing::{IndexingEntryHandle, IndexingEntrySet},
-};
-use anyhow::{Context as _, Result};
-use collections::Bound;
-use feature_flags::FeatureFlagAppExt;
-use fs::Fs;
-use fs::MTime;
-use futures::{FutureExt as _, stream::StreamExt};
-use futures_batch::ChunksTimeoutStreamExt;
-use gpui::{App, AppContext as _, Entity, Task};
-use heed::types::{SerdeBincode, Str};
-use language::LanguageRegistry;
-use log;
-use project::{Entry, UpdatedEntriesSet, Worktree};
-use serde::{Deserialize, Serialize};
-use smol::channel;
-use std::{cmp::Ordering, future::Future, iter, path::Path, pin::pin, sync::Arc, time::Duration};
-use util::ResultExt;
-use worktree::Snapshot;
-
-pub struct EmbeddingIndex {
- worktree: Entity<Worktree>,
- db_connection: heed::Env,
- db: heed::Database<Str, SerdeBincode<EmbeddedFile>>,
- fs: Arc<dyn Fs>,
- language_registry: Arc<LanguageRegistry>,
- embedding_provider: Arc<dyn EmbeddingProvider>,
- entry_ids_being_indexed: Arc<IndexingEntrySet>,
-}
-
-impl EmbeddingIndex {
- pub fn new(
- worktree: Entity<Worktree>,
- fs: Arc<dyn Fs>,
- db_connection: heed::Env,
- embedding_db: heed::Database<Str, SerdeBincode<EmbeddedFile>>,
- language_registry: Arc<LanguageRegistry>,
- embedding_provider: Arc<dyn EmbeddingProvider>,
- entry_ids_being_indexed: Arc<IndexingEntrySet>,
- ) -> Self {
- Self {
- worktree,
- fs,
- db_connection,
- db: embedding_db,
- language_registry,
- embedding_provider,
- entry_ids_being_indexed,
- }
- }
-
- pub fn db(&self) -> &heed::Database<Str, SerdeBincode<EmbeddedFile>> {
- &self.db
- }
-
- pub fn index_entries_changed_on_disk(
- &self,
- cx: &App,
- ) -> impl Future<Output = Result<()>> + use<> {
- if !cx.is_staff() {
- return async move { Ok(()) }.boxed();
- }
-
- let worktree = self.worktree.read(cx).snapshot();
- let worktree_abs_path = worktree.abs_path().clone();
- let scan = self.scan_entries(worktree, cx);
- let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx);
- let embed = Self::embed_files(self.embedding_provider.clone(), chunk.files, cx);
- let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx);
- async move {
- futures::try_join!(scan.task, chunk.task, embed.task, persist)?;
- Ok(())
- }
- .boxed()
- }
-
- pub fn index_updated_entries(
- &self,
- updated_entries: UpdatedEntriesSet,
- cx: &App,
- ) -> impl Future<Output = Result<()>> + use<> {
- if !cx.is_staff() {
- return async move { Ok(()) }.boxed();
- }
-
- let worktree = self.worktree.read(cx).snapshot();
- let worktree_abs_path = worktree.abs_path().clone();
- let scan = self.scan_updated_entries(worktree, updated_entries, cx);
- let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx);
- let embed = Self::embed_files(self.embedding_provider.clone(), chunk.files, cx);
- let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx);
- async move {
- futures::try_join!(scan.task, chunk.task, embed.task, persist)?;
- Ok(())
- }
- .boxed()
- }
-
- fn scan_entries(&self, worktree: Snapshot, cx: &App) -> ScanEntries {
- let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
- let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
- let db_connection = self.db_connection.clone();
- let db = self.db;
- let entries_being_indexed = self.entry_ids_being_indexed.clone();
- let task = cx.background_spawn(async move {
- let txn = db_connection
- .read_txn()
- .context("failed to create read transaction")?;
- let mut db_entries = db
- .iter(&txn)
- .context("failed to create iterator")?
- .move_between_keys()
- .peekable();
-
- let mut deletion_range: Option<(Bound<&str>, Bound<&str>)> = None;
- for entry in worktree.files(false, 0) {
- log::trace!("scanning for embedding index: {:?}", &entry.path);
-
- let entry_db_key = db_key_for_path(&entry.path);
-
- let mut saved_mtime = None;
- while let Some(db_entry) = db_entries.peek() {
- match db_entry {
- Ok((db_path, db_embedded_file)) => match (*db_path).cmp(&entry_db_key) {
- Ordering::Less => {
- if let Some(deletion_range) = deletion_range.as_mut() {
- deletion_range.1 = Bound::Included(db_path);
- } else {
- deletion_range =
- Some((Bound::Included(db_path), Bound::Included(db_path)));
- }
-
- db_entries.next();
- }
- Ordering::Equal => {
- if let Some(deletion_range) = deletion_range.take() {
- deleted_entry_ranges_tx
- .send((
- deletion_range.0.map(ToString::to_string),
- deletion_range.1.map(ToString::to_string),
- ))
- .await?;
- }
- saved_mtime = db_embedded_file.mtime;
- db_entries.next();
- break;
- }
- Ordering::Greater => {
- break;
- }
- },
- Err(_) => return Err(db_entries.next().unwrap().unwrap_err())?,
- }
- }
-
- if entry.mtime != saved_mtime {
- let handle = entries_being_indexed.insert(entry.id);
- updated_entries_tx.send((entry.clone(), handle)).await?;
- }
- }
-
- if let Some(db_entry) = db_entries.next() {
- let (db_path, _) = db_entry?;
- deleted_entry_ranges_tx
- .send((Bound::Included(db_path.to_string()), Bound::Unbounded))
- .await?;
- }
-
- Ok(())
- });
-
- ScanEntries {
- updated_entries: updated_entries_rx,
- deleted_entry_ranges: deleted_entry_ranges_rx,
- task,
- }
- }
-
- fn scan_updated_entries(
- &self,
- worktree: Snapshot,
- updated_entries: UpdatedEntriesSet,
- cx: &App,
- ) -> ScanEntries {
- let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
- let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
- let entries_being_indexed = self.entry_ids_being_indexed.clone();
- let task = cx.background_spawn(async move {
- for (path, entry_id, status) in updated_entries.iter() {
- match status {
- project::PathChange::Added
- | project::PathChange::Updated
- | project::PathChange::AddedOrUpdated => {
- if let Some(entry) = worktree.entry_for_id(*entry_id)
- && entry.is_file()
- {
- let handle = entries_being_indexed.insert(entry.id);
- updated_entries_tx.send((entry.clone(), handle)).await?;
- }
- }
- project::PathChange::Removed => {
- let db_path = db_key_for_path(path);
- deleted_entry_ranges_tx
- .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
- .await?;
- }
- project::PathChange::Loaded => {
- // Do nothing.
- }
- }
- }
-
- Ok(())
- });
-
- ScanEntries {
- updated_entries: updated_entries_rx,
- deleted_entry_ranges: deleted_entry_ranges_rx,
- task,
- }
- }
-
- fn chunk_files(
- &self,
- worktree_abs_path: Arc<Path>,
- entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
- cx: &App,
- ) -> ChunkFiles {
- let language_registry = self.language_registry.clone();
- let fs = self.fs.clone();
- let (chunked_files_tx, chunked_files_rx) = channel::bounded(2048);
- let task = cx.spawn(async move |cx| {
- cx.background_executor()
- .scoped(|cx| {
- for _ in 0..cx.num_cpus() {
- cx.spawn(async {
- while let Ok((entry, handle)) = entries.recv().await {
- let entry_abs_path = worktree_abs_path.join(&entry.path);
- if let Some(text) = fs.load(&entry_abs_path).await.ok() {
- let language = language_registry
- .language_for_file_path(&entry.path)
- .await
- .ok();
- let chunked_file = ChunkedFile {
- chunks: chunking::chunk_text(
- &text,
- language.as_ref(),
- &entry.path,
- ),
- handle,
- path: entry.path,
- mtime: entry.mtime,
- text,
- };
-
- if chunked_files_tx.send(chunked_file).await.is_err() {
- return;
- }
- }
- }
- });
- }
- })
- .await;
- Ok(())
- });
-
- ChunkFiles {
- files: chunked_files_rx,
- task,
- }
- }
-
- pub fn embed_files(
- embedding_provider: Arc<dyn EmbeddingProvider>,
- chunked_files: channel::Receiver<ChunkedFile>,
- cx: &App,
- ) -> EmbedFiles {
- let embedding_provider = embedding_provider.clone();
- let (embedded_files_tx, embedded_files_rx) = channel::bounded(512);
- let task = cx.background_spawn(async move {
- let mut chunked_file_batches =
- pin!(chunked_files.chunks_timeout(512, Duration::from_secs(2)));
- while let Some(chunked_files) = chunked_file_batches.next().await {
- // View the batch of files as a vec of chunks
- // Flatten out to a vec of chunks that we can subdivide into batch sized pieces
- // Once those are done, reassemble them back into the files in which they belong
- // If any embeddings fail for a file, the entire file is discarded
-
- let chunks: Vec<TextToEmbed> = chunked_files
- .iter()
- .flat_map(|file| {
- file.chunks.iter().map(|chunk| TextToEmbed {
- text: &file.text[chunk.range.clone()],
- digest: chunk.digest,
- })
- })
- .collect::<Vec<_>>();
-
- let mut embeddings: Vec<Option<Embedding>> = Vec::new();
- for embedding_batch in chunks.chunks(embedding_provider.batch_size()) {
- if let Some(batch_embeddings) =
- embedding_provider.embed(embedding_batch).await.log_err()
- {
- if batch_embeddings.len() == embedding_batch.len() {
- embeddings.extend(batch_embeddings.into_iter().map(Some));
- continue;
- }
- log::error!(
- "embedding provider returned unexpected embedding count {}, expected {}",
- batch_embeddings.len(), embedding_batch.len()
- );
- }
-
- embeddings.extend(iter::repeat(None).take(embedding_batch.len()));
- }
-
- let mut embeddings = embeddings.into_iter();
- for chunked_file in chunked_files {
- let mut embedded_file = EmbeddedFile {
- path: chunked_file.path,
- mtime: chunked_file.mtime,
- chunks: Vec::new(),
- };
-
- let mut embedded_all_chunks = true;
- for (chunk, embedding) in
- chunked_file.chunks.into_iter().zip(embeddings.by_ref())
- {
- if let Some(embedding) = embedding {
- embedded_file
- .chunks
- .push(EmbeddedChunk { chunk, embedding });
- } else {
- embedded_all_chunks = false;
- }
- }
-
- if embedded_all_chunks {
- embedded_files_tx
- .send((embedded_file, chunked_file.handle))
- .await?;
- }
- }
- }
- Ok(())
- });
-
- EmbedFiles {
- files: embedded_files_rx,
- task,
- }
- }
-
- fn persist_embeddings(
- &self,
- deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
- embedded_files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
- cx: &App,
- ) -> Task<Result<()>> {
- let db_connection = self.db_connection.clone();
- let db = self.db;
-
- cx.background_spawn(async move {
- let mut deleted_entry_ranges = pin!(deleted_entry_ranges);
- let mut embedded_files = pin!(embedded_files);
- loop {
- // Interleave deletions and persists of embedded files
- futures::select_biased! {
- deletion_range = deleted_entry_ranges.next() => {
- if let Some(deletion_range) = deletion_range {
- let mut txn = db_connection.write_txn()?;
- let start = deletion_range.0.as_ref().map(|start| start.as_str());
- let end = deletion_range.1.as_ref().map(|end| end.as_str());
- log::debug!("deleting embeddings in range {:?}", &(start, end));
- db.delete_range(&mut txn, &(start, end))?;
- txn.commit()?;
- }
- },
- file = embedded_files.next() => {
- if let Some((file, _)) = file {
- let mut txn = db_connection.write_txn()?;
- log::debug!("saving embedding for file {:?}", file.path);
- let key = db_key_for_path(&file.path);
- db.put(&mut txn, &key, &file)?;
- txn.commit()?;
- }
- },
- complete => break,
- }
- }
-
- Ok(())
- })
- }
-
- pub fn paths(&self, cx: &App) -> Task<Result<Vec<Arc<Path>>>> {
- let connection = self.db_connection.clone();
- let db = self.db;
- cx.background_spawn(async move {
- let tx = connection
- .read_txn()
- .context("failed to create read transaction")?;
- let result = db
- .iter(&tx)?
- .map(|entry| Ok(entry?.1.path))
- .collect::<Result<Vec<Arc<Path>>>>();
- drop(tx);
- result
- })
- }
-
- pub fn chunks_for_path(&self, path: Arc<Path>, cx: &App) -> Task<Result<Vec<EmbeddedChunk>>> {
- let connection = self.db_connection.clone();
- let db = self.db;
- cx.background_spawn(async move {
- let tx = connection
- .read_txn()
- .context("failed to create read transaction")?;
- Ok(db
- .get(&tx, &db_key_for_path(&path))?
- .context("no such path")?
- .chunks)
- })
- }
-}
-
-struct ScanEntries {
- updated_entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
- deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
- task: Task<Result<()>>,
-}
-
-struct ChunkFiles {
- files: channel::Receiver<ChunkedFile>,
- task: Task<Result<()>>,
-}
-
-pub struct ChunkedFile {
- pub path: Arc<Path>,
- pub mtime: Option<MTime>,
- pub handle: IndexingEntryHandle,
- pub text: String,
- pub chunks: Vec<Chunk>,
-}
-
-pub struct EmbedFiles {
- pub files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
- pub task: Task<Result<()>>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct EmbeddedFile {
- pub path: Arc<Path>,
- pub mtime: Option<MTime>,
- pub chunks: Vec<EmbeddedChunk>,
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct EmbeddedChunk {
- pub chunk: Chunk,
- pub embedding: Embedding,
-}
-
-fn db_key_for_path(path: &Arc<Path>) -> String {
- path.to_string_lossy().replace('/', "\0")
-}
@@ -1,49 +0,0 @@
-use collections::HashSet;
-use parking_lot::Mutex;
-use project::ProjectEntryId;
-use smol::channel;
-use std::sync::{Arc, Weak};
-
-/// The set of entries that are currently being indexed.
-pub struct IndexingEntrySet {
- entry_ids: Mutex<HashSet<ProjectEntryId>>,
- tx: channel::Sender<()>,
-}
-
-/// When dropped, removes the entry from the set of entries that are being indexed.
-#[derive(Clone)]
-pub(crate) struct IndexingEntryHandle {
- entry_id: ProjectEntryId,
- set: Weak<IndexingEntrySet>,
-}
-
-impl IndexingEntrySet {
- pub fn new(tx: channel::Sender<()>) -> Self {
- Self {
- entry_ids: Default::default(),
- tx,
- }
- }
-
- pub fn insert(self: &Arc<Self>, entry_id: ProjectEntryId) -> IndexingEntryHandle {
- self.entry_ids.lock().insert(entry_id);
- self.tx.send_blocking(()).ok();
- IndexingEntryHandle {
- entry_id,
- set: Arc::downgrade(self),
- }
- }
-
- pub fn len(&self) -> usize {
- self.entry_ids.lock().len()
- }
-}
-
-impl Drop for IndexingEntryHandle {
- fn drop(&mut self) {
- if let Some(set) = self.set.upgrade() {
- set.tx.send_blocking(()).ok();
- set.entry_ids.lock().remove(&self.entry_id);
- }
- }
-}
@@ -1,548 +0,0 @@
-use crate::{
- embedding::{EmbeddingProvider, TextToEmbed},
- summary_index::FileSummary,
- worktree_index::{WorktreeIndex, WorktreeIndexHandle},
-};
-use anyhow::{Context as _, Result, anyhow};
-use collections::HashMap;
-use fs::Fs;
-use futures::FutureExt;
-use gpui::{
- App, AppContext as _, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity,
-};
-use language::LanguageRegistry;
-use log;
-use project::{Project, Worktree, WorktreeId};
-use serde::{Deserialize, Serialize};
-use smol::channel;
-use std::{
- cmp::Ordering,
- future::Future,
- num::NonZeroUsize,
- ops::{Range, RangeInclusive},
- path::{Path, PathBuf},
- sync::Arc,
-};
-use util::ResultExt;
-
-#[derive(Debug)]
-pub struct SearchResult {
- pub worktree: Entity<Worktree>,
- pub path: Arc<Path>,
- pub range: Range<usize>,
- pub score: f32,
- pub query_index: usize,
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub struct LoadedSearchResult {
- pub path: Arc<Path>,
- pub full_path: PathBuf,
- pub excerpt_content: String,
- pub row_range: RangeInclusive<u32>,
- pub query_index: usize,
-}
-
-pub struct WorktreeSearchResult {
- pub worktree_id: WorktreeId,
- pub path: Arc<Path>,
- pub range: Range<usize>,
- pub query_index: usize,
- pub score: f32,
-}
-
-#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
-pub enum Status {
- Idle,
- Loading,
- Scanning { remaining_count: NonZeroUsize },
-}
-
-pub struct ProjectIndex {
- db_connection: heed::Env,
- project: WeakEntity<Project>,
- worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
- language_registry: Arc<LanguageRegistry>,
- fs: Arc<dyn Fs>,
- last_status: Status,
- status_tx: channel::Sender<()>,
- embedding_provider: Arc<dyn EmbeddingProvider>,
- _maintain_status: Task<()>,
- _subscription: Subscription,
-}
-
-impl ProjectIndex {
- pub fn new(
- project: Entity<Project>,
- db_connection: heed::Env,
- embedding_provider: Arc<dyn EmbeddingProvider>,
- cx: &mut Context<Self>,
- ) -> Self {
- let language_registry = project.read(cx).languages().clone();
- let fs = project.read(cx).fs().clone();
- let (status_tx, status_rx) = channel::unbounded();
- let mut this = ProjectIndex {
- db_connection,
- project: project.downgrade(),
- worktree_indices: HashMap::default(),
- language_registry,
- fs,
- status_tx,
- last_status: Status::Idle,
- embedding_provider,
- _subscription: cx.subscribe(&project, Self::handle_project_event),
- _maintain_status: cx.spawn(async move |this, cx| {
- while status_rx.recv().await.is_ok() {
- if this.update(cx, |this, cx| this.update_status(cx)).is_err() {
- break;
- }
- }
- }),
- };
- this.update_worktree_indices(cx);
- this
- }
-
- pub fn status(&self) -> Status {
- self.last_status
- }
-
- pub fn project(&self) -> WeakEntity<Project> {
- self.project.clone()
- }
-
- pub fn fs(&self) -> Arc<dyn Fs> {
- self.fs.clone()
- }
-
- fn handle_project_event(
- &mut self,
- _: Entity<Project>,
- event: &project::Event,
- cx: &mut Context<Self>,
- ) {
- match event {
- project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
- self.update_worktree_indices(cx);
- }
- _ => {}
- }
- }
-
- fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
- let Some(project) = self.project.upgrade() else {
- return;
- };
-
- let worktrees = project
- .read(cx)
- .visible_worktrees(cx)
- .filter_map(|worktree| {
- if worktree.read(cx).is_local() {
- Some((worktree.entity_id(), worktree))
- } else {
- None
- }
- })
- .collect::<HashMap<_, _>>();
-
- self.worktree_indices
- .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
- for (worktree_id, worktree) in worktrees {
- self.worktree_indices.entry(worktree_id).or_insert_with(|| {
- let worktree_index = WorktreeIndex::load(
- worktree.clone(),
- self.db_connection.clone(),
- self.language_registry.clone(),
- self.fs.clone(),
- self.status_tx.clone(),
- self.embedding_provider.clone(),
- cx,
- );
-
- let load_worktree = cx.spawn(async move |this, cx| {
- let result = match worktree_index.await {
- Ok(worktree_index) => {
- this.update(cx, |this, _| {
- this.worktree_indices.insert(
- worktree_id,
- WorktreeIndexHandle::Loaded {
- index: worktree_index.clone(),
- },
- );
- })?;
- Ok(worktree_index)
- }
- Err(error) => {
- this.update(cx, |this, _cx| {
- this.worktree_indices.remove(&worktree_id)
- })?;
- Err(Arc::new(error))
- }
- };
-
- this.update(cx, |this, cx| this.update_status(cx))?;
-
- result
- });
-
- WorktreeIndexHandle::Loading {
- index: load_worktree.shared(),
- }
- });
- }
-
- self.update_status(cx);
- }
-
- fn update_status(&mut self, cx: &mut Context<Self>) {
- let mut indexing_count = 0;
- let mut any_loading = false;
-
- for index in self.worktree_indices.values_mut() {
- match index {
- WorktreeIndexHandle::Loading { .. } => {
- any_loading = true;
- break;
- }
- WorktreeIndexHandle::Loaded { index, .. } => {
- indexing_count += index.read(cx).entry_ids_being_indexed().len();
- }
- }
- }
-
- let status = if any_loading {
- Status::Loading
- } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
- Status::Scanning { remaining_count }
- } else {
- Status::Idle
- };
-
- if status != self.last_status {
- self.last_status = status;
- cx.emit(status);
- }
- }
-
- pub fn search(
- &self,
- queries: Vec<String>,
- limit: usize,
- cx: &App,
- ) -> Task<Result<Vec<SearchResult>>> {
- let (chunks_tx, chunks_rx) = channel::bounded(1024);
- let mut worktree_scan_tasks = Vec::new();
- for worktree_index in self.worktree_indices.values() {
- let worktree_index = worktree_index.clone();
- let chunks_tx = chunks_tx.clone();
- worktree_scan_tasks.push(cx.spawn(async move |cx| {
- let index = match worktree_index {
- WorktreeIndexHandle::Loading { index } => {
- index.clone().await.map_err(|error| anyhow!(error))?
- }
- WorktreeIndexHandle::Loaded { index } => index.clone(),
- };
-
- index
- .read_with(cx, |index, cx| {
- let worktree_id = index.worktree().read(cx).id();
- let db_connection = index.db_connection().clone();
- let db = *index.embedding_index().db();
- cx.background_spawn(async move {
- let txn = db_connection
- .read_txn()
- .context("failed to create read transaction")?;
- let db_entries = db.iter(&txn).context("failed to iterate database")?;
- for db_entry in db_entries {
- let (_key, db_embedded_file) = db_entry?;
- for chunk in db_embedded_file.chunks {
- chunks_tx
- .send((worktree_id, db_embedded_file.path.clone(), chunk))
- .await?;
- }
- }
- anyhow::Ok(())
- })
- })?
- .await
- }));
- }
- drop(chunks_tx);
-
- let project = self.project.clone();
- let embedding_provider = self.embedding_provider.clone();
- cx.spawn(async move |cx| {
- #[cfg(debug_assertions)]
- let embedding_query_start = std::time::Instant::now();
- log::info!("Searching for {queries:?}");
- let queries: Vec<TextToEmbed> = queries
- .iter()
- .map(|s| TextToEmbed::new(s.as_str()))
- .collect();
-
- let query_embeddings = embedding_provider.embed(&queries[..]).await?;
- anyhow::ensure!(
- query_embeddings.len() == queries.len(),
- "The number of query embeddings does not match the number of queries"
- );
-
- let mut results_by_worker = Vec::new();
- for _ in 0..cx.background_executor().num_cpus() {
- results_by_worker.push(Vec::<WorktreeSearchResult>::new());
- }
-
- #[cfg(debug_assertions)]
- let search_start = std::time::Instant::now();
- cx.background_executor()
- .scoped(|cx| {
- for results in results_by_worker.iter_mut() {
- cx.spawn(async {
- while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
- let (score, query_index) =
- chunk.embedding.similarity(&query_embeddings);
-
- let ix = match results.binary_search_by(|probe| {
- score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
- }) {
- Ok(ix) | Err(ix) => ix,
- };
- if ix < limit {
- results.insert(
- ix,
- WorktreeSearchResult {
- worktree_id,
- path: path.clone(),
- range: chunk.chunk.range.clone(),
- query_index,
- score,
- },
- );
- if results.len() > limit {
- results.pop();
- }
- }
- }
- });
- }
- })
- .await;
-
- for scan_task in futures::future::join_all(worktree_scan_tasks).await {
- scan_task.log_err();
- }
-
- project.read_with(cx, |project, cx| {
- let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
- for worker_results in results_by_worker {
- search_results.extend(worker_results.into_iter().filter_map(|result| {
- Some(SearchResult {
- worktree: project.worktree_for_id(result.worktree_id, cx)?,
- path: result.path,
- range: result.range,
- score: result.score,
- query_index: result.query_index,
- })
- }));
- }
- search_results.sort_unstable_by(|a, b| {
- b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
- });
- search_results.truncate(limit);
-
- #[cfg(debug_assertions)]
- {
- let search_elapsed = search_start.elapsed();
- log::debug!(
- "searched {} entries in {:?}",
- search_results.len(),
- search_elapsed
- );
- let embedding_query_elapsed = embedding_query_start.elapsed();
- log::debug!("embedding query took {:?}", embedding_query_elapsed);
- }
-
- search_results
- })
- })
- }
-
- #[cfg(test)]
- pub fn path_count(&self, cx: &App) -> Result<u64> {
- let mut result = 0;
- for worktree_index in self.worktree_indices.values() {
- if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
- result += index.read(cx).path_count()?;
- }
- }
- Ok(result)
- }
-
- pub(crate) fn worktree_index(
- &self,
- worktree_id: WorktreeId,
- cx: &App,
- ) -> Option<Entity<WorktreeIndex>> {
- for index in self.worktree_indices.values() {
- if let WorktreeIndexHandle::Loaded { index, .. } = index
- && index.read(cx).worktree().read(cx).id() == worktree_id
- {
- return Some(index.clone());
- }
- }
- None
- }
-
- pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
- let mut result = self
- .worktree_indices
- .values()
- .filter_map(|index| {
- if let WorktreeIndexHandle::Loaded { index, .. } = index {
- Some(index.clone())
- } else {
- None
- }
- })
- .collect::<Vec<_>>();
- result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
- result
- }
-
- pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
- let (summaries_tx, summaries_rx) = channel::bounded(1024);
- let mut worktree_scan_tasks = Vec::new();
- for worktree_index in self.worktree_indices.values() {
- let worktree_index = worktree_index.clone();
- let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
- worktree_scan_tasks.push(cx.spawn(async move |cx| {
- let index = match worktree_index {
- WorktreeIndexHandle::Loading { index } => {
- index.clone().await.map_err(|error| anyhow!(error))?
- }
- WorktreeIndexHandle::Loaded { index } => index.clone(),
- };
-
- index
- .read_with(cx, |index, cx| {
- let db_connection = index.db_connection().clone();
- let summary_index = index.summary_index();
- let file_digest_db = summary_index.file_digest_db();
- let summary_db = summary_index.summary_db();
-
- cx.background_spawn(async move {
- let txn = db_connection
- .read_txn()
- .context("failed to create db read transaction")?;
- let db_entries = file_digest_db
- .iter(&txn)
- .context("failed to iterate database")?;
- for db_entry in db_entries {
- let (file_path, db_file) = db_entry?;
-
- match summary_db.get(&txn, &db_file.digest) {
- Ok(opt_summary) => {
- // Currently, we only use summaries we already have. If the file hasn't been
- // summarized yet, then we skip it and don't include it in the inferred context.
- // If we want to do just-in-time summarization, this would be the place to do it!
- if let Some(summary) = opt_summary {
- summaries_tx
- .send((file_path.to_string(), summary.to_string()))
- .await?;
- } else {
- log::warn!("No summary found for {:?}", &db_file);
- }
- }
- Err(err) => {
- log::error!(
- "Error reading from summary database: {:?}",
- err
- );
- }
- }
- }
- anyhow::Ok(())
- })
- })?
- .await
- }));
- }
- drop(summaries_tx);
-
- let project = self.project.clone();
- cx.spawn(async move |cx| {
- let mut results_by_worker = Vec::new();
- for _ in 0..cx.background_executor().num_cpus() {
- results_by_worker.push(Vec::<FileSummary>::new());
- }
-
- cx.background_executor()
- .scoped(|cx| {
- for results in results_by_worker.iter_mut() {
- cx.spawn(async {
- while let Ok((filename, summary)) = summaries_rx.recv().await {
- results.push(FileSummary { filename, summary });
- }
- });
- }
- })
- .await;
-
- for scan_task in futures::future::join_all(worktree_scan_tasks).await {
- scan_task.log_err();
- }
-
- project.read_with(cx, |_project, _cx| {
- results_by_worker.into_iter().flatten().collect()
- })
- })
- }
-
- /// Empty out the backlogs of all the worktrees in the project
- pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
- let flush_start = std::time::Instant::now();
-
- futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
- let worktree_index = worktree_index.clone();
-
- cx.spawn(async move |cx| {
- let index = match worktree_index {
- WorktreeIndexHandle::Loading { index } => {
- index.clone().await.map_err(|error| anyhow!(error))?
- }
- WorktreeIndexHandle::Loaded { index } => index.clone(),
- };
- let worktree_abs_path =
- cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;
-
- index
- .read_with(cx, |index, cx| {
- cx.background_spawn(
- index.summary_index().flush_backlog(worktree_abs_path, cx),
- )
- })?
- .await
- })
- }))
- .map(move |results| {
- // Log any errors, but don't block the user. These summaries are supposed to
- // improve quality by providing extra context, but they aren't hard requirements!
- for result in results {
- if let Err(err) = result {
- log::error!("Error flushing summary backlog: {:?}", err);
- }
- }
-
- log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
- })
- }
-
- pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
- self.worktree_indices(cx)
- .iter()
- .map(|index| index.read(cx).summary_index().backlog_len())
- .sum()
- }
-}
-
-impl EventEmitter<Status> for ProjectIndex {}
@@ -1,306 +0,0 @@
-use crate::ProjectIndex;
-use gpui::{
- AnyElement, App, CursorStyle, Entity, EventEmitter, FocusHandle, Focusable, IntoElement,
- ListOffset, ListState, MouseMoveEvent, Render, UniformListScrollHandle, canvas, div, list,
- uniform_list,
-};
-use project::WorktreeId;
-use settings::Settings;
-use std::{ops::Range, path::Path, sync::Arc};
-use theme::ThemeSettings;
-use ui::prelude::*;
-use workspace::item::Item;
-
-pub struct ProjectIndexDebugView {
- index: Entity<ProjectIndex>,
- rows: Vec<Row>,
- selected_path: Option<PathState>,
- hovered_row_ix: Option<usize>,
- focus_handle: FocusHandle,
- list_scroll_handle: UniformListScrollHandle,
- _subscription: gpui::Subscription,
-}
-
-struct PathState {
- path: Arc<Path>,
- chunks: Vec<SharedString>,
- list_state: ListState,
-}
-
-enum Row {
- Worktree(Arc<Path>),
- Entry(WorktreeId, Arc<Path>),
-}
-
-impl ProjectIndexDebugView {
- pub fn new(index: Entity<ProjectIndex>, window: &mut Window, cx: &mut Context<Self>) -> Self {
- let mut this = Self {
- rows: Vec::new(),
- list_scroll_handle: UniformListScrollHandle::new(),
- selected_path: None,
- hovered_row_ix: None,
- focus_handle: cx.focus_handle(),
- _subscription: cx.subscribe_in(&index, window, |this, _, _, window, cx| {
- this.update_rows(window, cx)
- }),
- index,
- };
- this.update_rows(window, cx);
- this
- }
-
- fn update_rows(&mut self, window: &mut Window, cx: &mut Context<Self>) {
- let worktree_indices = self.index.read(cx).worktree_indices(cx);
- cx.spawn_in(window, async move |this, cx| {
- let mut rows = Vec::new();
-
- for index in worktree_indices {
- let (root_path, worktree_id, worktree_paths) =
- index.read_with(cx, |index, cx| {
- let worktree = index.worktree().read(cx);
- (
- worktree.abs_path(),
- worktree.id(),
- index.embedding_index().paths(cx),
- )
- })?;
- rows.push(Row::Worktree(root_path));
- rows.extend(
- worktree_paths
- .await?
- .into_iter()
- .map(|path| Row::Entry(worktree_id, path)),
- );
- }
-
- this.update(cx, |this, cx| {
- this.rows = rows;
- cx.notify();
- })
- })
- .detach();
- }
-
- fn handle_path_click(
- &mut self,
- worktree_id: WorktreeId,
- file_path: Arc<Path>,
- window: &mut Window,
- cx: &mut Context<Self>,
- ) -> Option<()> {
- let project_index = self.index.read(cx);
- let fs = project_index.fs().clone();
- let worktree_index = project_index.worktree_index(worktree_id, cx)?.read(cx);
- let root_path = worktree_index.worktree().read(cx).abs_path();
- let chunks = worktree_index
- .embedding_index()
- .chunks_for_path(file_path.clone(), cx);
-
- cx.spawn_in(window, async move |this, cx| {
- let chunks = chunks.await?;
- let content = fs.load(&root_path.join(&file_path)).await?;
- let chunks = chunks
- .into_iter()
- .map(|chunk| {
- let mut start = chunk.chunk.range.start.min(content.len());
- let mut end = chunk.chunk.range.end.min(content.len());
- while !content.is_char_boundary(start) {
- start += 1;
- }
- while !content.is_char_boundary(end) {
- end -= 1;
- }
- content[start..end].to_string().into()
- })
- .collect::<Vec<_>>();
-
- this.update(cx, |this, cx| {
- this.selected_path = Some(PathState {
- path: file_path,
- list_state: ListState::new(chunks.len(), gpui::ListAlignment::Top, px(100.)),
- chunks,
- });
- cx.notify();
- })
- })
- .detach();
- None
- }
-
- fn render_chunk(&mut self, ix: usize, cx: &mut Context<Self>) -> AnyElement {
- let buffer_font = ThemeSettings::get_global(cx).buffer_font.clone();
- let Some(state) = &self.selected_path else {
- return div().into_any();
- };
-
- let colors = cx.theme().colors();
- let chunk = &state.chunks[ix];
-
- div()
- .text_ui(cx)
- .w_full()
- .font(buffer_font)
- .child(
- h_flex()
- .justify_between()
- .child(format!(
- "chunk {} of {}. length: {}",
- ix + 1,
- state.chunks.len(),
- chunk.len(),
- ))
- .child(
- h_flex()
- .child(
- Button::new(("prev", ix), "prev")
- .disabled(ix == 0)
- .on_click(cx.listener(move |this, _, _, _| {
- this.scroll_to_chunk(ix.saturating_sub(1))
- })),
- )
- .child(
- Button::new(("next", ix), "next")
- .disabled(ix + 1 == state.chunks.len())
- .on_click(cx.listener(move |this, _, _, _| {
- this.scroll_to_chunk(ix + 1)
- })),
- ),
- ),
- )
- .child(
- div()
- .bg(colors.editor_background)
- .text_xs()
- .child(chunk.clone()),
- )
- .into_any_element()
- }
-
- fn scroll_to_chunk(&mut self, ix: usize) {
- if let Some(state) = self.selected_path.as_mut() {
- state.list_state.scroll_to(ListOffset {
- item_ix: ix,
- offset_in_item: px(0.),
- })
- }
- }
-}
-
-impl Render for ProjectIndexDebugView {
- fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
- if let Some(selected_path) = self.selected_path.as_ref() {
- v_flex()
- .child(
- div()
- .id("selected-path-name")
- .child(
- h_flex()
- .justify_between()
- .child(selected_path.path.to_string_lossy().to_string())
- .child("x"),
- )
- .border_b_1()
- .border_color(cx.theme().colors().border)
- .cursor(CursorStyle::PointingHand)
- .on_click(cx.listener(|this, _, _, cx| {
- this.selected_path.take();
- cx.notify();
- })),
- )
- .child(
- list(
- selected_path.list_state.clone(),
- cx.processor(|this, ix, _, cx| this.render_chunk(ix, cx)),
- )
- .size_full(),
- )
- .size_full()
- .into_any_element()
- } else {
- let mut list = uniform_list(
- "ProjectIndexDebugView",
- self.rows.len(),
- cx.processor(move |this, range: Range<usize>, _, cx| {
- this.rows[range]
- .iter()
- .enumerate()
- .map(|(ix, row)| match row {
- Row::Worktree(root_path) => div()
- .id(ix)
- .child(Label::new(root_path.to_string_lossy().to_string())),
- Row::Entry(worktree_id, file_path) => div()
- .id(ix)
- .pl_8()
- .child(Label::new(file_path.to_string_lossy().to_string()))
- .on_mouse_move(cx.listener(
- move |this, _: &MouseMoveEvent, _, cx| {
- if this.hovered_row_ix != Some(ix) {
- this.hovered_row_ix = Some(ix);
- cx.notify();
- }
- },
- ))
- .cursor(CursorStyle::PointingHand)
- .on_click(cx.listener({
- let worktree_id = *worktree_id;
- let file_path = file_path.clone();
- move |this, _, window, cx| {
- this.handle_path_click(
- worktree_id,
- file_path.clone(),
- window,
- cx,
- );
- }
- })),
- })
- .collect()
- }),
- )
- .track_scroll(self.list_scroll_handle.clone())
- .size_full()
- .text_bg(cx.theme().colors().background)
- .into_any_element();
-
- canvas(
- move |bounds, window, cx| {
- list.prepaint_as_root(bounds.origin, bounds.size.into(), window, cx);
- list
- },
- |_, mut list, window, cx| {
- list.paint(window, cx);
- },
- )
- .size_full()
- .into_any_element()
- }
- }
-}
-
-impl EventEmitter<()> for ProjectIndexDebugView {}
-
-impl Item for ProjectIndexDebugView {
- type Event = ();
-
- fn tab_content_text(&self, _detail: usize, _cx: &App) -> SharedString {
- "Project Index (Debug)".into()
- }
-
- fn clone_on_split(
- &self,
- _: Option<workspace::WorkspaceId>,
- window: &mut Window,
- cx: &mut Context<Self>,
- ) -> Option<Entity<Self>>
- where
- Self: Sized,
- {
- Some(cx.new(|cx| Self::new(self.index.clone(), window, cx)))
- }
-}
-
-impl Focusable for ProjectIndexDebugView {
- fn focus_handle(&self, _: &App) -> gpui::FocusHandle {
- self.focus_handle.clone()
- }
-}
@@ -1,632 +0,0 @@
-mod chunking;
-mod embedding;
-mod embedding_index;
-mod indexing;
-mod project_index;
-mod project_index_debug_view;
-mod summary_backlog;
-mod summary_index;
-mod worktree_index;
-
-use anyhow::{Context as _, Result};
-use collections::HashMap;
-use fs::Fs;
-use gpui::{App, AppContext as _, AsyncApp, BorrowAppContext, Context, Entity, Global, WeakEntity};
-use language::LineEnding;
-use project::{Project, Worktree};
-use std::{
- cmp::Ordering,
- path::{Path, PathBuf},
- sync::Arc,
-};
-use util::ResultExt as _;
-use workspace::Workspace;
-
-pub use embedding::*;
-pub use project_index::{LoadedSearchResult, ProjectIndex, SearchResult, Status};
-pub use project_index_debug_view::ProjectIndexDebugView;
-pub use summary_index::FileSummary;
-
-pub struct SemanticDb {
- embedding_provider: Arc<dyn EmbeddingProvider>,
- db_connection: Option<heed::Env>,
- project_indices: HashMap<WeakEntity<Project>, Entity<ProjectIndex>>,
-}
-
-impl Global for SemanticDb {}
-
-impl SemanticDb {
- pub async fn new(
- db_path: PathBuf,
- embedding_provider: Arc<dyn EmbeddingProvider>,
- cx: &mut AsyncApp,
- ) -> Result<Self> {
- let db_connection = cx
- .background_spawn(async move {
- std::fs::create_dir_all(&db_path)?;
- unsafe {
- heed::EnvOpenOptions::new()
- .map_size(1024 * 1024 * 1024)
- .max_dbs(3000)
- .open(db_path)
- }
- })
- .await
- .context("opening database connection")?;
-
- cx.update(|cx| {
- cx.observe_new(
- |workspace: &mut Workspace, _window, cx: &mut Context<Workspace>| {
- let project = workspace.project().clone();
-
- if cx.has_global::<SemanticDb>() {
- cx.update_global::<SemanticDb, _>(|this, cx| {
- this.create_project_index(project, cx);
- })
- } else {
- log::info!("No SemanticDb, skipping project index")
- }
- },
- )
- .detach();
- })
- .ok();
-
- Ok(SemanticDb {
- db_connection: Some(db_connection),
- embedding_provider,
- project_indices: HashMap::default(),
- })
- }
-
- pub async fn load_results(
- mut results: Vec<SearchResult>,
- fs: &Arc<dyn Fs>,
- cx: &AsyncApp,
- ) -> Result<Vec<LoadedSearchResult>> {
- let mut max_scores_by_path = HashMap::<_, (f32, usize)>::default();
- for result in &results {
- let (score, query_index) = max_scores_by_path
- .entry((result.worktree.clone(), result.path.clone()))
- .or_default();
- if result.score > *score {
- *score = result.score;
- *query_index = result.query_index;
- }
- }
-
- results.sort_by(|a, b| {
- let max_score_a = max_scores_by_path[&(a.worktree.clone(), a.path.clone())].0;
- let max_score_b = max_scores_by_path[&(b.worktree.clone(), b.path.clone())].0;
- max_score_b
- .partial_cmp(&max_score_a)
- .unwrap_or(Ordering::Equal)
- .then_with(|| a.worktree.entity_id().cmp(&b.worktree.entity_id()))
- .then_with(|| a.path.cmp(&b.path))
- .then_with(|| a.range.start.cmp(&b.range.start))
- });
-
- let mut last_loaded_file: Option<(Entity<Worktree>, Arc<Path>, PathBuf, String)> = None;
- let mut loaded_results = Vec::<LoadedSearchResult>::new();
- for result in results {
- let full_path;
- let file_content;
- if let Some(last_loaded_file) =
- last_loaded_file
- .as_ref()
- .filter(|(last_worktree, last_path, _, _)| {
- last_worktree == &result.worktree && last_path == &result.path
- })
- {
- full_path = last_loaded_file.2.clone();
- file_content = &last_loaded_file.3;
- } else {
- let output = result.worktree.read_with(cx, |worktree, _cx| {
- let entry_abs_path = worktree.abs_path().join(&result.path);
- let mut entry_full_path = PathBuf::from(worktree.root_name());
- entry_full_path.push(&result.path);
- let file_content = async {
- let entry_abs_path = entry_abs_path;
- fs.load(&entry_abs_path).await
- };
- (entry_full_path, file_content)
- })?;
- full_path = output.0;
- let Some(content) = output.1.await.log_err() else {
- continue;
- };
- last_loaded_file = Some((
- result.worktree.clone(),
- result.path.clone(),
- full_path.clone(),
- content,
- ));
- file_content = &last_loaded_file.as_ref().unwrap().3;
- };
-
- let query_index = max_scores_by_path[&(result.worktree.clone(), result.path.clone())].1;
-
- let mut range_start = result.range.start.min(file_content.len());
- let mut range_end = result.range.end.min(file_content.len());
- while !file_content.is_char_boundary(range_start) {
- range_start += 1;
- }
- while !file_content.is_char_boundary(range_end) {
- range_end += 1;
- }
-
- let start_row = file_content[0..range_start].matches('\n').count() as u32;
- let mut end_row = file_content[0..range_end].matches('\n').count() as u32;
- let start_line_byte_offset = file_content[0..range_start]
- .rfind('\n')
- .map(|pos| pos + 1)
- .unwrap_or_default();
- let mut end_line_byte_offset = range_end;
- if file_content[..end_line_byte_offset].ends_with('\n') {
- end_row -= 1;
- } else {
- end_line_byte_offset = file_content[range_end..]
- .find('\n')
- .map(|pos| range_end + pos + 1)
- .unwrap_or_else(|| file_content.len());
- }
- let mut excerpt_content =
- file_content[start_line_byte_offset..end_line_byte_offset].to_string();
- LineEnding::normalize(&mut excerpt_content);
-
- if let Some(prev_result) = loaded_results.last_mut()
- && prev_result.full_path == full_path
- && *prev_result.row_range.end() + 1 == start_row
- {
- prev_result.row_range = *prev_result.row_range.start()..=end_row;
- prev_result.excerpt_content.push_str(&excerpt_content);
- continue;
- }
-
- loaded_results.push(LoadedSearchResult {
- path: result.path,
- full_path,
- excerpt_content,
- row_range: start_row..=end_row,
- query_index,
- });
- }
-
- for result in &mut loaded_results {
- while result.excerpt_content.ends_with("\n\n") {
- result.excerpt_content.pop();
- result.row_range =
- *result.row_range.start()..=result.row_range.end().saturating_sub(1)
- }
- }
-
- Ok(loaded_results)
- }
-
- pub fn project_index(
- &mut self,
- project: Entity<Project>,
- _cx: &mut App,
- ) -> Option<Entity<ProjectIndex>> {
- self.project_indices.get(&project.downgrade()).cloned()
- }
-
- pub fn remaining_summaries(
- &self,
- project: &WeakEntity<Project>,
- cx: &mut App,
- ) -> Option<usize> {
- self.project_indices.get(project).map(|project_index| {
- project_index.update(cx, |project_index, cx| {
- project_index.remaining_summaries(cx)
- })
- })
- }
-
- pub fn create_project_index(
- &mut self,
- project: Entity<Project>,
- cx: &mut App,
- ) -> Entity<ProjectIndex> {
- let project_index = cx.new(|cx| {
- ProjectIndex::new(
- project.clone(),
- self.db_connection.clone().unwrap(),
- self.embedding_provider.clone(),
- cx,
- )
- });
-
- let project_weak = project.downgrade();
- self.project_indices
- .insert(project_weak.clone(), project_index.clone());
-
- cx.observe_release(&project, move |_, cx| {
- if cx.has_global::<SemanticDb>() {
- cx.update_global::<SemanticDb, _>(|this, _| {
- this.project_indices.remove(&project_weak);
- })
- }
- })
- .detach();
-
- project_index
- }
-}
-
-impl Drop for SemanticDb {
- fn drop(&mut self) {
- self.db_connection.take().unwrap().prepare_for_closing();
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use chunking::Chunk;
- use embedding_index::{ChunkedFile, EmbeddingIndex};
- use feature_flags::FeatureFlagAppExt;
- use fs::FakeFs;
- use futures::{FutureExt, future::BoxFuture};
- use gpui::TestAppContext;
- use indexing::IndexingEntrySet;
- use language::language_settings::AllLanguageSettings;
- use project::{Project, ProjectEntryId};
- use serde_json::json;
- use settings::SettingsStore;
- use smol::channel;
- use std::{future, path::Path, sync::Arc};
- use util::path;
-
- fn init_test(cx: &mut TestAppContext) {
- zlog::init_test();
-
- cx.update(|cx| {
- let store = SettingsStore::test(cx);
- cx.set_global(store);
- language::init(cx);
- cx.update_flags(false, vec![]);
- Project::init_settings(cx);
- SettingsStore::update(cx, |store, cx| {
- store.update_user_settings::<AllLanguageSettings>(cx, |_| {});
- });
- });
- }
-
- pub struct TestEmbeddingProvider {
- batch_size: usize,
- compute_embedding: Box<dyn Fn(&str) -> Result<Embedding> + Send + Sync>,
- }
-
- impl TestEmbeddingProvider {
- pub fn new(
- batch_size: usize,
- compute_embedding: impl 'static + Fn(&str) -> Result<Embedding> + Send + Sync,
- ) -> Self {
- Self {
- batch_size,
- compute_embedding: Box::new(compute_embedding),
- }
- }
- }
-
- impl EmbeddingProvider for TestEmbeddingProvider {
- fn embed<'a>(
- &'a self,
- texts: &'a [TextToEmbed<'a>],
- ) -> BoxFuture<'a, Result<Vec<Embedding>>> {
- let embeddings = texts
- .iter()
- .map(|to_embed| (self.compute_embedding)(to_embed.text))
- .collect();
- future::ready(embeddings).boxed()
- }
-
- fn batch_size(&self) -> usize {
- self.batch_size
- }
- }
-
- #[gpui::test]
- async fn test_search(cx: &mut TestAppContext) {
- cx.executor().allow_parking();
-
- init_test(cx);
-
- cx.update(|cx| {
- // This functionality is staff-flagged.
- cx.update_flags(true, vec![]);
- });
-
- let temp_dir = tempfile::tempdir().unwrap();
-
- let mut semantic_index = SemanticDb::new(
- temp_dir.path().into(),
- Arc::new(TestEmbeddingProvider::new(16, |text| {
- let mut embedding = vec![0f32; 2];
- // if the text contains "garbage in", set the first dimension to 0.9; otherwise -0.9
- if text.contains("garbage in") {
- embedding[0] = 0.9;
- } else {
- embedding[0] = -0.9;
- }
-
- if text.contains("garbage out") {
- embedding[1] = 0.9;
- } else {
- embedding[1] = -0.9;
- }
-
- Ok(Embedding::new(embedding))
- })),
- &mut cx.to_async(),
- )
- .await
- .unwrap();
-
- let fs = FakeFs::new(cx.executor());
- let project_path = Path::new("/fake_project");
-
- fs.insert_tree(
- project_path,
- json!({
- "fixture": {
- "main.rs": include_str!("../fixture/main.rs"),
- "needle.md": include_str!("../fixture/needle.md"),
- }
- }),
- )
- .await;
-
- let project = Project::test(fs, [project_path], cx).await;
-
- let project_index = cx.update(|cx| {
- let language_registry = project.read(cx).languages().clone();
- let node_runtime = project.read(cx).node_runtime().unwrap().clone();
- languages::init(language_registry, node_runtime, cx);
- semantic_index.create_project_index(project.clone(), cx)
- });
-
- cx.run_until_parked();
- while cx
- .update(|cx| semantic_index.remaining_summaries(&project.downgrade(), cx))
- .unwrap()
- > 0
- {
- cx.run_until_parked();
- }
-
- let results = cx
- .update(|cx| {
- let project_index = project_index.read(cx);
- let query = "garbage in, garbage out";
- project_index.search(vec![query.into()], 4, cx)
- })
- .await
- .unwrap();
-
- assert!(
- results.len() > 1,
- "should have found more than one result, but only found {:?}",
- results
- );
-
- for result in &results {
- println!("result: {:?}", result.path);
- println!("score: {:?}", result.score);
- }
-
- // Find a result whose score is greater than 0.9
- let search_result = results.iter().find(|result| result.score > 0.9).unwrap();
-
- assert_eq!(
- search_result.path.to_string_lossy(),
- path!("fixture/needle.md")
- );
-
- let content = cx
- .update(|cx| {
- let worktree = search_result.worktree.read(cx);
- let entry_abs_path = worktree.abs_path().join(&search_result.path);
- let fs = project.read(cx).fs().clone();
- cx.background_spawn(async move { fs.load(&entry_abs_path).await.unwrap() })
- })
- .await;
-
- let range = search_result.range.clone();
- let content = content[range].to_owned();
-
- assert!(content.contains("garbage in, garbage out"));
- }
-
- #[gpui::test]
- async fn test_embed_files(cx: &mut TestAppContext) {
- cx.executor().allow_parking();
-
- let provider = Arc::new(TestEmbeddingProvider::new(3, |text| {
- anyhow::ensure!(
- !text.contains('g'),
- "cannot embed text containing a 'g' character"
- );
- Ok(Embedding::new(
- ('a'..='z')
- .map(|char| text.chars().filter(|c| *c == char).count() as f32)
- .collect(),
- ))
- }));
-
- let (indexing_progress_tx, _) = channel::unbounded();
- let indexing_entries = Arc::new(IndexingEntrySet::new(indexing_progress_tx));
-
- let (chunked_files_tx, chunked_files_rx) = channel::unbounded::<ChunkedFile>();
- chunked_files_tx
- .send_blocking(ChunkedFile {
- path: Path::new("test1.md").into(),
- mtime: None,
- handle: indexing_entries.insert(ProjectEntryId::from_proto(0)),
- text: "abcdefghijklmnop".to_string(),
- chunks: [0..4, 4..8, 8..12, 12..16]
- .into_iter()
- .map(|range| Chunk {
- range,
- digest: Default::default(),
- })
- .collect(),
- })
- .unwrap();
- chunked_files_tx
- .send_blocking(ChunkedFile {
- path: Path::new("test2.md").into(),
- mtime: None,
- handle: indexing_entries.insert(ProjectEntryId::from_proto(1)),
- text: "qrstuvwxyz".to_string(),
- chunks: [0..4, 4..8, 8..10]
- .into_iter()
- .map(|range| Chunk {
- range,
- digest: Default::default(),
- })
- .collect(),
- })
- .unwrap();
- chunked_files_tx.close();
-
- let embed_files_task =
- cx.update(|cx| EmbeddingIndex::embed_files(provider.clone(), chunked_files_rx, cx));
- embed_files_task.task.await.unwrap();
-
- let embedded_files_rx = embed_files_task.files;
- let mut embedded_files = Vec::new();
- while let Ok((embedded_file, _)) = embedded_files_rx.recv().await {
- embedded_files.push(embedded_file);
- }
-
- assert_eq!(embedded_files.len(), 1);
- assert_eq!(embedded_files[0].path.as_ref(), Path::new("test2.md"));
- assert_eq!(
- embedded_files[0]
- .chunks
- .iter()
- .map(|embedded_chunk| { embedded_chunk.embedding.clone() })
- .collect::<Vec<Embedding>>(),
- vec![
- (provider.compute_embedding)("qrst").unwrap(),
- (provider.compute_embedding)("uvwx").unwrap(),
- (provider.compute_embedding)("yz").unwrap(),
- ],
- );
- }
-
- #[gpui::test]
- async fn test_load_search_results(cx: &mut TestAppContext) {
- init_test(cx);
-
- let fs = FakeFs::new(cx.executor());
- let project_path = Path::new("/fake_project");
-
- let file1_content = "one\ntwo\nthree\nfour\nfive\n";
- let file2_content = "aaa\nbbb\nccc\nddd\neee\n";
-
- fs.insert_tree(
- project_path,
- json!({
- "file1.txt": file1_content,
- "file2.txt": file2_content,
- }),
- )
- .await;
-
- let fs = fs as Arc<dyn Fs>;
- let project = Project::test(fs.clone(), [project_path], cx).await;
- let worktree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap());
-
- // chunk that is already newline-aligned
- let search_results = vec![SearchResult {
- worktree: worktree.clone(),
- path: Path::new("file1.txt").into(),
- range: 0..file1_content.find("four").unwrap(),
- score: 0.5,
- query_index: 0,
- }];
- assert_eq!(
- SemanticDb::load_results(search_results, &fs, &cx.to_async())
- .await
- .unwrap(),
- &[LoadedSearchResult {
- path: Path::new("file1.txt").into(),
- full_path: "fake_project/file1.txt".into(),
- excerpt_content: "one\ntwo\nthree\n".into(),
- row_range: 0..=2,
- query_index: 0,
- }]
- );
-
- // chunk that is *not* newline-aligned
- let search_results = vec![SearchResult {
- worktree: worktree.clone(),
- path: Path::new("file1.txt").into(),
- range: file1_content.find("two").unwrap() + 1..file1_content.find("four").unwrap() + 2,
- score: 0.5,
- query_index: 0,
- }];
- assert_eq!(
- SemanticDb::load_results(search_results, &fs, &cx.to_async())
- .await
- .unwrap(),
- &[LoadedSearchResult {
- path: Path::new("file1.txt").into(),
- full_path: "fake_project/file1.txt".into(),
- excerpt_content: "two\nthree\nfour\n".into(),
- row_range: 1..=3,
- query_index: 0,
- }]
- );
-
- // chunks that are adjacent
-
- let search_results = vec![
- SearchResult {
- worktree: worktree.clone(),
- path: Path::new("file1.txt").into(),
- range: file1_content.find("two").unwrap()..file1_content.len(),
- score: 0.6,
- query_index: 0,
- },
- SearchResult {
- worktree: worktree.clone(),
- path: Path::new("file1.txt").into(),
- range: 0..file1_content.find("two").unwrap(),
- score: 0.5,
- query_index: 1,
- },
- SearchResult {
- worktree: worktree.clone(),
- path: Path::new("file2.txt").into(),
- range: 0..file2_content.len(),
- score: 0.8,
- query_index: 1,
- },
- ];
- assert_eq!(
- SemanticDb::load_results(search_results, &fs, &cx.to_async())
- .await
- .unwrap(),
- &[
- LoadedSearchResult {
- path: Path::new("file2.txt").into(),
- full_path: "fake_project/file2.txt".into(),
- excerpt_content: file2_content.into(),
- row_range: 0..=4,
- query_index: 1,
- },
- LoadedSearchResult {
- path: Path::new("file1.txt").into(),
- full_path: "fake_project/file1.txt".into(),
- excerpt_content: file1_content.into(),
- row_range: 0..=4,
- query_index: 0,
- }
- ]
- );
- }
-}
@@ -1,49 +0,0 @@
-use collections::HashMap;
-use fs::MTime;
-use std::{path::Path, sync::Arc};
-
-const MAX_FILES_BEFORE_RESUMMARIZE: usize = 4;
-const MAX_BYTES_BEFORE_RESUMMARIZE: u64 = 1_000_000; // 1 MB
-
-#[derive(Default, Debug)]
-pub struct SummaryBacklog {
- /// Key: path to a file that needs summarization, but that we haven't summarized yet. Value: that file's size on disk, in bytes, and its mtime.
- files: HashMap<Arc<Path>, (u64, Option<MTime>)>,
- /// Cached sum of the file sizes in `files`, so we don't have to traverse the whole map to check if we're over the byte limit.
- total_bytes: u64,
-}
-
-impl SummaryBacklog {
- /// Store the given path in the backlog, along with how many bytes are in it.
- pub fn insert(&mut self, path: Arc<Path>, bytes_on_disk: u64, mtime: Option<MTime>) {
- let (prev_bytes, _) = self
- .files
- .insert(path, (bytes_on_disk, mtime))
- .unwrap_or_default(); // Default to 0 prev_bytes
-
- // Update the cached total by subtracting out the old amount and adding the new one.
- self.total_bytes = self.total_bytes - prev_bytes + bytes_on_disk;
- }
-
- /// Returns true if the number of files or the total number of bytes in the backlog exceeds a predefined threshold.
- pub fn needs_drain(&self) -> bool {
- self.files.len() > MAX_FILES_BEFORE_RESUMMARIZE ||
- // The whole purpose of the cached total_bytes is to make this comparison cheap.
- // Otherwise we'd have to traverse the entire dictionary every time we wanted this answer.
- self.total_bytes > MAX_BYTES_BEFORE_RESUMMARIZE
- }
-
- /// Remove all the entries in the backlog and return the file paths as an iterator.
- #[allow(clippy::needless_lifetimes)] // Clippy thinks this 'a can be elided, but eliding it gives a compile error
- pub fn drain<'a>(&'a mut self) -> impl Iterator<Item = (Arc<Path>, Option<MTime>)> + 'a {
- self.total_bytes = 0;
-
- self.files
- .drain()
- .map(|(path, (_size, mtime))| (path, mtime))
- }
-
- pub fn len(&self) -> usize {
- self.files.len()
- }
-}
@@ -1,696 +0,0 @@
-use anyhow::{Context as _, Result, anyhow};
-use arrayvec::ArrayString;
-use fs::{Fs, MTime};
-use futures::{TryFutureExt, stream::StreamExt};
-use futures_batch::ChunksTimeoutStreamExt;
-use gpui::{App, AppContext as _, Entity, Task};
-use heed::{
- RoTxn,
- types::{SerdeBincode, Str},
-};
-use language_model::{
- LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest,
- LanguageModelRequestMessage, Role,
-};
-use log;
-use parking_lot::Mutex;
-use project::{Entry, UpdatedEntriesSet, Worktree};
-use serde::{Deserialize, Serialize};
-use smol::channel;
-use std::{
- future::Future,
- path::Path,
- pin::pin,
- sync::Arc,
- time::{Duration, Instant},
-};
-use util::ResultExt;
-use worktree::Snapshot;
-
-use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog};
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct FileSummary {
- pub filename: String,
- pub summary: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-struct UnsummarizedFile {
- // Path to the file on disk
- path: Arc<Path>,
- // The mtime of the file on disk
- mtime: Option<MTime>,
- // BLAKE3 hash of the source file's contents
- digest: Blake3Digest,
- // The source file's contents
- contents: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-struct SummarizedFile {
- // Path to the file on disk
- path: String,
- // The mtime of the file on disk
- mtime: Option<MTime>,
- // BLAKE3 hash of the source file's contents
- digest: Blake3Digest,
- // The LLM's summary of the file's contents
- summary: String,
-}
-
-/// This is what blake3's to_hex() method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
-pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct FileDigest {
- pub mtime: Option<MTime>,
- pub digest: Blake3Digest,
-}
-
-struct NeedsSummary {
- files: channel::Receiver<UnsummarizedFile>,
- task: Task<Result<()>>,
-}
-
-struct SummarizeFiles {
- files: channel::Receiver<SummarizedFile>,
- task: Task<Result<()>>,
-}
-
-pub struct SummaryIndex {
- worktree: Entity<Worktree>,
- fs: Arc<dyn Fs>,
- db_connection: heed::Env,
- file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: that file's mtime and the BLAKE3 digest of its contents.
- summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
- backlog: Arc<Mutex<SummaryBacklog>>,
- _entry_ids_being_indexed: Arc<IndexingEntrySet>, // TODO can this be removed?
-}
-
-struct Backlogged {
- paths_to_digest: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
- task: Task<Result<()>>,
-}
-
-struct MightNeedSummaryFiles {
- files: channel::Receiver<UnsummarizedFile>,
- task: Task<Result<()>>,
-}
-
-impl SummaryIndex {
- pub fn new(
- worktree: Entity<Worktree>,
- fs: Arc<dyn Fs>,
- db_connection: heed::Env,
- file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
- summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>,
- _entry_ids_being_indexed: Arc<IndexingEntrySet>,
- ) -> Self {
- Self {
- worktree,
- fs,
- db_connection,
- file_digest_db,
- summary_db,
- _entry_ids_being_indexed,
- backlog: Default::default(),
- }
- }
-
- pub fn file_digest_db(&self) -> heed::Database<Str, SerdeBincode<FileDigest>> {
- self.file_digest_db
- }
-
- pub fn summary_db(&self) -> heed::Database<SerdeBincode<Blake3Digest>, Str> {
- self.summary_db
- }
-
- pub fn index_entries_changed_on_disk(
- &self,
- is_auto_available: bool,
- cx: &App,
- ) -> impl Future<Output = Result<()>> + use<> {
- let start = Instant::now();
- let backlogged;
- let digest;
- let needs_summary;
- let summaries;
- let persist;
-
- if is_auto_available {
- let worktree = self.worktree.read(cx).snapshot();
- let worktree_abs_path = worktree.abs_path().clone();
-
- backlogged = self.scan_entries(worktree, cx);
- digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
- needs_summary = self.check_summary_cache(digest.files, cx);
- summaries = self.summarize_files(needs_summary.files, cx);
- persist = self.persist_summaries(summaries.files, cx);
- } else {
- // This feature is only staff-shipped, so make the rest of these no-ops.
- backlogged = Backlogged {
- paths_to_digest: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- digest = MightNeedSummaryFiles {
- files: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- needs_summary = NeedsSummary {
- files: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- summaries = SummarizeFiles {
- files: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- persist = Task::ready(Ok(()));
- }
-
- async move {
- futures::try_join!(
- backlogged.task,
- digest.task,
- needs_summary.task,
- summaries.task,
- persist
- )?;
-
- if is_auto_available {
- log::info!(
- "Summarizing everything that changed on disk took {:?}",
- start.elapsed()
- );
- }
-
- Ok(())
- }
- }
-
- pub fn index_updated_entries(
- &mut self,
- updated_entries: UpdatedEntriesSet,
- is_auto_available: bool,
- cx: &App,
- ) -> impl Future<Output = Result<()>> + use<> {
- let start = Instant::now();
- let backlogged;
- let digest;
- let needs_summary;
- let summaries;
- let persist;
-
- if is_auto_available {
- let worktree = self.worktree.read(cx).snapshot();
- let worktree_abs_path = worktree.abs_path().clone();
-
- backlogged = self.scan_updated_entries(worktree, updated_entries, cx);
- digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
- needs_summary = self.check_summary_cache(digest.files, cx);
- summaries = self.summarize_files(needs_summary.files, cx);
- persist = self.persist_summaries(summaries.files, cx);
- } else {
- // This feature is only staff-shipped, so make the rest of these no-ops.
- backlogged = Backlogged {
- paths_to_digest: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- digest = MightNeedSummaryFiles {
- files: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- needs_summary = NeedsSummary {
- files: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- summaries = SummarizeFiles {
- files: channel::unbounded().1,
- task: Task::ready(Ok(())),
- };
- persist = Task::ready(Ok(()));
- }
-
- async move {
- futures::try_join!(
- backlogged.task,
- digest.task,
- needs_summary.task,
- summaries.task,
- persist
- )?;
-
- log::debug!("Summarizing updated entries took {:?}", start.elapsed());
-
- Ok(())
- }
- }
-
- fn check_summary_cache(
- &self,
- might_need_summary: channel::Receiver<UnsummarizedFile>,
- cx: &App,
- ) -> NeedsSummary {
- let db_connection = self.db_connection.clone();
- let db = self.summary_db;
- let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
- let task = cx.background_spawn(async move {
- let mut might_need_summary = pin!(might_need_summary);
- while let Some(file) = might_need_summary.next().await {
- let tx = db_connection
- .read_txn()
- .context("Failed to create read transaction for checking which hashes are in summary cache")?;
-
- match db.get(&tx, &file.digest) {
- Ok(opt_answer) => {
- if opt_answer.is_none() {
- // It's not in the summary cache db, so we need to summarize it.
- log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest);
- needs_summary_tx.send(file).await?;
- } else {
- log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest);
- }
- }
- Err(err) => {
- log::error!("Reading from the summaries database failed: {:?}", err);
- }
- }
- }
-
- Ok(())
- });
-
- NeedsSummary {
- files: needs_summary_rx,
- task,
- }
- }
-
- fn scan_entries(&self, worktree: Snapshot, cx: &App) -> Backlogged {
- let (tx, rx) = channel::bounded(512);
- let db_connection = self.db_connection.clone();
- let digest_db = self.file_digest_db;
- let backlog = Arc::clone(&self.backlog);
- let task = cx.background_spawn(async move {
- let txn = db_connection
- .read_txn()
- .context("failed to create read transaction")?;
-
- for entry in worktree.files(false, 0) {
- let needs_summary =
- Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);
-
- if !needs_summary.is_empty() {
- tx.send(needs_summary).await?;
- }
- }
-
- // TODO delete db entries for deleted files
-
- Ok(())
- });
-
- Backlogged {
- paths_to_digest: rx,
- task,
- }
- }
-
- fn add_to_backlog(
- backlog: Arc<Mutex<SummaryBacklog>>,
- digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
- txn: &RoTxn<'_>,
- entry: &Entry,
- ) -> Vec<(Arc<Path>, Option<MTime>)> {
- let entry_db_key = db_key_for_path(&entry.path);
-
- match digest_db.get(txn, &entry_db_key) {
- Ok(opt_saved_digest) => {
- // The file path is the same, but the mtime is different. (Or there was no mtime.)
- // It needs updating, so add it to the backlog! Then, if the backlog is full, drain it and summarize its contents.
- if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
- let mut backlog = backlog.lock();
-
- log::info!(
- "Inserting {:?} ({:?} bytes) into backlog",
- &entry.path,
- entry.size,
- );
- backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime);
-
- if backlog.needs_drain() {
- log::info!("Draining summary backlog...");
- return backlog.drain().collect();
- }
- }
- }
- Err(err) => {
- log::error!(
- "Error trying to get file digest db entry {:?}: {:?}",
- &entry_db_key,
- err
- );
- }
- }
-
- Vec::new()
- }
-
- fn scan_updated_entries(
- &self,
- worktree: Snapshot,
- updated_entries: UpdatedEntriesSet,
- cx: &App,
- ) -> Backlogged {
- log::info!("Scanning for updated entries that might need summarization...");
- let (tx, rx) = channel::bounded(512);
- // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
- let db_connection = self.db_connection.clone();
- let digest_db = self.file_digest_db;
- let backlog = Arc::clone(&self.backlog);
- let task = cx.background_spawn(async move {
- let txn = db_connection
- .read_txn()
- .context("failed to create read transaction")?;
-
- for (path, entry_id, status) in updated_entries.iter() {
- match status {
- project::PathChange::Loaded
- | project::PathChange::Added
- | project::PathChange::Updated
- | project::PathChange::AddedOrUpdated => {
- if let Some(entry) = worktree.entry_for_id(*entry_id)
- && entry.is_file()
- {
- let needs_summary =
- Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);
-
- if !needs_summary.is_empty() {
- tx.send(needs_summary).await?;
- }
- }
- }
- project::PathChange::Removed => {
- let _db_path = db_key_for_path(path);
- // TODO delete db entries for deleted files
- // deleted_entry_ranges_tx
- // .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
- // .await?;
- }
- }
- }
-
- Ok(())
- });
-
- Backlogged {
- paths_to_digest: rx,
- // deleted_entry_ranges: deleted_entry_ranges_rx,
- task,
- }
- }
-
- fn digest_files(
- &self,
- paths: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
- worktree_abs_path: Arc<Path>,
- cx: &App,
- ) -> MightNeedSummaryFiles {
- let fs = self.fs.clone();
- let (tx, rx) = channel::bounded(2048);
- let task = cx.spawn(async move |cx| {
- cx.background_executor()
- .scoped(|cx| {
- for _ in 0..cx.num_cpus() {
- cx.spawn(async {
- while let Ok(pairs) = paths.recv().await {
- // Note: we could process all these files concurrently if desired. Might or might not speed things up.
- for (path, mtime) in pairs {
- let entry_abs_path = worktree_abs_path.join(&path);
-
- // Load the file's contents and compute its hash digest.
- let unsummarized_file = {
- let Some(contents) = fs
- .load(&entry_abs_path)
- .await
- .with_context(|| {
- format!("failed to read path {entry_abs_path:?}")
- })
- .log_err()
- else {
- continue;
- };
-
- let digest = {
- let mut hasher = blake3::Hasher::new();
- // Incorporate both the (relative) file path as well as the contents of the file into the hash.
- // This is because in some languages and frameworks, identical files can do different things
- // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model.
- hasher.update(path.display().to_string().as_bytes());
- hasher.update(contents.as_bytes());
- hasher.finalize().to_hex()
- };
-
- UnsummarizedFile {
- digest,
- contents,
- path,
- mtime,
- }
- };
-
- if let Err(err) = tx
- .send(unsummarized_file)
- .map_err(|error| anyhow!(error))
- .await
- {
- log::error!("Error: {:?}", err);
-
- return;
- }
- }
- }
- });
- }
- })
- .await;
- Ok(())
- });
-
- MightNeedSummaryFiles { files: rx, task }
- }
-
- fn summarize_files(
- &self,
- unsummarized_files: channel::Receiver<UnsummarizedFile>,
- cx: &App,
- ) -> SummarizeFiles {
- let (summarized_tx, summarized_rx) = channel::bounded(512);
- let task = cx.spawn(async move |cx| {
- while let Ok(file) = unsummarized_files.recv().await {
- log::debug!("Summarizing {:?}", file);
- let summary = cx
- .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
- .await
- .unwrap_or_else(|err| {
- // Log a warning because we'll continue anyway.
- // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries,
- // but this might give bad summaries due to cutting off source code files in the middle.
- log::warn!("Failed to summarize {} - {:?}", file.path.display(), err);
-
- String::new()
- });
-
- // Note that the summary could be empty because of an error talking to a cloud provider,
- // e.g. because the context limit was exceeded. In that case, the summary ends up empty and we skip sending it.
- if !summary.is_empty() {
- summarized_tx
- .send(SummarizedFile {
- path: file.path.display().to_string(),
- digest: file.digest,
- summary,
- mtime: file.mtime,
- })
- .await?
- }
- }
-
- Ok(())
- });
-
- SummarizeFiles {
- files: summarized_rx,
- task,
- }
- }
-
- fn summarize_code(
- code: &str,
- path: &Path,
- cx: &App,
- ) -> impl Future<Output = Result<String>> + use<> {
- let start = Instant::now();
- let (summary_model_id, use_cache): (LanguageModelId, bool) = (
- "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings.
- false, // qwen2 doesn't have a cache, but we should probably infer this from the model
- );
- let Some(model) = LanguageModelRegistry::read_global(cx)
- .available_models(cx)
- .find(|model| &model.id() == &summary_model_id)
- else {
- return cx.background_spawn(async move {
- anyhow::bail!("Couldn't find the preferred summarization model ({summary_model_id:?}) in the language registry's available models")
- });
- };
- let utf8_path = path.to_string_lossy();
- const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:";
- let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}");
-
- log::debug!(
- "Summarizing code by sending this prompt to {:?}: {:?}",
- model.name(),
- &prompt
- );
-
- let request = LanguageModelRequest {
- thread_id: None,
- prompt_id: None,
- mode: None,
- intent: None,
- messages: vec![LanguageModelRequestMessage {
- role: Role::User,
- content: vec![prompt.into()],
- cache: use_cache,
- }],
- tools: Vec::new(),
- tool_choice: None,
- stop: Vec::new(),
- temperature: None,
- thinking_allowed: true,
- };
-
- let code_len = code.len();
- cx.spawn(async move |cx| {
- let stream = model.stream_completion(request, cx);
- cx.background_spawn(async move {
- let answer: String = stream
- .await?
- .filter_map(|event| async {
- if let Ok(LanguageModelCompletionEvent::Text(text)) = event {
- Some(text)
- } else {
- None
- }
- })
- .collect()
- .await;
-
- log::info!(
- "It took {:?} to summarize {:?} bytes of code.",
- start.elapsed(),
- code_len
- );
-
- log::debug!("Summary was: {:?}", &answer);
-
- Ok(answer)
- })
- .await
-
- // TODO if summarization failed, put it back in the backlog!
- })
- }
-
- fn persist_summaries(
- &self,
- summaries: channel::Receiver<SummarizedFile>,
- cx: &App,
- ) -> Task<Result<()>> {
- let db_connection = self.db_connection.clone();
- let digest_db = self.file_digest_db;
- let summary_db = self.summary_db;
- cx.background_spawn(async move {
- let mut summaries = pin!(summaries.chunks_timeout(4096, Duration::from_secs(2)));
- while let Some(summaries) = summaries.next().await {
- let mut txn = db_connection.write_txn()?;
- for file in &summaries {
- log::debug!(
- "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}",
- &file.path,
- file.summary.len(),
- file.digest
- );
- digest_db.put(
- &mut txn,
- &file.path,
- &FileDigest {
- mtime: file.mtime,
- digest: file.digest,
- },
- )?;
- summary_db.put(&mut txn, &file.digest, &file.summary)?;
- }
- txn.commit()?;
-
- drop(summaries);
- log::debug!("committed summaries");
- }
-
- Ok(())
- })
- }
-
- /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately.
- pub(crate) fn flush_backlog(
- &self,
- worktree_abs_path: Arc<Path>,
- cx: &App,
- ) -> impl Future<Output = Result<()>> + use<> {
- let start = Instant::now();
- let backlogged = {
- let (tx, rx) = channel::bounded(512);
- let needs_summary: Vec<(Arc<Path>, Option<MTime>)> = {
- let mut backlog = self.backlog.lock();
-
- backlog.drain().collect()
- };
-
- let task = cx.background_spawn(async move {
- tx.send(needs_summary).await?;
- Ok(())
- });
-
- Backlogged {
- paths_to_digest: rx,
- task,
- }
- };
-
- let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
- let needs_summary = self.check_summary_cache(digest.files, cx);
- let summaries = self.summarize_files(needs_summary.files, cx);
- let persist = self.persist_summaries(summaries.files, cx);
-
- async move {
- futures::try_join!(
- backlogged.task,
- digest.task,
- needs_summary.task,
- summaries.task,
- persist
- )?;
-
- log::info!("Summarizing backlogged entries took {:?}", start.elapsed());
-
- Ok(())
- }
- }
-
- pub(crate) fn backlog_len(&self) -> usize {
- self.backlog.lock().len()
- }
-}
-
-fn db_key_for_path(path: &Arc<Path>) -> String {
- path.to_string_lossy().replace('/', "\0")
-}
@@ -1,205 +0,0 @@
-use crate::embedding::EmbeddingProvider;
-use crate::embedding_index::EmbeddingIndex;
-use crate::indexing::IndexingEntrySet;
-use crate::summary_index::SummaryIndex;
-use anyhow::Result;
-use fs::Fs;
-use futures::future::Shared;
-use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Subscription, Task, WeakEntity};
-use language::LanguageRegistry;
-use log;
-use project::{UpdatedEntriesSet, Worktree};
-use smol::channel;
-use std::sync::Arc;
-use util::ResultExt;
-
-#[derive(Clone)]
-pub enum WorktreeIndexHandle {
- Loading {
- index: Shared<Task<Result<Entity<WorktreeIndex>, Arc<anyhow::Error>>>>,
- },
- Loaded {
- index: Entity<WorktreeIndex>,
- },
-}
-
-pub struct WorktreeIndex {
- worktree: Entity<Worktree>,
- db_connection: heed::Env,
- embedding_index: EmbeddingIndex,
- summary_index: SummaryIndex,
- entry_ids_being_indexed: Arc<IndexingEntrySet>,
- _index_entries: Task<Result<()>>,
- _subscription: Subscription,
-}
-
-impl WorktreeIndex {
- pub fn load(
- worktree: Entity<Worktree>,
- db_connection: heed::Env,
- language_registry: Arc<LanguageRegistry>,
- fs: Arc<dyn Fs>,
- status_tx: channel::Sender<()>,
- embedding_provider: Arc<dyn EmbeddingProvider>,
- cx: &mut App,
- ) -> Task<Result<Entity<Self>>> {
- let worktree_for_index = worktree.clone();
- let worktree_for_summary = worktree.clone();
- let worktree_abs_path = worktree.read(cx).abs_path();
- let embedding_fs = Arc::clone(&fs);
- let summary_fs = fs;
- cx.spawn(async move |cx| {
- let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
- let (embedding_index, summary_index) = cx
- .background_spawn({
- let entries_being_indexed = Arc::clone(&entries_being_indexed);
- let db_connection = db_connection.clone();
- async move {
- let mut txn = db_connection.write_txn()?;
- let embedding_index = {
- let db_name = worktree_abs_path.to_string_lossy();
- let db = db_connection.create_database(&mut txn, Some(&db_name))?;
-
- EmbeddingIndex::new(
- worktree_for_index,
- embedding_fs,
- db_connection.clone(),
- db,
- language_registry,
- embedding_provider,
- Arc::clone(&entries_being_indexed),
- )
- };
- let summary_index = {
- let file_digest_db = {
- let db_name =
- // Prepend something that wouldn't be found at the beginning of an
- // absolute path, so we don't get db key namespace conflicts with
- // embeddings, which use the abs path as a key.
- format!("digests-{}", worktree_abs_path.to_string_lossy());
- db_connection.create_database(&mut txn, Some(&db_name))?
- };
- let summary_db = {
- let db_name =
- // Prepend something that wouldn't be found at the beginning of an
- // absolute path, so we don't get db key namespace conflicts with
- // embeddings, which use the abs path as a key.
- format!("summaries-{}", worktree_abs_path.to_string_lossy());
- db_connection.create_database(&mut txn, Some(&db_name))?
- };
- SummaryIndex::new(
- worktree_for_summary,
- summary_fs,
- db_connection.clone(),
- file_digest_db,
- summary_db,
- Arc::clone(&entries_being_indexed),
- )
- };
- txn.commit()?;
- anyhow::Ok((embedding_index, summary_index))
- }
- })
- .await?;
-
- cx.new(|cx| {
- Self::new(
- worktree,
- db_connection,
- embedding_index,
- summary_index,
- entries_being_indexed,
- cx,
- )
- })
- })
- }
-
- pub fn new(
- worktree: Entity<Worktree>,
- db_connection: heed::Env,
- embedding_index: EmbeddingIndex,
- summary_index: SummaryIndex,
- entry_ids_being_indexed: Arc<IndexingEntrySet>,
- cx: &mut Context<Self>,
- ) -> Self {
- let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
- let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
- if let worktree::Event::UpdatedEntries(update) = event {
- log::debug!("Updating entries...");
- _ = updated_entries_tx.try_send(update.clone());
- }
- });
-
- Self {
- db_connection,
- embedding_index,
- summary_index,
- worktree,
- entry_ids_being_indexed,
- _index_entries: cx.spawn(async move |this, cx| {
- Self::index_entries(this, updated_entries_rx, cx).await
- }),
- _subscription,
- }
- }
-
- pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
- self.entry_ids_being_indexed.as_ref()
- }
-
- pub fn worktree(&self) -> &Entity<Worktree> {
- &self.worktree
- }
-
- pub fn db_connection(&self) -> &heed::Env {
- &self.db_connection
- }
-
- pub fn embedding_index(&self) -> &EmbeddingIndex {
- &self.embedding_index
- }
-
- pub fn summary_index(&self) -> &SummaryIndex {
- &self.summary_index
- }
-
- async fn index_entries(
- this: WeakEntity<Self>,
- updated_entries: channel::Receiver<UpdatedEntriesSet>,
- cx: &mut AsyncApp,
- ) -> Result<()> {
- let index = this.update(cx, |this, cx| {
- futures::future::try_join(
- this.embedding_index.index_entries_changed_on_disk(cx),
- this.summary_index.index_entries_changed_on_disk(false, cx),
- )
- })?;
- index.await.log_err();
-
- while let Ok(updated_entries) = updated_entries.recv().await {
- let index = this.update(cx, |this, cx| {
- futures::future::try_join(
- this.embedding_index
- .index_updated_entries(updated_entries.clone(), cx),
- this.summary_index
- .index_updated_entries(updated_entries, false, cx),
- )
- })?;
- index.await.log_err();
- }
-
- Ok(())
- }
-
- #[cfg(test)]
- pub fn path_count(&self) -> Result<u64> {
- use anyhow::Context as _;
-
- let txn = self
- .db_connection
- .read_txn()
- .context("failed to create read transaction")?;
- Ok(self.embedding_index().db().len(&txn)?)
- }
-}