diff --git a/Cargo.lock b/Cargo.lock
index 5ca8375fef2c1e631f820bae88710a328bc02ebe..49dcb57ef133612ab807ab7c84a25b070c9e1241 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2351,19 +2351,6 @@ dependencies = [
  "digest",
 ]
 
-[[package]]
-name = "blake3"
-version = "1.8.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
-dependencies = [
- "arrayref",
- "arrayvec",
- "cc",
- "cfg-if",
- "constant_time_eq 0.3.1",
-]
-
 [[package]]
 name = "block"
 version = "0.1.6"
@@ -3578,12 +3565,6 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
 
-[[package]]
-name = "constant_time_eq"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
-
 [[package]]
 name = "context_server"
 version = "0.1.0"
@@ -6164,17 +6145,6 @@ dependencies = [
  "futures-util",
 ]
 
-[[package]]
-name = "futures-batch"
-version = "0.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f444c45a1cb86f2a7e301469fd50a82084a60dadc25d94529a8312276ecb71a"
-dependencies = [
- "futures 0.3.31",
- "futures-timer",
- "pin-utils",
-]
-
 [[package]]
 name = "futures-channel"
 version = "0.3.31"
@@ -6270,12 +6240,6 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
 
-[[package]]
-name = "futures-timer"
-version = "3.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
-
 [[package]]
 name = "futures-util"
 version = "0.3.31"
@@ -14688,49 +14652,6 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
 
-[[package]]
-name = "semantic_index"
-version = "0.1.0"
-dependencies = [
- "anyhow",
- "arrayvec",
- "blake3",
- "client",
- "clock",
- "collections",
- "feature_flags",
- "fs",
- "futures 0.3.31",
- "futures-batch",
- "gpui",
- "heed",
- "http_client",
- "language",
- "language_model",
- "languages",
- "log",
- "open_ai",
- "parking_lot",
- "project",
- "reqwest_client",
- "serde",
- "serde_json",
- "settings",
- "sha2",
- "smol",
- "streaming-iterator",
- "tempfile",
- "theme",
- "tree-sitter",
- "ui",
- "unindent",
- "util",
- "workspace",
- "workspace-hack",
- "worktree",
- "zlog",
-]
-
 [[package]]
 name = "semantic_version"
 version = "0.1.0"
@@ -20915,7 +20836,7 @@ dependencies = [
  "aes",
  "byteorder",
  "bzip2",
- "constant_time_eq 0.1.5",
+ "constant_time_eq",
  "crc32fast",
  "crossbeam-utils",
  "flate2",
diff --git a/Cargo.toml b/Cargo.toml
index d8e8040cd920e1f6b5a561c80a4a205d030cbb49..c8d555a24278a77a9dbd0f649ee75d6d04efcba0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -143,7 +143,6 @@ members = [
     "crates/rules_library",
     "crates/schema_generator",
     "crates/search",
-    "crates/semantic_index",
     "crates/semantic_version",
     "crates/session",
     "crates/settings",
@@ -373,7 +372,6 @@ rope = { path = "crates/rope" }
 rpc = { path = "crates/rpc" }
 rules_library = { path = "crates/rules_library" }
 search = { path = "crates/search" }
-semantic_index = { path = "crates/semantic_index" }
 semantic_version = { path = "crates/semantic_version" }
 session = { path = "crates/session" }
 settings = { path = "crates/settings" }
diff --git a/crates/semantic_index/Cargo.toml b/crates/semantic_index/Cargo.toml
deleted file mode 100644
index c5fe14d9cf9b79e55343b2943b49eb9d6f1a139b..0000000000000000000000000000000000000000
--- a/crates/semantic_index/Cargo.toml
+++ /dev/null
@@ -1,69 +0,0 @@
-[package]
-name = "semantic_index"
-description = "Process, chunk, and embed text as vectors for semantic search."
-version = "0.1.0"
-edition.workspace = true
-publish.workspace = true
-license = "GPL-3.0-or-later"
-
-[lints]
-workspace = true
-
-[lib]
-path = "src/semantic_index.rs"
-
-[[example]]
-name = "index"
-path = "examples/index.rs"
-crate-type = ["bin"]
-
-[dependencies]
-anyhow.workspace = true
-arrayvec.workspace = true
-blake3.workspace = true
-client.workspace = true
-clock.workspace = true
-collections.workspace = true
-feature_flags.workspace = true
-fs.workspace = true
-futures-batch.workspace = true
-futures.workspace = true
-gpui.workspace = true
-heed.workspace = true
-http_client.workspace = true
-language.workspace = true
-language_model.workspace = true
-log.workspace = true
-open_ai.workspace = true
-parking_lot.workspace = true
-project.workspace = true
-serde.workspace = true
-serde_json.workspace = true
-settings.workspace = true
-sha2.workspace = true
-smol.workspace = true
-streaming-iterator.workspace = true
-theme.workspace = true
-tree-sitter.workspace = true
-ui.workspace = true
-unindent.workspace = true
-util.workspace = true
-workspace.workspace = true
-worktree.workspace = true
-workspace-hack.workspace = true
-
-[dev-dependencies]
-client = { workspace = true, features = ["test-support"] }
-fs = { workspace = true, features = ["test-support"] }
-futures.workspace = true
-gpui = { workspace = true, features = ["test-support"] }
-http_client = { workspace = true, features = ["test-support"] }
-language = { workspace = true, features = ["test-support"] }
-languages.workspace = true
-project = { workspace = true, features = ["test-support"] }
-tempfile.workspace = true
-reqwest_client.workspace = true
-util = { workspace = true, features = ["test-support"] }
-workspace = { workspace = true, features = ["test-support"] }
-worktree = { workspace = true, features = ["test-support"] }
-zlog.workspace = true
diff --git a/crates/semantic_index/LICENSE-GPL b/crates/semantic_index/LICENSE-GPL
deleted file mode 120000
index 89e542f750cd3860a0598eff0dc34b56d7336dc4..0000000000000000000000000000000000000000
--- a/crates/semantic_index/LICENSE-GPL
+++ /dev/null
@@ -1 +0,0 @@
-../../LICENSE-GPL
\ No newline at end of file
diff --git a/crates/semantic_index/examples/index.rs b/crates/semantic_index/examples/index.rs
deleted file mode 100644
index 86f1e53a606c5a38846e937347f20b6166a7b728..0000000000000000000000000000000000000000
--- a/crates/semantic_index/examples/index.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-use client::Client;
-use futures::channel::oneshot;
-use gpui::Application;
-use http_client::HttpClientWithUrl;
-use language::language_settings::AllLanguageSettings;
-use project::Project;
-use semantic_index::{OpenAiEmbeddingModel, OpenAiEmbeddingProvider, SemanticDb};
-use settings::SettingsStore;
-use std::{
-    path::{Path, PathBuf},
-    sync::Arc,
-};
-
-fn main() {
-    zlog::init();
-
-    use clock::FakeSystemClock;
-
-    Application::new().run(|cx| {
-        let store = SettingsStore::test(cx);
-        cx.set_global(store);
-        language::init(cx);
-        Project::init_settings(cx);
-        SettingsStore::update(cx, |store, cx| {
-            store.update_user_settings::<AllLanguageSettings>(cx, |_| {});
-        });
-
-        let clock = Arc::new(FakeSystemClock::new());
-
-
let http = Arc::new(HttpClientWithUrl::new( - Arc::new( - reqwest_client::ReqwestClient::user_agent("Zed semantic index example").unwrap(), - ), - "http://localhost:11434", - None, - )); - let client = client::Client::new(clock, http.clone(), cx); - Client::set_global(client, cx); - - let args: Vec = std::env::args().collect(); - if args.len() < 2 { - eprintln!("Usage: cargo run --example index -p semantic_index -- "); - cx.quit(); - return; - } - - // let embedding_provider = semantic_index::FakeEmbeddingProvider; - - let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY not set"); - - let embedding_provider = Arc::new(OpenAiEmbeddingProvider::new( - http, - OpenAiEmbeddingModel::TextEmbedding3Small, - open_ai::OPEN_AI_API_URL.to_string(), - api_key, - )); - - cx.spawn(async move |cx| { - let semantic_index = SemanticDb::new( - PathBuf::from("/tmp/semantic-index-db.mdb"), - embedding_provider, - cx, - ); - - let mut semantic_index = semantic_index.await.unwrap(); - - let project_path = Path::new(&args[1]); - - let project = Project::example([project_path], cx).await; - - cx.update(|cx| { - let language_registry = project.read(cx).languages().clone(); - let node_runtime = project.read(cx).node_runtime().unwrap().clone(); - languages::init(language_registry, node_runtime, cx); - }) - .unwrap(); - - let project_index = cx - .update(|cx| semantic_index.project_index(project.clone(), cx)) - .unwrap() - .unwrap(); - - let (tx, rx) = oneshot::channel(); - let mut tx = Some(tx); - let subscription = cx.update(|cx| { - cx.subscribe(&project_index, move |_, event, _| { - if let Some(tx) = tx.take() { - _ = tx.send(*event); - } - }) - }); - - let index_start = std::time::Instant::now(); - rx.await.expect("no event emitted"); - drop(subscription); - println!("Index time: {:?}", index_start.elapsed()); - - let results = cx - .update(|cx| { - let project_index = project_index.read(cx); - let query = "converting an anchor to a point"; - project_index.search(vec![query.into()], 4, cx) - }) - .unwrap() - .await - .unwrap(); - - for search_result in results { - let path = search_result.path.clone(); - - let content = cx - .update(|cx| { - let worktree = search_result.worktree.read(cx); - let entry_abs_path = worktree.abs_path().join(search_result.path.clone()); - let fs = project.read(cx).fs().clone(); - cx.spawn(async move |_| fs.load(&entry_abs_path).await.unwrap()) - }) - .unwrap() - .await; - - let range = search_result.range.clone(); - let content = content[search_result.range].to_owned(); - - println!( - "✄✄✄✄✄✄✄✄✄✄✄✄✄✄ {:?} @ {} ✄✄✄✄✄✄✄✄✄✄✄✄✄✄", - path, search_result.score - ); - println!("{:?}:{:?}:{:?}", path, range.start, range.end); - println!("{}", content); - } - - cx.background_executor() - .timer(std::time::Duration::from_secs(100000)) - .await; - - cx.update(|cx| cx.quit()).unwrap(); - }) - .detach(); - }); -} diff --git a/crates/semantic_index/fixture/main.rs b/crates/semantic_index/fixture/main.rs deleted file mode 100644 index f8796c8f4528a1113c37b72ca871bc2195f45619..0000000000000000000000000000000000000000 --- a/crates/semantic_index/fixture/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello Indexer!"); -} diff --git a/crates/semantic_index/fixture/needle.md b/crates/semantic_index/fixture/needle.md deleted file mode 100644 index 80487c9983a8f5ac151ac463126ea96aff59d6ca..0000000000000000000000000000000000000000 --- a/crates/semantic_index/fixture/needle.md +++ /dev/null @@ -1,43 +0,0 @@ -# Searching for a needle in a haystack - -When you have a large 
amount of text, it can be useful to search for a specific word or phrase. This is often referred to as "finding a needle in a haystack." In this markdown document, we're "hiding" a key phrase for our text search to find. Can you find it? - -## Instructions - -1. Use the search functionality in your text editor or markdown viewer to find the hidden phrase in this document. - -2. Once you've found the **phrase**, write it down and proceed to the next step. - -Honestly, I just want to fill up plenty of characters so that we chunk this markdown into several chunks. - -## Tips - -- Relax -- Take a deep breath -- Focus on the task at hand -- Don't get distracted by other text -- Use the search functionality to your advantage - -## Example code - -```python -def search_for_needle(haystack, needle): - if needle in haystack: - return True - else: - return False -``` - -```javascript -function searchForNeedle(haystack, needle) { - return haystack.includes(needle); -} -``` - -## Background - -When creating an index for a book or searching for a specific term in a large document, the ability to quickly find a specific word or phrase is essential. This is where search functionality comes in handy. However, one should _remember_ that the search is only as good as the index that was built. As they say, garbage in, garbage out! - -## Conclusion - -Searching for a needle in a haystack can be a challenging task, but with the right tools and techniques, it becomes much easier. Whether you're looking for a specific word in a document or trying to find a key piece of information in a large dataset, the ability to search efficiently is a valuable skill to have. diff --git a/crates/semantic_index/src/chunking.rs b/crates/semantic_index/src/chunking.rs deleted file mode 100644 index c1dfb6ccb5e71713f22183637d2ae14b16ac89f0..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/chunking.rs +++ /dev/null @@ -1,415 +0,0 @@ -use language::{Language, with_parser, with_query_cursor}; -use serde::{Deserialize, Serialize}; -use sha2::{Digest, Sha256}; -use std::{ - cmp::{self, Reverse}, - ops::Range, - path::Path, - sync::Arc, -}; -use streaming_iterator::StreamingIterator; -use tree_sitter::QueryCapture; -use util::ResultExt as _; - -#[derive(Copy, Clone)] -struct ChunkSizeRange { - min: usize, - max: usize, -} - -const CHUNK_SIZE_RANGE: ChunkSizeRange = ChunkSizeRange { - min: 1024, - max: 8192, -}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Chunk { - pub range: Range, - pub digest: [u8; 32], -} - -pub fn chunk_text(text: &str, language: Option<&Arc>, path: &Path) -> Vec { - chunk_text_with_size_range(text, language, path, CHUNK_SIZE_RANGE) -} - -fn chunk_text_with_size_range( - text: &str, - language: Option<&Arc>, - path: &Path, - size_config: ChunkSizeRange, -) -> Vec { - let ranges = syntactic_ranges(text, language, path).unwrap_or_default(); - chunk_text_with_syntactic_ranges(text, &ranges, size_config) -} - -fn syntactic_ranges( - text: &str, - language: Option<&Arc>, - path: &Path, -) -> Option>> { - let language = language?; - let grammar = language.grammar()?; - let outline = grammar.outline_config.as_ref()?; - let tree = with_parser(|parser| { - parser.set_language(&grammar.ts_language).log_err()?; - parser.parse(text, None) - }); - - let Some(tree) = tree else { - log::error!("failed to parse file {path:?} for chunking"); - return None; - }; - - struct RowInfo { - offset: usize, - is_comment: bool, - } - - let scope = language.default_scope(); - let 
line_comment_prefixes = scope.line_comment_prefixes(); - let row_infos = text - .split('\n') - .map({ - let mut offset = 0; - move |line| { - let line = line.trim_start(); - let is_comment = line_comment_prefixes - .iter() - .any(|prefix| line.starts_with(prefix.as_ref())); - let result = RowInfo { offset, is_comment }; - offset += line.len() + 1; - result - } - }) - .collect::>(); - - // Retrieve a list of ranges of outline items (types, functions, etc) in the document. - // Omit single-line outline items (e.g. struct fields, constant declarations), because - // we'll already be attempting to split on lines. - let mut ranges = with_query_cursor(|cursor| { - cursor - .matches(&outline.query, tree.root_node(), text.as_bytes()) - .filter_map_deref(|mat| { - mat.captures - .iter() - .find_map(|QueryCapture { node, index }| { - if *index == outline.item_capture_ix { - let mut start_offset = node.start_byte(); - let mut start_row = node.start_position().row; - let end_offset = node.end_byte(); - let end_row = node.end_position().row; - - // Expand the range to include any preceding comments. - while start_row > 0 && row_infos[start_row - 1].is_comment { - start_offset = row_infos[start_row - 1].offset; - start_row -= 1; - } - - if end_row > start_row { - return Some(start_offset..end_offset); - } - } - None - }) - }) - .collect::>() - }); - - ranges.sort_unstable_by_key(|range| (range.start, Reverse(range.end))); - Some(ranges) -} - -fn chunk_text_with_syntactic_ranges( - text: &str, - mut syntactic_ranges: &[Range], - size_config: ChunkSizeRange, -) -> Vec { - let mut chunks = Vec::new(); - let mut range = 0..0; - let mut range_end_nesting_depth = 0; - - // Try to split the text at line boundaries. - let mut line_ixs = text - .match_indices('\n') - .map(|(ix, _)| ix + 1) - .chain(if text.ends_with('\n') { - None - } else { - Some(text.len()) - }) - .peekable(); - - while let Some(&line_ix) = line_ixs.peek() { - // If the current position is beyond the maximum chunk size, then - // start a new chunk. - if line_ix - range.start > size_config.max { - if range.is_empty() { - range.end = cmp::min(range.start + size_config.max, line_ix); - while !text.is_char_boundary(range.end) { - range.end -= 1; - } - } - - chunks.push(Chunk { - range: range.clone(), - digest: Sha256::digest(&text[range.clone()]).into(), - }); - range_end_nesting_depth = 0; - range.start = range.end; - continue; - } - - // Discard any syntactic ranges that end before the current position. - while let Some(first_item) = syntactic_ranges.first() { - if first_item.end < line_ix { - syntactic_ranges = &syntactic_ranges[1..]; - continue; - } else { - break; - } - } - - // Count how many syntactic ranges contain the current position. - let mut nesting_depth = 0; - for range in syntactic_ranges { - if range.start > line_ix { - break; - } - if range.start < line_ix && range.end > line_ix { - nesting_depth += 1; - } - } - - // Extend the current range to this position, unless an earlier candidate - // end position was less nested syntactically. 
- if range.len() < size_config.min || nesting_depth <= range_end_nesting_depth { - range.end = line_ix; - range_end_nesting_depth = nesting_depth; - } - - line_ixs.next(); - } - - if !range.is_empty() { - chunks.push(Chunk { - range: range.clone(), - digest: Sha256::digest(&text[range]).into(), - }); - } - - chunks -} - -#[cfg(test)] -mod tests { - use super::*; - use language::{Language, LanguageConfig, LanguageMatcher, tree_sitter_rust}; - use unindent::Unindent as _; - - #[test] - fn test_chunk_text_with_syntax() { - let language = rust_language(); - - let text = " - struct Person { - first_name: String, - last_name: String, - age: u32, - } - - impl Person { - fn new(first_name: String, last_name: String, age: u32) -> Self { - Self { first_name, last_name, age } - } - - /// Returns the first name - /// something something something - fn first_name(&self) -> &str { - &self.first_name - } - - fn last_name(&self) -> &str { - &self.last_name - } - - fn age(&self) -> u32 { - self.age - } - } - " - .unindent(); - - let chunks = chunk_text_with_size_range( - &text, - Some(&language), - Path::new("lib.rs"), - ChunkSizeRange { - min: text.find('}').unwrap(), - max: text.find("Self {").unwrap(), - }, - ); - - // The entire impl cannot fit in a chunk, so it is split. - // Within the impl, two methods can fit in a chunk. - assert_chunks( - &text, - &chunks, - &[ - "struct Person {", // ... - "impl Person {", - " /// Returns the first name", - " fn last_name", - ], - ); - - let text = " - struct T {} - struct U {} - struct V {} - struct W { - a: T, - b: U, - } - " - .unindent(); - - let chunks = chunk_text_with_size_range( - &text, - Some(&language), - Path::new("lib.rs"), - ChunkSizeRange { - min: text.find('{').unwrap(), - max: text.find('V').unwrap(), - }, - ); - - // Two single-line structs can fit in a chunk. - // The last struct cannot fit in a chunk together - // with the previous single-line struct. - assert_chunks( - &text, - &chunks, - &[ - "struct T", // ... - "struct V", // ... - "struct W", // ... - "}", - ], - ); - } - - #[test] - fn test_chunk_with_long_lines() { - let language = rust_language(); - - let text = " - struct S { a: u32 } - struct T { a: u64 } - struct U { a: u64, b: u64, c: u64, d: u64, e: u64, f: u64, g: u64, h: u64, i: u64, j: u64 } - struct W { a: u64, b: u64, c: u64, d: u64, e: u64, f: u64, g: u64, h: u64, i: u64, j: u64 } - " - .unindent(); - - let chunks = chunk_text_with_size_range( - &text, - Some(&language), - Path::new("lib.rs"), - ChunkSizeRange { min: 32, max: 64 }, - ); - - // The line is too long to fit in one chunk - assert_chunks( - &text, - &chunks, - &[ - "struct S {", // ... - "struct U", - "4, h: u64, i: u64", // ... - "struct W", - "4, h: u64, i: u64", // ... - ], - ); - } - - #[track_caller] - fn assert_chunks(text: &str, chunks: &[Chunk], expected_chunk_text_prefixes: &[&str]) { - check_chunk_invariants(text, chunks); - - assert_eq!( - chunks.len(), - expected_chunk_text_prefixes.len(), - "unexpected number of chunks: {chunks:?}", - ); - - let mut prev_chunk_end = 0; - for (ix, chunk) in chunks.iter().enumerate() { - let expected_prefix = expected_chunk_text_prefixes[ix]; - let chunk_text = &text[chunk.range.clone()]; - if !chunk_text.starts_with(expected_prefix) { - let chunk_prefix_offset = text[prev_chunk_end..].find(expected_prefix); - if let Some(chunk_prefix_offset) = chunk_prefix_offset { - panic!( - "chunk {ix} starts at unexpected offset {}. 
expected {}", - chunk.range.start, - chunk_prefix_offset + prev_chunk_end - ); - } else { - panic!("invalid expected chunk prefix {ix}: {expected_prefix:?}"); - } - } - prev_chunk_end = chunk.range.end; - } - } - - #[track_caller] - fn check_chunk_invariants(text: &str, chunks: &[Chunk]) { - for (ix, chunk) in chunks.iter().enumerate() { - if ix > 0 && chunk.range.start != chunks[ix - 1].range.end { - panic!("chunk ranges are not contiguous: {:?}", chunks); - } - } - - if text.is_empty() { - assert!(chunks.is_empty()) - } else if chunks.first().unwrap().range.start != 0 - || chunks.last().unwrap().range.end != text.len() - { - panic!("chunks don't cover entire text {:?}", chunks); - } - } - - #[test] - fn test_chunk_text() { - let text = "a\n".repeat(1000); - let chunks = chunk_text(&text, None, Path::new("lib.rs")); - assert_eq!( - chunks.len(), - ((2000_f64) / (CHUNK_SIZE_RANGE.max as f64)).ceil() as usize - ); - } - - fn rust_language() -> Arc { - Arc::new( - Language::new( - LanguageConfig { - name: "Rust".into(), - matcher: LanguageMatcher { - path_suffixes: vec!["rs".to_string()], - ..Default::default() - }, - ..Default::default() - }, - Some(tree_sitter_rust::LANGUAGE.into()), - ) - .with_outline_query( - " - (function_item name: (_) @name) @item - (impl_item type: (_) @name) @item - (struct_item name: (_) @name) @item - (field_declaration name: (_) @name) @item - ", - ) - .unwrap(), - ) - } -} diff --git a/crates/semantic_index/src/embedding.rs b/crates/semantic_index/src/embedding.rs deleted file mode 100644 index 8ca47a40230cd15cf550693a82ccffe231f5d686..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/embedding.rs +++ /dev/null @@ -1,134 +0,0 @@ -mod lmstudio; -mod ollama; -mod open_ai; - -pub use lmstudio::*; -pub use ollama::*; -pub use open_ai::*; -use sha2::{Digest, Sha256}; - -use anyhow::Result; -use futures::{FutureExt, future::BoxFuture}; -use serde::{Deserialize, Serialize}; -use std::{fmt, future}; - -/// Trait for embedding providers. Texts in, vectors out. 
-pub trait EmbeddingProvider: Sync + Send { - fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result>>; - fn batch_size(&self) -> usize; -} - -#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] -pub struct Embedding(Vec); - -impl Embedding { - pub fn new(mut embedding: Vec) -> Self { - let len = embedding.len(); - let mut norm = 0f32; - - for i in 0..len { - norm += embedding[i] * embedding[i]; - } - - norm = norm.sqrt(); - for dimension in &mut embedding { - *dimension /= norm; - } - - Self(embedding) - } - - fn len(&self) -> usize { - self.0.len() - } - - pub fn similarity(&self, others: &[Embedding]) -> (f32, usize) { - debug_assert!(others.iter().all(|other| self.0.len() == other.0.len())); - others - .iter() - .enumerate() - .map(|(index, other)| { - let dot_product: f32 = self - .0 - .iter() - .copied() - .zip(other.0.iter().copied()) - .map(|(a, b)| a * b) - .sum(); - (dot_product, index) - }) - .max_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)) - .unwrap_or((0.0, 0)) - } -} - -impl fmt::Display for Embedding { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let digits_to_display = 3; - - // Start the Embedding display format - write!(f, "Embedding(sized: {}; values: [", self.len())?; - - for (index, value) in self.0.iter().enumerate().take(digits_to_display) { - // Lead with comma if not the first element - if index != 0 { - write!(f, ", ")?; - } - write!(f, "{:.3}", value)?; - } - if self.len() > digits_to_display { - write!(f, "...")?; - } - write!(f, "])") - } -} - -#[derive(Debug)] -pub struct TextToEmbed<'a> { - pub text: &'a str, - pub digest: [u8; 32], -} - -impl<'a> TextToEmbed<'a> { - pub fn new(text: &'a str) -> Self { - let digest = Sha256::digest(text.as_bytes()); - Self { - text, - digest: digest.into(), - } - } -} - -pub struct FakeEmbeddingProvider; - -impl EmbeddingProvider for FakeEmbeddingProvider { - fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result>> { - let embeddings = texts - .iter() - .map(|_text| { - let mut embedding = vec![0f32; 1536]; - for i in 0..embedding.len() { - embedding[i] = i as f32; - } - Embedding::new(embedding) - }) - .collect(); - future::ready(Ok(embeddings)).boxed() - } - - fn batch_size(&self) -> usize { - 16 - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[gpui::test] - fn test_normalize_embedding() { - let normalized = Embedding::new(vec![1.0, 1.0, 1.0]); - let value: f32 = 1.0 / 3.0_f32.sqrt(); - assert_eq!(normalized, Embedding(vec![value; 3])); - } -} diff --git a/crates/semantic_index/src/embedding/lmstudio.rs b/crates/semantic_index/src/embedding/lmstudio.rs deleted file mode 100644 index 73e52aa0bfde11307a81dcbee828b86349b6efc5..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/embedding/lmstudio.rs +++ /dev/null @@ -1,70 +0,0 @@ -use anyhow::{Context as _, Result}; -use futures::{AsyncReadExt as _, FutureExt, future::BoxFuture}; -use http_client::HttpClient; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use crate::{Embedding, EmbeddingProvider, TextToEmbed}; - -pub enum LmStudioEmbeddingModel { - NomicEmbedText, -} - -pub struct LmStudioEmbeddingProvider { - client: Arc, - model: LmStudioEmbeddingModel, -} - -#[derive(Serialize)] -struct LmStudioEmbeddingRequest { - model: String, - prompt: String, -} - -#[derive(Deserialize)] -struct LmStudioEmbeddingResponse { - embedding: Vec, -} - -impl LmStudioEmbeddingProvider { - pub fn new(client: Arc, model: LmStudioEmbeddingModel) -> 
Self { - Self { client, model } - } -} - -impl EmbeddingProvider for LmStudioEmbeddingProvider { - fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result>> { - let model = match self.model { - LmStudioEmbeddingModel::NomicEmbedText => "nomic-embed-text", - }; - - futures::future::try_join_all(texts.iter().map(|to_embed| { - let request = LmStudioEmbeddingRequest { - model: model.to_string(), - prompt: to_embed.text.to_string(), - }; - - let request = serde_json::to_string(&request).unwrap(); - - async { - let response = self - .client - .post_json("http://localhost:1234/api/v0/embeddings", request.into()) - .await?; - - let mut body = String::new(); - response.into_body().read_to_string(&mut body).await?; - - let response: LmStudioEmbeddingResponse = - serde_json::from_str(&body).context("Unable to parse response")?; - - Ok(Embedding::new(response.embedding)) - } - })) - .boxed() - } - - fn batch_size(&self) -> usize { - 256 - } -} diff --git a/crates/semantic_index/src/embedding/ollama.rs b/crates/semantic_index/src/embedding/ollama.rs deleted file mode 100644 index 5737609e24d0cb1adca3a155c09c6045e591c152..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/embedding/ollama.rs +++ /dev/null @@ -1,74 +0,0 @@ -use anyhow::{Context as _, Result}; -use futures::{AsyncReadExt as _, FutureExt, future::BoxFuture}; -use http_client::HttpClient; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use crate::{Embedding, EmbeddingProvider, TextToEmbed}; - -pub enum OllamaEmbeddingModel { - NomicEmbedText, - MxbaiEmbedLarge, -} - -pub struct OllamaEmbeddingProvider { - client: Arc, - model: OllamaEmbeddingModel, -} - -#[derive(Serialize)] -struct OllamaEmbeddingRequest { - model: String, - prompt: String, -} - -#[derive(Deserialize)] -struct OllamaEmbeddingResponse { - embedding: Vec, -} - -impl OllamaEmbeddingProvider { - pub fn new(client: Arc, model: OllamaEmbeddingModel) -> Self { - Self { client, model } - } -} - -impl EmbeddingProvider for OllamaEmbeddingProvider { - fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result>> { - // - let model = match self.model { - OllamaEmbeddingModel::NomicEmbedText => "nomic-embed-text", - OllamaEmbeddingModel::MxbaiEmbedLarge => "mxbai-embed-large", - }; - - futures::future::try_join_all(texts.iter().map(|to_embed| { - let request = OllamaEmbeddingRequest { - model: model.to_string(), - prompt: to_embed.text.to_string(), - }; - - let request = serde_json::to_string(&request).unwrap(); - - async { - let response = self - .client - .post_json("http://localhost:11434/api/embeddings", request.into()) - .await?; - - let mut body = String::new(); - response.into_body().read_to_string(&mut body).await?; - - let response: OllamaEmbeddingResponse = - serde_json::from_str(&body).context("Unable to pull response")?; - - Ok(Embedding::new(response.embedding)) - } - })) - .boxed() - } - - fn batch_size(&self) -> usize { - // TODO: Figure out decent value - 10 - } -} diff --git a/crates/semantic_index/src/embedding/open_ai.rs b/crates/semantic_index/src/embedding/open_ai.rs deleted file mode 100644 index da2b3bd2e4d873a84c380006575d304d356e878a..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/embedding/open_ai.rs +++ /dev/null @@ -1,55 +0,0 @@ -use crate::{Embedding, EmbeddingProvider, TextToEmbed}; -use anyhow::Result; -use futures::{FutureExt, future::BoxFuture}; -use http_client::HttpClient; -pub use open_ai::OpenAiEmbeddingModel; -use std::sync::Arc; - -pub struct 
OpenAiEmbeddingProvider { - client: Arc, - model: OpenAiEmbeddingModel, - api_url: String, - api_key: String, -} - -impl OpenAiEmbeddingProvider { - pub fn new( - client: Arc, - model: OpenAiEmbeddingModel, - api_url: String, - api_key: String, - ) -> Self { - Self { - client, - model, - api_url, - api_key, - } - } -} - -impl EmbeddingProvider for OpenAiEmbeddingProvider { - fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result>> { - let embed = open_ai::embed( - self.client.as_ref(), - &self.api_url, - &self.api_key, - self.model, - texts.iter().map(|to_embed| to_embed.text), - ); - async move { - let response = embed.await?; - Ok(response - .data - .into_iter() - .map(|data| Embedding::new(data.embedding)) - .collect()) - } - .boxed() - } - - fn batch_size(&self) -> usize { - // From https://platform.openai.com/docs/api-reference/embeddings/create - 2048 - } -} diff --git a/crates/semantic_index/src/embedding_index.rs b/crates/semantic_index/src/embedding_index.rs deleted file mode 100644 index c54cd9d3c36216a00d5aca898ebe1bb0e3499f2e..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/embedding_index.rs +++ /dev/null @@ -1,470 +0,0 @@ -use crate::{ - chunking::{self, Chunk}, - embedding::{Embedding, EmbeddingProvider, TextToEmbed}, - indexing::{IndexingEntryHandle, IndexingEntrySet}, -}; -use anyhow::{Context as _, Result}; -use collections::Bound; -use feature_flags::FeatureFlagAppExt; -use fs::Fs; -use fs::MTime; -use futures::{FutureExt as _, stream::StreamExt}; -use futures_batch::ChunksTimeoutStreamExt; -use gpui::{App, AppContext as _, Entity, Task}; -use heed::types::{SerdeBincode, Str}; -use language::LanguageRegistry; -use log; -use project::{Entry, UpdatedEntriesSet, Worktree}; -use serde::{Deserialize, Serialize}; -use smol::channel; -use std::{cmp::Ordering, future::Future, iter, path::Path, pin::pin, sync::Arc, time::Duration}; -use util::ResultExt; -use worktree::Snapshot; - -pub struct EmbeddingIndex { - worktree: Entity, - db_connection: heed::Env, - db: heed::Database>, - fs: Arc, - language_registry: Arc, - embedding_provider: Arc, - entry_ids_being_indexed: Arc, -} - -impl EmbeddingIndex { - pub fn new( - worktree: Entity, - fs: Arc, - db_connection: heed::Env, - embedding_db: heed::Database>, - language_registry: Arc, - embedding_provider: Arc, - entry_ids_being_indexed: Arc, - ) -> Self { - Self { - worktree, - fs, - db_connection, - db: embedding_db, - language_registry, - embedding_provider, - entry_ids_being_indexed, - } - } - - pub fn db(&self) -> &heed::Database> { - &self.db - } - - pub fn index_entries_changed_on_disk( - &self, - cx: &App, - ) -> impl Future> + use<> { - if !cx.is_staff() { - return async move { Ok(()) }.boxed(); - } - - let worktree = self.worktree.read(cx).snapshot(); - let worktree_abs_path = worktree.abs_path().clone(); - let scan = self.scan_entries(worktree, cx); - let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx); - let embed = Self::embed_files(self.embedding_provider.clone(), chunk.files, cx); - let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx); - async move { - futures::try_join!(scan.task, chunk.task, embed.task, persist)?; - Ok(()) - } - .boxed() - } - - pub fn index_updated_entries( - &self, - updated_entries: UpdatedEntriesSet, - cx: &App, - ) -> impl Future> + use<> { - if !cx.is_staff() { - return async move { Ok(()) }.boxed(); - } - - let worktree = self.worktree.read(cx).snapshot(); - let worktree_abs_path = 
worktree.abs_path().clone(); - let scan = self.scan_updated_entries(worktree, updated_entries, cx); - let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx); - let embed = Self::embed_files(self.embedding_provider.clone(), chunk.files, cx); - let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx); - async move { - futures::try_join!(scan.task, chunk.task, embed.task, persist)?; - Ok(()) - } - .boxed() - } - - fn scan_entries(&self, worktree: Snapshot, cx: &App) -> ScanEntries { - let (updated_entries_tx, updated_entries_rx) = channel::bounded(512); - let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128); - let db_connection = self.db_connection.clone(); - let db = self.db; - let entries_being_indexed = self.entry_ids_being_indexed.clone(); - let task = cx.background_spawn(async move { - let txn = db_connection - .read_txn() - .context("failed to create read transaction")?; - let mut db_entries = db - .iter(&txn) - .context("failed to create iterator")? - .move_between_keys() - .peekable(); - - let mut deletion_range: Option<(Bound<&str>, Bound<&str>)> = None; - for entry in worktree.files(false, 0) { - log::trace!("scanning for embedding index: {:?}", &entry.path); - - let entry_db_key = db_key_for_path(&entry.path); - - let mut saved_mtime = None; - while let Some(db_entry) = db_entries.peek() { - match db_entry { - Ok((db_path, db_embedded_file)) => match (*db_path).cmp(&entry_db_key) { - Ordering::Less => { - if let Some(deletion_range) = deletion_range.as_mut() { - deletion_range.1 = Bound::Included(db_path); - } else { - deletion_range = - Some((Bound::Included(db_path), Bound::Included(db_path))); - } - - db_entries.next(); - } - Ordering::Equal => { - if let Some(deletion_range) = deletion_range.take() { - deleted_entry_ranges_tx - .send(( - deletion_range.0.map(ToString::to_string), - deletion_range.1.map(ToString::to_string), - )) - .await?; - } - saved_mtime = db_embedded_file.mtime; - db_entries.next(); - break; - } - Ordering::Greater => { - break; - } - }, - Err(_) => return Err(db_entries.next().unwrap().unwrap_err())?, - } - } - - if entry.mtime != saved_mtime { - let handle = entries_being_indexed.insert(entry.id); - updated_entries_tx.send((entry.clone(), handle)).await?; - } - } - - if let Some(db_entry) = db_entries.next() { - let (db_path, _) = db_entry?; - deleted_entry_ranges_tx - .send((Bound::Included(db_path.to_string()), Bound::Unbounded)) - .await?; - } - - Ok(()) - }); - - ScanEntries { - updated_entries: updated_entries_rx, - deleted_entry_ranges: deleted_entry_ranges_rx, - task, - } - } - - fn scan_updated_entries( - &self, - worktree: Snapshot, - updated_entries: UpdatedEntriesSet, - cx: &App, - ) -> ScanEntries { - let (updated_entries_tx, updated_entries_rx) = channel::bounded(512); - let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128); - let entries_being_indexed = self.entry_ids_being_indexed.clone(); - let task = cx.background_spawn(async move { - for (path, entry_id, status) in updated_entries.iter() { - match status { - project::PathChange::Added - | project::PathChange::Updated - | project::PathChange::AddedOrUpdated => { - if let Some(entry) = worktree.entry_for_id(*entry_id) - && entry.is_file() - { - let handle = entries_being_indexed.insert(entry.id); - updated_entries_tx.send((entry.clone(), handle)).await?; - } - } - project::PathChange::Removed => { - let db_path = db_key_for_path(path); - deleted_entry_ranges_tx - 
.send((Bound::Included(db_path.clone()), Bound::Included(db_path))) - .await?; - } - project::PathChange::Loaded => { - // Do nothing. - } - } - } - - Ok(()) - }); - - ScanEntries { - updated_entries: updated_entries_rx, - deleted_entry_ranges: deleted_entry_ranges_rx, - task, - } - } - - fn chunk_files( - &self, - worktree_abs_path: Arc, - entries: channel::Receiver<(Entry, IndexingEntryHandle)>, - cx: &App, - ) -> ChunkFiles { - let language_registry = self.language_registry.clone(); - let fs = self.fs.clone(); - let (chunked_files_tx, chunked_files_rx) = channel::bounded(2048); - let task = cx.spawn(async move |cx| { - cx.background_executor() - .scoped(|cx| { - for _ in 0..cx.num_cpus() { - cx.spawn(async { - while let Ok((entry, handle)) = entries.recv().await { - let entry_abs_path = worktree_abs_path.join(&entry.path); - if let Some(text) = fs.load(&entry_abs_path).await.ok() { - let language = language_registry - .language_for_file_path(&entry.path) - .await - .ok(); - let chunked_file = ChunkedFile { - chunks: chunking::chunk_text( - &text, - language.as_ref(), - &entry.path, - ), - handle, - path: entry.path, - mtime: entry.mtime, - text, - }; - - if chunked_files_tx.send(chunked_file).await.is_err() { - return; - } - } - } - }); - } - }) - .await; - Ok(()) - }); - - ChunkFiles { - files: chunked_files_rx, - task, - } - } - - pub fn embed_files( - embedding_provider: Arc, - chunked_files: channel::Receiver, - cx: &App, - ) -> EmbedFiles { - let embedding_provider = embedding_provider.clone(); - let (embedded_files_tx, embedded_files_rx) = channel::bounded(512); - let task = cx.background_spawn(async move { - let mut chunked_file_batches = - pin!(chunked_files.chunks_timeout(512, Duration::from_secs(2))); - while let Some(chunked_files) = chunked_file_batches.next().await { - // View the batch of files as a vec of chunks - // Flatten out to a vec of chunks that we can subdivide into batch sized pieces - // Once those are done, reassemble them back into the files in which they belong - // If any embeddings fail for a file, the entire file is discarded - - let chunks: Vec = chunked_files - .iter() - .flat_map(|file| { - file.chunks.iter().map(|chunk| TextToEmbed { - text: &file.text[chunk.range.clone()], - digest: chunk.digest, - }) - }) - .collect::>(); - - let mut embeddings: Vec> = Vec::new(); - for embedding_batch in chunks.chunks(embedding_provider.batch_size()) { - if let Some(batch_embeddings) = - embedding_provider.embed(embedding_batch).await.log_err() - { - if batch_embeddings.len() == embedding_batch.len() { - embeddings.extend(batch_embeddings.into_iter().map(Some)); - continue; - } - log::error!( - "embedding provider returned unexpected embedding count {}, expected {}", - batch_embeddings.len(), embedding_batch.len() - ); - } - - embeddings.extend(iter::repeat(None).take(embedding_batch.len())); - } - - let mut embeddings = embeddings.into_iter(); - for chunked_file in chunked_files { - let mut embedded_file = EmbeddedFile { - path: chunked_file.path, - mtime: chunked_file.mtime, - chunks: Vec::new(), - }; - - let mut embedded_all_chunks = true; - for (chunk, embedding) in - chunked_file.chunks.into_iter().zip(embeddings.by_ref()) - { - if let Some(embedding) = embedding { - embedded_file - .chunks - .push(EmbeddedChunk { chunk, embedding }); - } else { - embedded_all_chunks = false; - } - } - - if embedded_all_chunks { - embedded_files_tx - .send((embedded_file, chunked_file.handle)) - .await?; - } - } - } - Ok(()) - }); - - EmbedFiles { - files: embedded_files_rx, - 
task, - } - } - - fn persist_embeddings( - &self, - deleted_entry_ranges: channel::Receiver<(Bound, Bound)>, - embedded_files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>, - cx: &App, - ) -> Task> { - let db_connection = self.db_connection.clone(); - let db = self.db; - - cx.background_spawn(async move { - let mut deleted_entry_ranges = pin!(deleted_entry_ranges); - let mut embedded_files = pin!(embedded_files); - loop { - // Interleave deletions and persists of embedded files - futures::select_biased! { - deletion_range = deleted_entry_ranges.next() => { - if let Some(deletion_range) = deletion_range { - let mut txn = db_connection.write_txn()?; - let start = deletion_range.0.as_ref().map(|start| start.as_str()); - let end = deletion_range.1.as_ref().map(|end| end.as_str()); - log::debug!("deleting embeddings in range {:?}", &(start, end)); - db.delete_range(&mut txn, &(start, end))?; - txn.commit()?; - } - }, - file = embedded_files.next() => { - if let Some((file, _)) = file { - let mut txn = db_connection.write_txn()?; - log::debug!("saving embedding for file {:?}", file.path); - let key = db_key_for_path(&file.path); - db.put(&mut txn, &key, &file)?; - txn.commit()?; - } - }, - complete => break, - } - } - - Ok(()) - }) - } - - pub fn paths(&self, cx: &App) -> Task>>> { - let connection = self.db_connection.clone(); - let db = self.db; - cx.background_spawn(async move { - let tx = connection - .read_txn() - .context("failed to create read transaction")?; - let result = db - .iter(&tx)? - .map(|entry| Ok(entry?.1.path)) - .collect::>>>(); - drop(tx); - result - }) - } - - pub fn chunks_for_path(&self, path: Arc, cx: &App) -> Task>> { - let connection = self.db_connection.clone(); - let db = self.db; - cx.background_spawn(async move { - let tx = connection - .read_txn() - .context("failed to create read transaction")?; - Ok(db - .get(&tx, &db_key_for_path(&path))? - .context("no such path")? - .chunks) - }) - } -} - -struct ScanEntries { - updated_entries: channel::Receiver<(Entry, IndexingEntryHandle)>, - deleted_entry_ranges: channel::Receiver<(Bound, Bound)>, - task: Task>, -} - -struct ChunkFiles { - files: channel::Receiver, - task: Task>, -} - -pub struct ChunkedFile { - pub path: Arc, - pub mtime: Option, - pub handle: IndexingEntryHandle, - pub text: String, - pub chunks: Vec, -} - -pub struct EmbedFiles { - pub files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>, - pub task: Task>, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EmbeddedFile { - pub path: Arc, - pub mtime: Option, - pub chunks: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct EmbeddedChunk { - pub chunk: Chunk, - pub embedding: Embedding, -} - -fn db_key_for_path(path: &Arc) -> String { - path.to_string_lossy().replace('/', "\0") -} diff --git a/crates/semantic_index/src/indexing.rs b/crates/semantic_index/src/indexing.rs deleted file mode 100644 index aca9504891d0f7baa0798389f92176a3c853bc6f..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/indexing.rs +++ /dev/null @@ -1,49 +0,0 @@ -use collections::HashSet; -use parking_lot::Mutex; -use project::ProjectEntryId; -use smol::channel; -use std::sync::{Arc, Weak}; - -/// The set of entries that are currently being indexed. -pub struct IndexingEntrySet { - entry_ids: Mutex>, - tx: channel::Sender<()>, -} - -/// When dropped, removes the entry from the set of entries that are being indexed. 
-#[derive(Clone)] -pub(crate) struct IndexingEntryHandle { - entry_id: ProjectEntryId, - set: Weak, -} - -impl IndexingEntrySet { - pub fn new(tx: channel::Sender<()>) -> Self { - Self { - entry_ids: Default::default(), - tx, - } - } - - pub fn insert(self: &Arc, entry_id: ProjectEntryId) -> IndexingEntryHandle { - self.entry_ids.lock().insert(entry_id); - self.tx.send_blocking(()).ok(); - IndexingEntryHandle { - entry_id, - set: Arc::downgrade(self), - } - } - - pub fn len(&self) -> usize { - self.entry_ids.lock().len() - } -} - -impl Drop for IndexingEntryHandle { - fn drop(&mut self) { - if let Some(set) = self.set.upgrade() { - set.tx.send_blocking(()).ok(); - set.entry_ids.lock().remove(&self.entry_id); - } - } -} diff --git a/crates/semantic_index/src/project_index.rs b/crates/semantic_index/src/project_index.rs deleted file mode 100644 index 60b2770dd39b91b606b9c982c894bfc94952a179..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/project_index.rs +++ /dev/null @@ -1,548 +0,0 @@ -use crate::{ - embedding::{EmbeddingProvider, TextToEmbed}, - summary_index::FileSummary, - worktree_index::{WorktreeIndex, WorktreeIndexHandle}, -}; -use anyhow::{Context as _, Result, anyhow}; -use collections::HashMap; -use fs::Fs; -use futures::FutureExt; -use gpui::{ - App, AppContext as _, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity, -}; -use language::LanguageRegistry; -use log; -use project::{Project, Worktree, WorktreeId}; -use serde::{Deserialize, Serialize}; -use smol::channel; -use std::{ - cmp::Ordering, - future::Future, - num::NonZeroUsize, - ops::{Range, RangeInclusive}, - path::{Path, PathBuf}, - sync::Arc, -}; -use util::ResultExt; - -#[derive(Debug)] -pub struct SearchResult { - pub worktree: Entity, - pub path: Arc, - pub range: Range, - pub score: f32, - pub query_index: usize, -} - -#[derive(Debug, PartialEq, Eq)] -pub struct LoadedSearchResult { - pub path: Arc, - pub full_path: PathBuf, - pub excerpt_content: String, - pub row_range: RangeInclusive, - pub query_index: usize, -} - -pub struct WorktreeSearchResult { - pub worktree_id: WorktreeId, - pub path: Arc, - pub range: Range, - pub query_index: usize, - pub score: f32, -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub enum Status { - Idle, - Loading, - Scanning { remaining_count: NonZeroUsize }, -} - -pub struct ProjectIndex { - db_connection: heed::Env, - project: WeakEntity, - worktree_indices: HashMap, - language_registry: Arc, - fs: Arc, - last_status: Status, - status_tx: channel::Sender<()>, - embedding_provider: Arc, - _maintain_status: Task<()>, - _subscription: Subscription, -} - -impl ProjectIndex { - pub fn new( - project: Entity, - db_connection: heed::Env, - embedding_provider: Arc, - cx: &mut Context, - ) -> Self { - let language_registry = project.read(cx).languages().clone(); - let fs = project.read(cx).fs().clone(); - let (status_tx, status_rx) = channel::unbounded(); - let mut this = ProjectIndex { - db_connection, - project: project.downgrade(), - worktree_indices: HashMap::default(), - language_registry, - fs, - status_tx, - last_status: Status::Idle, - embedding_provider, - _subscription: cx.subscribe(&project, Self::handle_project_event), - _maintain_status: cx.spawn(async move |this, cx| { - while status_rx.recv().await.is_ok() { - if this.update(cx, |this, cx| this.update_status(cx)).is_err() { - break; - } - } - }), - }; - this.update_worktree_indices(cx); - this - } - - pub fn status(&self) -> Status { - self.last_status - 
} - - pub fn project(&self) -> WeakEntity { - self.project.clone() - } - - pub fn fs(&self) -> Arc { - self.fs.clone() - } - - fn handle_project_event( - &mut self, - _: Entity, - event: &project::Event, - cx: &mut Context, - ) { - match event { - project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => { - self.update_worktree_indices(cx); - } - _ => {} - } - } - - fn update_worktree_indices(&mut self, cx: &mut Context) { - let Some(project) = self.project.upgrade() else { - return; - }; - - let worktrees = project - .read(cx) - .visible_worktrees(cx) - .filter_map(|worktree| { - if worktree.read(cx).is_local() { - Some((worktree.entity_id(), worktree)) - } else { - None - } - }) - .collect::>(); - - self.worktree_indices - .retain(|worktree_id, _| worktrees.contains_key(worktree_id)); - for (worktree_id, worktree) in worktrees { - self.worktree_indices.entry(worktree_id).or_insert_with(|| { - let worktree_index = WorktreeIndex::load( - worktree.clone(), - self.db_connection.clone(), - self.language_registry.clone(), - self.fs.clone(), - self.status_tx.clone(), - self.embedding_provider.clone(), - cx, - ); - - let load_worktree = cx.spawn(async move |this, cx| { - let result = match worktree_index.await { - Ok(worktree_index) => { - this.update(cx, |this, _| { - this.worktree_indices.insert( - worktree_id, - WorktreeIndexHandle::Loaded { - index: worktree_index.clone(), - }, - ); - })?; - Ok(worktree_index) - } - Err(error) => { - this.update(cx, |this, _cx| { - this.worktree_indices.remove(&worktree_id) - })?; - Err(Arc::new(error)) - } - }; - - this.update(cx, |this, cx| this.update_status(cx))?; - - result - }); - - WorktreeIndexHandle::Loading { - index: load_worktree.shared(), - } - }); - } - - self.update_status(cx); - } - - fn update_status(&mut self, cx: &mut Context) { - let mut indexing_count = 0; - let mut any_loading = false; - - for index in self.worktree_indices.values_mut() { - match index { - WorktreeIndexHandle::Loading { .. } => { - any_loading = true; - break; - } - WorktreeIndexHandle::Loaded { index, .. } => { - indexing_count += index.read(cx).entry_ids_being_indexed().len(); - } - } - } - - let status = if any_loading { - Status::Loading - } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) { - Status::Scanning { remaining_count } - } else { - Status::Idle - }; - - if status != self.last_status { - self.last_status = status; - cx.emit(status); - } - } - - pub fn search( - &self, - queries: Vec, - limit: usize, - cx: &App, - ) -> Task>> { - let (chunks_tx, chunks_rx) = channel::bounded(1024); - let mut worktree_scan_tasks = Vec::new(); - for worktree_index in self.worktree_indices.values() { - let worktree_index = worktree_index.clone(); - let chunks_tx = chunks_tx.clone(); - worktree_scan_tasks.push(cx.spawn(async move |cx| { - let index = match worktree_index { - WorktreeIndexHandle::Loading { index } => { - index.clone().await.map_err(|error| anyhow!(error))? 
- } - WorktreeIndexHandle::Loaded { index } => index.clone(), - }; - - index - .read_with(cx, |index, cx| { - let worktree_id = index.worktree().read(cx).id(); - let db_connection = index.db_connection().clone(); - let db = *index.embedding_index().db(); - cx.background_spawn(async move { - let txn = db_connection - .read_txn() - .context("failed to create read transaction")?; - let db_entries = db.iter(&txn).context("failed to iterate database")?; - for db_entry in db_entries { - let (_key, db_embedded_file) = db_entry?; - for chunk in db_embedded_file.chunks { - chunks_tx - .send((worktree_id, db_embedded_file.path.clone(), chunk)) - .await?; - } - } - anyhow::Ok(()) - }) - })? - .await - })); - } - drop(chunks_tx); - - let project = self.project.clone(); - let embedding_provider = self.embedding_provider.clone(); - cx.spawn(async move |cx| { - #[cfg(debug_assertions)] - let embedding_query_start = std::time::Instant::now(); - log::info!("Searching for {queries:?}"); - let queries: Vec = queries - .iter() - .map(|s| TextToEmbed::new(s.as_str())) - .collect(); - - let query_embeddings = embedding_provider.embed(&queries[..]).await?; - anyhow::ensure!( - query_embeddings.len() == queries.len(), - "The number of query embeddings does not match the number of queries" - ); - - let mut results_by_worker = Vec::new(); - for _ in 0..cx.background_executor().num_cpus() { - results_by_worker.push(Vec::::new()); - } - - #[cfg(debug_assertions)] - let search_start = std::time::Instant::now(); - cx.background_executor() - .scoped(|cx| { - for results in results_by_worker.iter_mut() { - cx.spawn(async { - while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await { - let (score, query_index) = - chunk.embedding.similarity(&query_embeddings); - - let ix = match results.binary_search_by(|probe| { - score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal) - }) { - Ok(ix) | Err(ix) => ix, - }; - if ix < limit { - results.insert( - ix, - WorktreeSearchResult { - worktree_id, - path: path.clone(), - range: chunk.chunk.range.clone(), - query_index, - score, - }, - ); - if results.len() > limit { - results.pop(); - } - } - } - }); - } - }) - .await; - - for scan_task in futures::future::join_all(worktree_scan_tasks).await { - scan_task.log_err(); - } - - project.read_with(cx, |project, cx| { - let mut search_results = Vec::with_capacity(results_by_worker.len() * limit); - for worker_results in results_by_worker { - search_results.extend(worker_results.into_iter().filter_map(|result| { - Some(SearchResult { - worktree: project.worktree_for_id(result.worktree_id, cx)?, - path: result.path, - range: result.range, - score: result.score, - query_index: result.query_index, - }) - })); - } - search_results.sort_unstable_by(|a, b| { - b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal) - }); - search_results.truncate(limit); - - #[cfg(debug_assertions)] - { - let search_elapsed = search_start.elapsed(); - log::debug!( - "searched {} entries in {:?}", - search_results.len(), - search_elapsed - ); - let embedding_query_elapsed = embedding_query_start.elapsed(); - log::debug!("embedding query took {:?}", embedding_query_elapsed); - } - - search_results - }) - }) - } - - #[cfg(test)] - pub fn path_count(&self, cx: &App) -> Result { - let mut result = 0; - for worktree_index in self.worktree_indices.values() { - if let WorktreeIndexHandle::Loaded { index, .. 
} = worktree_index { - result += index.read(cx).path_count()?; - } - } - Ok(result) - } - - pub(crate) fn worktree_index( - &self, - worktree_id: WorktreeId, - cx: &App, - ) -> Option> { - for index in self.worktree_indices.values() { - if let WorktreeIndexHandle::Loaded { index, .. } = index - && index.read(cx).worktree().read(cx).id() == worktree_id - { - return Some(index.clone()); - } - } - None - } - - pub(crate) fn worktree_indices(&self, cx: &App) -> Vec> { - let mut result = self - .worktree_indices - .values() - .filter_map(|index| { - if let WorktreeIndexHandle::Loaded { index, .. } = index { - Some(index.clone()) - } else { - None - } - }) - .collect::>(); - result.sort_by_key(|index| index.read(cx).worktree().read(cx).id()); - result - } - - pub fn all_summaries(&self, cx: &App) -> Task>> { - let (summaries_tx, summaries_rx) = channel::bounded(1024); - let mut worktree_scan_tasks = Vec::new(); - for worktree_index in self.worktree_indices.values() { - let worktree_index = worktree_index.clone(); - let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone(); - worktree_scan_tasks.push(cx.spawn(async move |cx| { - let index = match worktree_index { - WorktreeIndexHandle::Loading { index } => { - index.clone().await.map_err(|error| anyhow!(error))? - } - WorktreeIndexHandle::Loaded { index } => index.clone(), - }; - - index - .read_with(cx, |index, cx| { - let db_connection = index.db_connection().clone(); - let summary_index = index.summary_index(); - let file_digest_db = summary_index.file_digest_db(); - let summary_db = summary_index.summary_db(); - - cx.background_spawn(async move { - let txn = db_connection - .read_txn() - .context("failed to create db read transaction")?; - let db_entries = file_digest_db - .iter(&txn) - .context("failed to iterate database")?; - for db_entry in db_entries { - let (file_path, db_file) = db_entry?; - - match summary_db.get(&txn, &db_file.digest) { - Ok(opt_summary) => { - // Currently, we only use summaries we already have. If the file hasn't been - // summarized yet, then we skip it and don't include it in the inferred context. - // If we want to do just-in-time summarization, this would be the place to do it! - if let Some(summary) = opt_summary { - summaries_tx - .send((file_path.to_string(), summary.to_string())) - .await?; - } else { - log::warn!("No summary found for {:?}", &db_file); - } - } - Err(err) => { - log::error!( - "Error reading from summary database: {:?}", - err - ); - } - } - } - anyhow::Ok(()) - }) - })? 
- .await - })); - } - drop(summaries_tx); - - let project = self.project.clone(); - cx.spawn(async move |cx| { - let mut results_by_worker = Vec::new(); - for _ in 0..cx.background_executor().num_cpus() { - results_by_worker.push(Vec::::new()); - } - - cx.background_executor() - .scoped(|cx| { - for results in results_by_worker.iter_mut() { - cx.spawn(async { - while let Ok((filename, summary)) = summaries_rx.recv().await { - results.push(FileSummary { filename, summary }); - } - }); - } - }) - .await; - - for scan_task in futures::future::join_all(worktree_scan_tasks).await { - scan_task.log_err(); - } - - project.read_with(cx, |_project, _cx| { - results_by_worker.into_iter().flatten().collect() - }) - }) - } - - /// Empty out the backlogs of all the worktrees in the project - pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future { - let flush_start = std::time::Instant::now(); - - futures::future::join_all(self.worktree_indices.values().map(|worktree_index| { - let worktree_index = worktree_index.clone(); - - cx.spawn(async move |cx| { - let index = match worktree_index { - WorktreeIndexHandle::Loading { index } => { - index.clone().await.map_err(|error| anyhow!(error))? - } - WorktreeIndexHandle::Loaded { index } => index.clone(), - }; - let worktree_abs_path = - cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?; - - index - .read_with(cx, |index, cx| { - cx.background_spawn( - index.summary_index().flush_backlog(worktree_abs_path, cx), - ) - })? - .await - }) - })) - .map(move |results| { - // Log any errors, but don't block the user. These summaries are supposed to - // improve quality by providing extra context, but they aren't hard requirements! - for result in results { - if let Err(err) = result { - log::error!("Error flushing summary backlog: {:?}", err); - } - } - - log::info!("Summary backlog flushed in {:?}", flush_start.elapsed()); - }) - } - - pub fn remaining_summaries(&self, cx: &mut Context) -> usize { - self.worktree_indices(cx) - .iter() - .map(|index| index.read(cx).summary_index().backlog_len()) - .sum() - } -} - -impl EventEmitter for ProjectIndex {} diff --git a/crates/semantic_index/src/project_index_debug_view.rs b/crates/semantic_index/src/project_index_debug_view.rs deleted file mode 100644 index 8d6a49c45caf336c03fe0a2b62ecbca9e079fc65..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/project_index_debug_view.rs +++ /dev/null @@ -1,306 +0,0 @@ -use crate::ProjectIndex; -use gpui::{ - AnyElement, App, CursorStyle, Entity, EventEmitter, FocusHandle, Focusable, IntoElement, - ListOffset, ListState, MouseMoveEvent, Render, UniformListScrollHandle, canvas, div, list, - uniform_list, -}; -use project::WorktreeId; -use settings::Settings; -use std::{ops::Range, path::Path, sync::Arc}; -use theme::ThemeSettings; -use ui::prelude::*; -use workspace::item::Item; - -pub struct ProjectIndexDebugView { - index: Entity, - rows: Vec, - selected_path: Option, - hovered_row_ix: Option, - focus_handle: FocusHandle, - list_scroll_handle: UniformListScrollHandle, - _subscription: gpui::Subscription, -} - -struct PathState { - path: Arc, - chunks: Vec, - list_state: ListState, -} - -enum Row { - Worktree(Arc), - Entry(WorktreeId, Arc), -} - -impl ProjectIndexDebugView { - pub fn new(index: Entity, window: &mut Window, cx: &mut Context) -> Self { - let mut this = Self { - rows: Vec::new(), - list_scroll_handle: UniformListScrollHandle::new(), - selected_path: None, - hovered_row_ix: None, - focus_handle: cx.focus_handle(), - 
_subscription: cx.subscribe_in(&index, window, |this, _, _, window, cx| { - this.update_rows(window, cx) - }), - index, - }; - this.update_rows(window, cx); - this - } - - fn update_rows(&mut self, window: &mut Window, cx: &mut Context) { - let worktree_indices = self.index.read(cx).worktree_indices(cx); - cx.spawn_in(window, async move |this, cx| { - let mut rows = Vec::new(); - - for index in worktree_indices { - let (root_path, worktree_id, worktree_paths) = - index.read_with(cx, |index, cx| { - let worktree = index.worktree().read(cx); - ( - worktree.abs_path(), - worktree.id(), - index.embedding_index().paths(cx), - ) - })?; - rows.push(Row::Worktree(root_path)); - rows.extend( - worktree_paths - .await? - .into_iter() - .map(|path| Row::Entry(worktree_id, path)), - ); - } - - this.update(cx, |this, cx| { - this.rows = rows; - cx.notify(); - }) - }) - .detach(); - } - - fn handle_path_click( - &mut self, - worktree_id: WorktreeId, - file_path: Arc, - window: &mut Window, - cx: &mut Context, - ) -> Option<()> { - let project_index = self.index.read(cx); - let fs = project_index.fs().clone(); - let worktree_index = project_index.worktree_index(worktree_id, cx)?.read(cx); - let root_path = worktree_index.worktree().read(cx).abs_path(); - let chunks = worktree_index - .embedding_index() - .chunks_for_path(file_path.clone(), cx); - - cx.spawn_in(window, async move |this, cx| { - let chunks = chunks.await?; - let content = fs.load(&root_path.join(&file_path)).await?; - let chunks = chunks - .into_iter() - .map(|chunk| { - let mut start = chunk.chunk.range.start.min(content.len()); - let mut end = chunk.chunk.range.end.min(content.len()); - while !content.is_char_boundary(start) { - start += 1; - } - while !content.is_char_boundary(end) { - end -= 1; - } - content[start..end].to_string().into() - }) - .collect::>(); - - this.update(cx, |this, cx| { - this.selected_path = Some(PathState { - path: file_path, - list_state: ListState::new(chunks.len(), gpui::ListAlignment::Top, px(100.)), - chunks, - }); - cx.notify(); - }) - }) - .detach(); - None - } - - fn render_chunk(&mut self, ix: usize, cx: &mut Context) -> AnyElement { - let buffer_font = ThemeSettings::get_global(cx).buffer_font.clone(); - let Some(state) = &self.selected_path else { - return div().into_any(); - }; - - let colors = cx.theme().colors(); - let chunk = &state.chunks[ix]; - - div() - .text_ui(cx) - .w_full() - .font(buffer_font) - .child( - h_flex() - .justify_between() - .child(format!( - "chunk {} of {}. 
length: {}", - ix + 1, - state.chunks.len(), - chunk.len(), - )) - .child( - h_flex() - .child( - Button::new(("prev", ix), "prev") - .disabled(ix == 0) - .on_click(cx.listener(move |this, _, _, _| { - this.scroll_to_chunk(ix.saturating_sub(1)) - })), - ) - .child( - Button::new(("next", ix), "next") - .disabled(ix + 1 == state.chunks.len()) - .on_click(cx.listener(move |this, _, _, _| { - this.scroll_to_chunk(ix + 1) - })), - ), - ), - ) - .child( - div() - .bg(colors.editor_background) - .text_xs() - .child(chunk.clone()), - ) - .into_any_element() - } - - fn scroll_to_chunk(&mut self, ix: usize) { - if let Some(state) = self.selected_path.as_mut() { - state.list_state.scroll_to(ListOffset { - item_ix: ix, - offset_in_item: px(0.), - }) - } - } -} - -impl Render for ProjectIndexDebugView { - fn render(&mut self, _window: &mut Window, cx: &mut Context) -> impl IntoElement { - if let Some(selected_path) = self.selected_path.as_ref() { - v_flex() - .child( - div() - .id("selected-path-name") - .child( - h_flex() - .justify_between() - .child(selected_path.path.to_string_lossy().to_string()) - .child("x"), - ) - .border_b_1() - .border_color(cx.theme().colors().border) - .cursor(CursorStyle::PointingHand) - .on_click(cx.listener(|this, _, _, cx| { - this.selected_path.take(); - cx.notify(); - })), - ) - .child( - list( - selected_path.list_state.clone(), - cx.processor(|this, ix, _, cx| this.render_chunk(ix, cx)), - ) - .size_full(), - ) - .size_full() - .into_any_element() - } else { - let mut list = uniform_list( - "ProjectIndexDebugView", - self.rows.len(), - cx.processor(move |this, range: Range, _, cx| { - this.rows[range] - .iter() - .enumerate() - .map(|(ix, row)| match row { - Row::Worktree(root_path) => div() - .id(ix) - .child(Label::new(root_path.to_string_lossy().to_string())), - Row::Entry(worktree_id, file_path) => div() - .id(ix) - .pl_8() - .child(Label::new(file_path.to_string_lossy().to_string())) - .on_mouse_move(cx.listener( - move |this, _: &MouseMoveEvent, _, cx| { - if this.hovered_row_ix != Some(ix) { - this.hovered_row_ix = Some(ix); - cx.notify(); - } - }, - )) - .cursor(CursorStyle::PointingHand) - .on_click(cx.listener({ - let worktree_id = *worktree_id; - let file_path = file_path.clone(); - move |this, _, window, cx| { - this.handle_path_click( - worktree_id, - file_path.clone(), - window, - cx, - ); - } - })), - }) - .collect() - }), - ) - .track_scroll(self.list_scroll_handle.clone()) - .size_full() - .text_bg(cx.theme().colors().background) - .into_any_element(); - - canvas( - move |bounds, window, cx| { - list.prepaint_as_root(bounds.origin, bounds.size.into(), window, cx); - list - }, - |_, mut list, window, cx| { - list.paint(window, cx); - }, - ) - .size_full() - .into_any_element() - } - } -} - -impl EventEmitter<()> for ProjectIndexDebugView {} - -impl Item for ProjectIndexDebugView { - type Event = (); - - fn tab_content_text(&self, _detail: usize, _cx: &App) -> SharedString { - "Project Index (Debug)".into() - } - - fn clone_on_split( - &self, - _: Option, - window: &mut Window, - cx: &mut Context, - ) -> Option> - where - Self: Sized, - { - Some(cx.new(|cx| Self::new(self.index.clone(), window, cx))) - } -} - -impl Focusable for ProjectIndexDebugView { - fn focus_handle(&self, _: &App) -> gpui::FocusHandle { - self.focus_handle.clone() - } -} diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs deleted file mode 100644 index 
439791047a282771f94982c5bad4c690df497cc4..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/semantic_index.rs +++ /dev/null @@ -1,632 +0,0 @@ -mod chunking; -mod embedding; -mod embedding_index; -mod indexing; -mod project_index; -mod project_index_debug_view; -mod summary_backlog; -mod summary_index; -mod worktree_index; - -use anyhow::{Context as _, Result}; -use collections::HashMap; -use fs::Fs; -use gpui::{App, AppContext as _, AsyncApp, BorrowAppContext, Context, Entity, Global, WeakEntity}; -use language::LineEnding; -use project::{Project, Worktree}; -use std::{ - cmp::Ordering, - path::{Path, PathBuf}, - sync::Arc, -}; -use util::ResultExt as _; -use workspace::Workspace; - -pub use embedding::*; -pub use project_index::{LoadedSearchResult, ProjectIndex, SearchResult, Status}; -pub use project_index_debug_view::ProjectIndexDebugView; -pub use summary_index::FileSummary; - -pub struct SemanticDb { - embedding_provider: Arc, - db_connection: Option, - project_indices: HashMap, Entity>, -} - -impl Global for SemanticDb {} - -impl SemanticDb { - pub async fn new( - db_path: PathBuf, - embedding_provider: Arc, - cx: &mut AsyncApp, - ) -> Result { - let db_connection = cx - .background_spawn(async move { - std::fs::create_dir_all(&db_path)?; - unsafe { - heed::EnvOpenOptions::new() - .map_size(1024 * 1024 * 1024) - .max_dbs(3000) - .open(db_path) - } - }) - .await - .context("opening database connection")?; - - cx.update(|cx| { - cx.observe_new( - |workspace: &mut Workspace, _window, cx: &mut Context| { - let project = workspace.project().clone(); - - if cx.has_global::() { - cx.update_global::(|this, cx| { - this.create_project_index(project, cx); - }) - } else { - log::info!("No SemanticDb, skipping project index") - } - }, - ) - .detach(); - }) - .ok(); - - Ok(SemanticDb { - db_connection: Some(db_connection), - embedding_provider, - project_indices: HashMap::default(), - }) - } - - pub async fn load_results( - mut results: Vec, - fs: &Arc, - cx: &AsyncApp, - ) -> Result> { - let mut max_scores_by_path = HashMap::<_, (f32, usize)>::default(); - for result in &results { - let (score, query_index) = max_scores_by_path - .entry((result.worktree.clone(), result.path.clone())) - .or_default(); - if result.score > *score { - *score = result.score; - *query_index = result.query_index; - } - } - - results.sort_by(|a, b| { - let max_score_a = max_scores_by_path[&(a.worktree.clone(), a.path.clone())].0; - let max_score_b = max_scores_by_path[&(b.worktree.clone(), b.path.clone())].0; - max_score_b - .partial_cmp(&max_score_a) - .unwrap_or(Ordering::Equal) - .then_with(|| a.worktree.entity_id().cmp(&b.worktree.entity_id())) - .then_with(|| a.path.cmp(&b.path)) - .then_with(|| a.range.start.cmp(&b.range.start)) - }); - - let mut last_loaded_file: Option<(Entity, Arc, PathBuf, String)> = None; - let mut loaded_results = Vec::::new(); - for result in results { - let full_path; - let file_content; - if let Some(last_loaded_file) = - last_loaded_file - .as_ref() - .filter(|(last_worktree, last_path, _, _)| { - last_worktree == &result.worktree && last_path == &result.path - }) - { - full_path = last_loaded_file.2.clone(); - file_content = &last_loaded_file.3; - } else { - let output = result.worktree.read_with(cx, |worktree, _cx| { - let entry_abs_path = worktree.abs_path().join(&result.path); - let mut entry_full_path = PathBuf::from(worktree.root_name()); - entry_full_path.push(&result.path); - let file_content = async { - let entry_abs_path = entry_abs_path; - 
fs.load(&entry_abs_path).await - }; - (entry_full_path, file_content) - })?; - full_path = output.0; - let Some(content) = output.1.await.log_err() else { - continue; - }; - last_loaded_file = Some(( - result.worktree.clone(), - result.path.clone(), - full_path.clone(), - content, - )); - file_content = &last_loaded_file.as_ref().unwrap().3; - }; - - let query_index = max_scores_by_path[&(result.worktree.clone(), result.path.clone())].1; - - let mut range_start = result.range.start.min(file_content.len()); - let mut range_end = result.range.end.min(file_content.len()); - while !file_content.is_char_boundary(range_start) { - range_start += 1; - } - while !file_content.is_char_boundary(range_end) { - range_end += 1; - } - - let start_row = file_content[0..range_start].matches('\n').count() as u32; - let mut end_row = file_content[0..range_end].matches('\n').count() as u32; - let start_line_byte_offset = file_content[0..range_start] - .rfind('\n') - .map(|pos| pos + 1) - .unwrap_or_default(); - let mut end_line_byte_offset = range_end; - if file_content[..end_line_byte_offset].ends_with('\n') { - end_row -= 1; - } else { - end_line_byte_offset = file_content[range_end..] - .find('\n') - .map(|pos| range_end + pos + 1) - .unwrap_or_else(|| file_content.len()); - } - let mut excerpt_content = - file_content[start_line_byte_offset..end_line_byte_offset].to_string(); - LineEnding::normalize(&mut excerpt_content); - - if let Some(prev_result) = loaded_results.last_mut() - && prev_result.full_path == full_path - && *prev_result.row_range.end() + 1 == start_row - { - prev_result.row_range = *prev_result.row_range.start()..=end_row; - prev_result.excerpt_content.push_str(&excerpt_content); - continue; - } - - loaded_results.push(LoadedSearchResult { - path: result.path, - full_path, - excerpt_content, - row_range: start_row..=end_row, - query_index, - }); - } - - for result in &mut loaded_results { - while result.excerpt_content.ends_with("\n\n") { - result.excerpt_content.pop(); - result.row_range = - *result.row_range.start()..=result.row_range.end().saturating_sub(1) - } - } - - Ok(loaded_results) - } - - pub fn project_index( - &mut self, - project: Entity, - _cx: &mut App, - ) -> Option> { - self.project_indices.get(&project.downgrade()).cloned() - } - - pub fn remaining_summaries( - &self, - project: &WeakEntity, - cx: &mut App, - ) -> Option { - self.project_indices.get(project).map(|project_index| { - project_index.update(cx, |project_index, cx| { - project_index.remaining_summaries(cx) - }) - }) - } - - pub fn create_project_index( - &mut self, - project: Entity, - cx: &mut App, - ) -> Entity { - let project_index = cx.new(|cx| { - ProjectIndex::new( - project.clone(), - self.db_connection.clone().unwrap(), - self.embedding_provider.clone(), - cx, - ) - }); - - let project_weak = project.downgrade(); - self.project_indices - .insert(project_weak.clone(), project_index.clone()); - - cx.observe_release(&project, move |_, cx| { - if cx.has_global::() { - cx.update_global::(|this, _| { - this.project_indices.remove(&project_weak); - }) - } - }) - .detach(); - - project_index - } -} - -impl Drop for SemanticDb { - fn drop(&mut self) { - self.db_connection.take().unwrap().prepare_for_closing(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use chunking::Chunk; - use embedding_index::{ChunkedFile, EmbeddingIndex}; - use feature_flags::FeatureFlagAppExt; - use fs::FakeFs; - use futures::{FutureExt, future::BoxFuture}; - use gpui::TestAppContext; - use indexing::IndexingEntrySet; - use 
language::language_settings::AllLanguageSettings; - use project::{Project, ProjectEntryId}; - use serde_json::json; - use settings::SettingsStore; - use smol::channel; - use std::{future, path::Path, sync::Arc}; - use util::path; - - fn init_test(cx: &mut TestAppContext) { - zlog::init_test(); - - cx.update(|cx| { - let store = SettingsStore::test(cx); - cx.set_global(store); - language::init(cx); - cx.update_flags(false, vec![]); - Project::init_settings(cx); - SettingsStore::update(cx, |store, cx| { - store.update_user_settings::(cx, |_| {}); - }); - }); - } - - pub struct TestEmbeddingProvider { - batch_size: usize, - compute_embedding: Box Result + Send + Sync>, - } - - impl TestEmbeddingProvider { - pub fn new( - batch_size: usize, - compute_embedding: impl 'static + Fn(&str) -> Result + Send + Sync, - ) -> Self { - Self { - batch_size, - compute_embedding: Box::new(compute_embedding), - } - } - } - - impl EmbeddingProvider for TestEmbeddingProvider { - fn embed<'a>( - &'a self, - texts: &'a [TextToEmbed<'a>], - ) -> BoxFuture<'a, Result>> { - let embeddings = texts - .iter() - .map(|to_embed| (self.compute_embedding)(to_embed.text)) - .collect(); - future::ready(embeddings).boxed() - } - - fn batch_size(&self) -> usize { - self.batch_size - } - } - - #[gpui::test] - async fn test_search(cx: &mut TestAppContext) { - cx.executor().allow_parking(); - - init_test(cx); - - cx.update(|cx| { - // This functionality is staff-flagged. - cx.update_flags(true, vec![]); - }); - - let temp_dir = tempfile::tempdir().unwrap(); - - let mut semantic_index = SemanticDb::new( - temp_dir.path().into(), - Arc::new(TestEmbeddingProvider::new(16, |text| { - let mut embedding = vec![0f32; 2]; - // if the text contains garbage, give it a 1 in the first dimension - if text.contains("garbage in") { - embedding[0] = 0.9; - } else { - embedding[0] = -0.9; - } - - if text.contains("garbage out") { - embedding[1] = 0.9; - } else { - embedding[1] = -0.9; - } - - Ok(Embedding::new(embedding)) - })), - &mut cx.to_async(), - ) - .await - .unwrap(); - - let fs = FakeFs::new(cx.executor()); - let project_path = Path::new("/fake_project"); - - fs.insert_tree( - project_path, - json!({ - "fixture": { - "main.rs": include_str!("../fixture/main.rs"), - "needle.md": include_str!("../fixture/needle.md"), - } - }), - ) - .await; - - let project = Project::test(fs, [project_path], cx).await; - - let project_index = cx.update(|cx| { - let language_registry = project.read(cx).languages().clone(); - let node_runtime = project.read(cx).node_runtime().unwrap().clone(); - languages::init(language_registry, node_runtime, cx); - semantic_index.create_project_index(project.clone(), cx) - }); - - cx.run_until_parked(); - while cx - .update(|cx| semantic_index.remaining_summaries(&project.downgrade(), cx)) - .unwrap() - > 0 - { - cx.run_until_parked(); - } - - let results = cx - .update(|cx| { - let project_index = project_index.read(cx); - let query = "garbage in, garbage out"; - project_index.search(vec![query.into()], 4, cx) - }) - .await - .unwrap(); - - assert!( - results.len() > 1, - "should have found some results, but only found {:?}", - results - ); - - for result in &results { - println!("result: {:?}", result.path); - println!("score: {:?}", result.score); - } - - // Find result that is greater than 0.5 - let search_result = results.iter().find(|result| result.score > 0.9).unwrap(); - - assert_eq!( - search_result.path.to_string_lossy(), - path!("fixture/needle.md") - ); - - let content = cx - .update(|cx| { - let worktree = 
search_result.worktree.read(cx); - let entry_abs_path = worktree.abs_path().join(&search_result.path); - let fs = project.read(cx).fs().clone(); - cx.background_spawn(async move { fs.load(&entry_abs_path).await.unwrap() }) - }) - .await; - - let range = search_result.range.clone(); - let content = content[range].to_owned(); - - assert!(content.contains("garbage in, garbage out")); - } - - #[gpui::test] - async fn test_embed_files(cx: &mut TestAppContext) { - cx.executor().allow_parking(); - - let provider = Arc::new(TestEmbeddingProvider::new(3, |text| { - anyhow::ensure!( - !text.contains('g'), - "cannot embed text containing a 'g' character" - ); - Ok(Embedding::new( - ('a'..='z') - .map(|char| text.chars().filter(|c| *c == char).count() as f32) - .collect(), - )) - })); - - let (indexing_progress_tx, _) = channel::unbounded(); - let indexing_entries = Arc::new(IndexingEntrySet::new(indexing_progress_tx)); - - let (chunked_files_tx, chunked_files_rx) = channel::unbounded::(); - chunked_files_tx - .send_blocking(ChunkedFile { - path: Path::new("test1.md").into(), - mtime: None, - handle: indexing_entries.insert(ProjectEntryId::from_proto(0)), - text: "abcdefghijklmnop".to_string(), - chunks: [0..4, 4..8, 8..12, 12..16] - .into_iter() - .map(|range| Chunk { - range, - digest: Default::default(), - }) - .collect(), - }) - .unwrap(); - chunked_files_tx - .send_blocking(ChunkedFile { - path: Path::new("test2.md").into(), - mtime: None, - handle: indexing_entries.insert(ProjectEntryId::from_proto(1)), - text: "qrstuvwxyz".to_string(), - chunks: [0..4, 4..8, 8..10] - .into_iter() - .map(|range| Chunk { - range, - digest: Default::default(), - }) - .collect(), - }) - .unwrap(); - chunked_files_tx.close(); - - let embed_files_task = - cx.update(|cx| EmbeddingIndex::embed_files(provider.clone(), chunked_files_rx, cx)); - embed_files_task.task.await.unwrap(); - - let embedded_files_rx = embed_files_task.files; - let mut embedded_files = Vec::new(); - while let Ok((embedded_file, _)) = embedded_files_rx.recv().await { - embedded_files.push(embedded_file); - } - - assert_eq!(embedded_files.len(), 1); - assert_eq!(embedded_files[0].path.as_ref(), Path::new("test2.md")); - assert_eq!( - embedded_files[0] - .chunks - .iter() - .map(|embedded_chunk| { embedded_chunk.embedding.clone() }) - .collect::>(), - vec![ - (provider.compute_embedding)("qrst").unwrap(), - (provider.compute_embedding)("uvwx").unwrap(), - (provider.compute_embedding)("yz").unwrap(), - ], - ); - } - - #[gpui::test] - async fn test_load_search_results(cx: &mut TestAppContext) { - init_test(cx); - - let fs = FakeFs::new(cx.executor()); - let project_path = Path::new("/fake_project"); - - let file1_content = "one\ntwo\nthree\nfour\nfive\n"; - let file2_content = "aaa\nbbb\nccc\nddd\neee\n"; - - fs.insert_tree( - project_path, - json!({ - "file1.txt": file1_content, - "file2.txt": file2_content, - }), - ) - .await; - - let fs = fs as Arc; - let project = Project::test(fs.clone(), [project_path], cx).await; - let worktree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap()); - - // chunk that is already newline-aligned - let search_results = vec![SearchResult { - worktree: worktree.clone(), - path: Path::new("file1.txt").into(), - range: 0..file1_content.find("four").unwrap(), - score: 0.5, - query_index: 0, - }]; - assert_eq!( - SemanticDb::load_results(search_results, &fs, &cx.to_async()) - .await - .unwrap(), - &[LoadedSearchResult { - path: Path::new("file1.txt").into(), - full_path: 
"fake_project/file1.txt".into(), - excerpt_content: "one\ntwo\nthree\n".into(), - row_range: 0..=2, - query_index: 0, - }] - ); - - // chunk that is *not* newline-aligned - let search_results = vec![SearchResult { - worktree: worktree.clone(), - path: Path::new("file1.txt").into(), - range: file1_content.find("two").unwrap() + 1..file1_content.find("four").unwrap() + 2, - score: 0.5, - query_index: 0, - }]; - assert_eq!( - SemanticDb::load_results(search_results, &fs, &cx.to_async()) - .await - .unwrap(), - &[LoadedSearchResult { - path: Path::new("file1.txt").into(), - full_path: "fake_project/file1.txt".into(), - excerpt_content: "two\nthree\nfour\n".into(), - row_range: 1..=3, - query_index: 0, - }] - ); - - // chunks that are adjacent - - let search_results = vec![ - SearchResult { - worktree: worktree.clone(), - path: Path::new("file1.txt").into(), - range: file1_content.find("two").unwrap()..file1_content.len(), - score: 0.6, - query_index: 0, - }, - SearchResult { - worktree: worktree.clone(), - path: Path::new("file1.txt").into(), - range: 0..file1_content.find("two").unwrap(), - score: 0.5, - query_index: 1, - }, - SearchResult { - worktree: worktree.clone(), - path: Path::new("file2.txt").into(), - range: 0..file2_content.len(), - score: 0.8, - query_index: 1, - }, - ]; - assert_eq!( - SemanticDb::load_results(search_results, &fs, &cx.to_async()) - .await - .unwrap(), - &[ - LoadedSearchResult { - path: Path::new("file2.txt").into(), - full_path: "fake_project/file2.txt".into(), - excerpt_content: file2_content.into(), - row_range: 0..=4, - query_index: 1, - }, - LoadedSearchResult { - path: Path::new("file1.txt").into(), - full_path: "fake_project/file1.txt".into(), - excerpt_content: file1_content.into(), - row_range: 0..=4, - query_index: 0, - } - ] - ); - } -} diff --git a/crates/semantic_index/src/summary_backlog.rs b/crates/semantic_index/src/summary_backlog.rs deleted file mode 100644 index e77fa4862f7e0d300a3565acfbe38bda027d9cd7..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/summary_backlog.rs +++ /dev/null @@ -1,49 +0,0 @@ -use collections::HashMap; -use fs::MTime; -use std::{path::Path, sync::Arc}; - -const MAX_FILES_BEFORE_RESUMMARIZE: usize = 4; -const MAX_BYTES_BEFORE_RESUMMARIZE: u64 = 1_000_000; // 1 MB - -#[derive(Default, Debug)] -pub struct SummaryBacklog { - /// Key: path to a file that needs summarization, but that we haven't summarized yet. Value: that file's size on disk, in bytes, and its mtime. - files: HashMap, (u64, Option)>, - /// Cache of the sum of all values in `files`, so we don't have to traverse the whole map to check if we're over the byte limit. - total_bytes: u64, -} - -impl SummaryBacklog { - /// Store the given path in the backlog, along with how many bytes are in it. - pub fn insert(&mut self, path: Arc, bytes_on_disk: u64, mtime: Option) { - let (prev_bytes, _) = self - .files - .insert(path, (bytes_on_disk, mtime)) - .unwrap_or_default(); // Default to 0 prev_bytes - - // Update the cached total by subtracting out the old amount and adding the new one. - self.total_bytes = self.total_bytes - prev_bytes + bytes_on_disk; - } - - /// Returns true if the total number of bytes in the backlog exceeds a predefined threshold. - pub fn needs_drain(&self) -> bool { - self.files.len() > MAX_FILES_BEFORE_RESUMMARIZE || - // The whole purpose of the cached total_bytes is to make this comparison cheap. - // Otherwise we'd have to traverse the entire dictionary every time we wanted this answer. 
- self.total_bytes > MAX_BYTES_BEFORE_RESUMMARIZE - } - - /// Remove all the entries in the backlog and return the file paths as an iterator. - #[allow(clippy::needless_lifetimes)] // Clippy thinks this 'a can be elided, but eliding it gives a compile error - pub fn drain<'a>(&'a mut self) -> impl Iterator, Option)> + 'a { - self.total_bytes = 0; - - self.files - .drain() - .map(|(path, (_size, mtime))| (path, mtime)) - } - - pub fn len(&self) -> usize { - self.files.len() - } -} diff --git a/crates/semantic_index/src/summary_index.rs b/crates/semantic_index/src/summary_index.rs deleted file mode 100644 index 9a3eb302edaaef831f515edf3492aecf59bf17f7..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/summary_index.rs +++ /dev/null @@ -1,696 +0,0 @@ -use anyhow::{Context as _, Result, anyhow}; -use arrayvec::ArrayString; -use fs::{Fs, MTime}; -use futures::{TryFutureExt, stream::StreamExt}; -use futures_batch::ChunksTimeoutStreamExt; -use gpui::{App, AppContext as _, Entity, Task}; -use heed::{ - RoTxn, - types::{SerdeBincode, Str}, -}; -use language_model::{ - LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest, - LanguageModelRequestMessage, Role, -}; -use log; -use parking_lot::Mutex; -use project::{Entry, UpdatedEntriesSet, Worktree}; -use serde::{Deserialize, Serialize}; -use smol::channel; -use std::{ - future::Future, - path::Path, - pin::pin, - sync::Arc, - time::{Duration, Instant}, -}; -use util::ResultExt; -use worktree::Snapshot; - -use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog}; - -#[derive(Serialize, Deserialize, Debug)] -pub struct FileSummary { - pub filename: String, - pub summary: String, -} - -#[derive(Debug, Serialize, Deserialize)] -struct UnsummarizedFile { - // Path to the file on disk - path: Arc, - // The mtime of the file on disk - mtime: Option, - // BLAKE3 hash of the source file's contents - digest: Blake3Digest, - // The source file's contents - contents: String, -} - -#[derive(Debug, Serialize, Deserialize)] -struct SummarizedFile { - // Path to the file on disk - path: String, - // The mtime of the file on disk - mtime: Option, - // BLAKE3 hash of the source file's contents - digest: Blake3Digest, - // The LLM's summary of the file's contents - summary: String, -} - -/// This is what blake3's to_hex() method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246 -pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>; - -#[derive(Debug, Serialize, Deserialize)] -pub struct FileDigest { - pub mtime: Option, - pub digest: Blake3Digest, -} - -struct NeedsSummary { - files: channel::Receiver, - task: Task>, -} - -struct SummarizeFiles { - files: channel::Receiver, - task: Task>, -} - -pub struct SummaryIndex { - worktree: Entity, - fs: Arc, - db_connection: heed::Env, - file_digest_db: heed::Database>, // Key: file path. Val: BLAKE3 digest of its contents. - summary_db: heed::Database, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents. - backlog: Arc>, - _entry_ids_being_indexed: Arc, // TODO can this be removed? 
-} - -struct Backlogged { - paths_to_digest: channel::Receiver, Option)>>, - task: Task>, -} - -struct MightNeedSummaryFiles { - files: channel::Receiver, - task: Task>, -} - -impl SummaryIndex { - pub fn new( - worktree: Entity, - fs: Arc, - db_connection: heed::Env, - file_digest_db: heed::Database>, - summary_db: heed::Database, Str>, - _entry_ids_being_indexed: Arc, - ) -> Self { - Self { - worktree, - fs, - db_connection, - file_digest_db, - summary_db, - _entry_ids_being_indexed, - backlog: Default::default(), - } - } - - pub fn file_digest_db(&self) -> heed::Database> { - self.file_digest_db - } - - pub fn summary_db(&self) -> heed::Database, Str> { - self.summary_db - } - - pub fn index_entries_changed_on_disk( - &self, - is_auto_available: bool, - cx: &App, - ) -> impl Future> + use<> { - let start = Instant::now(); - let backlogged; - let digest; - let needs_summary; - let summaries; - let persist; - - if is_auto_available { - let worktree = self.worktree.read(cx).snapshot(); - let worktree_abs_path = worktree.abs_path().clone(); - - backlogged = self.scan_entries(worktree, cx); - digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx); - needs_summary = self.check_summary_cache(digest.files, cx); - summaries = self.summarize_files(needs_summary.files, cx); - persist = self.persist_summaries(summaries.files, cx); - } else { - // This feature is only staff-shipped, so make the rest of these no-ops. - backlogged = Backlogged { - paths_to_digest: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - digest = MightNeedSummaryFiles { - files: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - needs_summary = NeedsSummary { - files: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - summaries = SummarizeFiles { - files: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - persist = Task::ready(Ok(())); - } - - async move { - futures::try_join!( - backlogged.task, - digest.task, - needs_summary.task, - summaries.task, - persist - )?; - - if is_auto_available { - log::info!( - "Summarizing everything that changed on disk took {:?}", - start.elapsed() - ); - } - - Ok(()) - } - } - - pub fn index_updated_entries( - &mut self, - updated_entries: UpdatedEntriesSet, - is_auto_available: bool, - cx: &App, - ) -> impl Future> + use<> { - let start = Instant::now(); - let backlogged; - let digest; - let needs_summary; - let summaries; - let persist; - - if is_auto_available { - let worktree = self.worktree.read(cx).snapshot(); - let worktree_abs_path = worktree.abs_path().clone(); - - backlogged = self.scan_updated_entries(worktree, updated_entries, cx); - digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx); - needs_summary = self.check_summary_cache(digest.files, cx); - summaries = self.summarize_files(needs_summary.files, cx); - persist = self.persist_summaries(summaries.files, cx); - } else { - // This feature is only staff-shipped, so make the rest of these no-ops. 
- backlogged = Backlogged { - paths_to_digest: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - digest = MightNeedSummaryFiles { - files: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - needs_summary = NeedsSummary { - files: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - summaries = SummarizeFiles { - files: channel::unbounded().1, - task: Task::ready(Ok(())), - }; - persist = Task::ready(Ok(())); - } - - async move { - futures::try_join!( - backlogged.task, - digest.task, - needs_summary.task, - summaries.task, - persist - )?; - - log::debug!("Summarizing updated entries took {:?}", start.elapsed()); - - Ok(()) - } - } - - fn check_summary_cache( - &self, - might_need_summary: channel::Receiver, - cx: &App, - ) -> NeedsSummary { - let db_connection = self.db_connection.clone(); - let db = self.summary_db; - let (needs_summary_tx, needs_summary_rx) = channel::bounded(512); - let task = cx.background_spawn(async move { - let mut might_need_summary = pin!(might_need_summary); - while let Some(file) = might_need_summary.next().await { - let tx = db_connection - .read_txn() - .context("Failed to create read transaction for checking which hashes are in summary cache")?; - - match db.get(&tx, &file.digest) { - Ok(opt_answer) => { - if opt_answer.is_none() { - // It's not in the summary cache db, so we need to summarize it. - log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest); - needs_summary_tx.send(file).await?; - } else { - log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest); - } - } - Err(err) => { - log::error!("Reading from the summaries database failed: {:?}", err); - } - } - } - - Ok(()) - }); - - NeedsSummary { - files: needs_summary_rx, - task, - } - } - - fn scan_entries(&self, worktree: Snapshot, cx: &App) -> Backlogged { - let (tx, rx) = channel::bounded(512); - let db_connection = self.db_connection.clone(); - let digest_db = self.file_digest_db; - let backlog = Arc::clone(&self.backlog); - let task = cx.background_spawn(async move { - let txn = db_connection - .read_txn() - .context("failed to create read transaction")?; - - for entry in worktree.files(false, 0) { - let needs_summary = - Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry); - - if !needs_summary.is_empty() { - tx.send(needs_summary).await?; - } - } - - // TODO delete db entries for deleted files - - Ok(()) - }); - - Backlogged { - paths_to_digest: rx, - task, - } - } - - fn add_to_backlog( - backlog: Arc>, - digest_db: heed::Database>, - txn: &RoTxn<'_>, - entry: &Entry, - ) -> Vec<(Arc, Option)> { - let entry_db_key = db_key_for_path(&entry.path); - - match digest_db.get(txn, &entry_db_key) { - Ok(opt_saved_digest) => { - // The file path is the same, but the mtime is different. (Or there was no mtime.) - // It needs updating, so add it to the backlog! Then, if the backlog is full, drain it and summarize its contents. 
- if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) { - let mut backlog = backlog.lock(); - - log::info!( - "Inserting {:?} ({:?} bytes) into backlog", - &entry.path, - entry.size, - ); - backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime); - - if backlog.needs_drain() { - log::info!("Draining summary backlog..."); - return backlog.drain().collect(); - } - } - } - Err(err) => { - log::error!( - "Error trying to get file digest db entry {:?}: {:?}", - &entry_db_key, - err - ); - } - } - - Vec::new() - } - - fn scan_updated_entries( - &self, - worktree: Snapshot, - updated_entries: UpdatedEntriesSet, - cx: &App, - ) -> Backlogged { - log::info!("Scanning for updated entries that might need summarization..."); - let (tx, rx) = channel::bounded(512); - // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128); - let db_connection = self.db_connection.clone(); - let digest_db = self.file_digest_db; - let backlog = Arc::clone(&self.backlog); - let task = cx.background_spawn(async move { - let txn = db_connection - .read_txn() - .context("failed to create read transaction")?; - - for (path, entry_id, status) in updated_entries.iter() { - match status { - project::PathChange::Loaded - | project::PathChange::Added - | project::PathChange::Updated - | project::PathChange::AddedOrUpdated => { - if let Some(entry) = worktree.entry_for_id(*entry_id) - && entry.is_file() - { - let needs_summary = - Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry); - - if !needs_summary.is_empty() { - tx.send(needs_summary).await?; - } - } - } - project::PathChange::Removed => { - let _db_path = db_key_for_path(path); - // TODO delete db entries for deleted files - // deleted_entry_ranges_tx - // .send((Bound::Included(db_path.clone()), Bound::Included(db_path))) - // .await?; - } - } - } - - Ok(()) - }); - - Backlogged { - paths_to_digest: rx, - // deleted_entry_ranges: deleted_entry_ranges_rx, - task, - } - } - - fn digest_files( - &self, - paths: channel::Receiver, Option)>>, - worktree_abs_path: Arc, - cx: &App, - ) -> MightNeedSummaryFiles { - let fs = self.fs.clone(); - let (rx, tx) = channel::bounded(2048); - let task = cx.spawn(async move |cx| { - cx.background_executor() - .scoped(|cx| { - for _ in 0..cx.num_cpus() { - cx.spawn(async { - while let Ok(pairs) = paths.recv().await { - // Note: we could process all these files concurrently if desired. Might or might not speed things up. - for (path, mtime) in pairs { - let entry_abs_path = worktree_abs_path.join(&path); - - // Load the file's contents and compute its hash digest. - let unsummarized_file = { - let Some(contents) = fs - .load(&entry_abs_path) - .await - .with_context(|| { - format!("failed to read path {entry_abs_path:?}") - }) - .log_err() - else { - continue; - }; - - let digest = { - let mut hasher = blake3::Hasher::new(); - // Incorporate both the (relative) file path as well as the contents of the file into the hash. - // This is because in some languages and frameworks, identical files can do different things - // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model. 
- hasher.update(path.display().to_string().as_bytes()); - hasher.update(contents.as_bytes()); - hasher.finalize().to_hex() - }; - - UnsummarizedFile { - digest, - contents, - path, - mtime, - } - }; - - if let Err(err) = rx - .send(unsummarized_file) - .map_err(|error| anyhow!(error)) - .await - { - log::error!("Error: {:?}", err); - - return; - } - } - } - }); - } - }) - .await; - Ok(()) - }); - - MightNeedSummaryFiles { files: tx, task } - } - - fn summarize_files( - &self, - unsummarized_files: channel::Receiver, - cx: &App, - ) -> SummarizeFiles { - let (summarized_tx, summarized_rx) = channel::bounded(512); - let task = cx.spawn(async move |cx| { - while let Ok(file) = unsummarized_files.recv().await { - log::debug!("Summarizing {:?}", file); - let summary = cx - .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))? - .await - .unwrap_or_else(|err| { - // Log a warning because we'll continue anyway. - // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries, - // but this might give bad summaries due to cutting off source code files in the middle. - log::warn!("Failed to summarize {} - {:?}", file.path.display(), err); - - String::new() - }); - - // Note that the summary could be empty because of an error talking to a cloud provider, - // e.g. because the context limit was exceeded. In that case, we return Ok(String::new()). - if !summary.is_empty() { - summarized_tx - .send(SummarizedFile { - path: file.path.display().to_string(), - digest: file.digest, - summary, - mtime: file.mtime, - }) - .await? - } - } - - Ok(()) - }); - - SummarizeFiles { - files: summarized_rx, - task, - } - } - - fn summarize_code( - code: &str, - path: &Path, - cx: &App, - ) -> impl Future> + use<> { - let start = Instant::now(); - let (summary_model_id, use_cache): (LanguageModelId, bool) = ( - "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings. - false, // qwen2 doesn't have a cache, but we should probably infer this from the model - ); - let Some(model) = LanguageModelRegistry::read_global(cx) - .available_models(cx) - .find(|model| &model.id() == &summary_model_id) - else { - return cx.background_spawn(async move { - anyhow::bail!("Couldn't find the preferred summarization model ({summary_model_id:?}) in the language registry's available models") - }); - }; - let utf8_path = path.to_string_lossy(); - const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:"; - let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}"); - - log::debug!( - "Summarizing code by sending this prompt to {:?}: {:?}", - model.name(), - &prompt - ); - - let request = LanguageModelRequest { - thread_id: None, - prompt_id: None, - mode: None, - intent: None, - messages: vec![LanguageModelRequestMessage { - role: Role::User, - content: vec![prompt.into()], - cache: use_cache, - }], - tools: Vec::new(), - tool_choice: None, - stop: Vec::new(), - temperature: None, - thinking_allowed: true, - }; - - let code_len = code.len(); - cx.spawn(async move |cx| { - let stream = model.stream_completion(request, cx); - cx.background_spawn(async move { - let answer: String = stream - .await? 
- .filter_map(|event| async { - if let Ok(LanguageModelCompletionEvent::Text(text)) = event { - Some(text) - } else { - None - } - }) - .collect() - .await; - - log::info!( - "It took {:?} to summarize {:?} bytes of code.", - start.elapsed(), - code_len - ); - - log::debug!("Summary was: {:?}", &answer); - - Ok(answer) - }) - .await - - // TODO if summarization failed, put it back in the backlog! - }) - } - - fn persist_summaries( - &self, - summaries: channel::Receiver, - cx: &App, - ) -> Task> { - let db_connection = self.db_connection.clone(); - let digest_db = self.file_digest_db; - let summary_db = self.summary_db; - cx.background_spawn(async move { - let mut summaries = pin!(summaries.chunks_timeout(4096, Duration::from_secs(2))); - while let Some(summaries) = summaries.next().await { - let mut txn = db_connection.write_txn()?; - for file in &summaries { - log::debug!( - "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}", - &file.path, - file.summary.len(), - file.digest - ); - digest_db.put( - &mut txn, - &file.path, - &FileDigest { - mtime: file.mtime, - digest: file.digest, - }, - )?; - summary_db.put(&mut txn, &file.digest, &file.summary)?; - } - txn.commit()?; - - drop(summaries); - log::debug!("committed summaries"); - } - - Ok(()) - }) - } - - /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately. - pub(crate) fn flush_backlog( - &self, - worktree_abs_path: Arc, - cx: &App, - ) -> impl Future> + use<> { - let start = Instant::now(); - let backlogged = { - let (tx, rx) = channel::bounded(512); - let needs_summary: Vec<(Arc, Option)> = { - let mut backlog = self.backlog.lock(); - - backlog.drain().collect() - }; - - let task = cx.background_spawn(async move { - tx.send(needs_summary).await?; - Ok(()) - }); - - Backlogged { - paths_to_digest: rx, - task, - } - }; - - let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx); - let needs_summary = self.check_summary_cache(digest.files, cx); - let summaries = self.summarize_files(needs_summary.files, cx); - let persist = self.persist_summaries(summaries.files, cx); - - async move { - futures::try_join!( - backlogged.task, - digest.task, - needs_summary.task, - summaries.task, - persist - )?; - - log::info!("Summarizing backlogged entries took {:?}", start.elapsed()); - - Ok(()) - } - } - - pub(crate) fn backlog_len(&self) -> usize { - self.backlog.lock().len() - } -} - -fn db_key_for_path(path: &Arc) -> String { - path.to_string_lossy().replace('/', "\0") -} diff --git a/crates/semantic_index/src/worktree_index.rs b/crates/semantic_index/src/worktree_index.rs deleted file mode 100644 index 84b932d2965562e2c7deb02019449ad69ade7bc0..0000000000000000000000000000000000000000 --- a/crates/semantic_index/src/worktree_index.rs +++ /dev/null @@ -1,205 +0,0 @@ -use crate::embedding::EmbeddingProvider; -use crate::embedding_index::EmbeddingIndex; -use crate::indexing::IndexingEntrySet; -use crate::summary_index::SummaryIndex; -use anyhow::Result; -use fs::Fs; -use futures::future::Shared; -use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Subscription, Task, WeakEntity}; -use language::LanguageRegistry; -use log; -use project::{UpdatedEntriesSet, Worktree}; -use smol::channel; -use std::sync::Arc; -use util::ResultExt; - -#[derive(Clone)] -pub enum WorktreeIndexHandle { - Loading { - index: Shared, Arc>>>, - }, - Loaded { - index: Entity, - }, -} - -pub struct WorktreeIndex { - worktree: Entity, - db_connection: heed::Env, - 
embedding_index: EmbeddingIndex, - summary_index: SummaryIndex, - entry_ids_being_indexed: Arc, - _index_entries: Task>, - _subscription: Subscription, -} - -impl WorktreeIndex { - pub fn load( - worktree: Entity, - db_connection: heed::Env, - language_registry: Arc, - fs: Arc, - status_tx: channel::Sender<()>, - embedding_provider: Arc, - cx: &mut App, - ) -> Task>> { - let worktree_for_index = worktree.clone(); - let worktree_for_summary = worktree.clone(); - let worktree_abs_path = worktree.read(cx).abs_path(); - let embedding_fs = Arc::clone(&fs); - let summary_fs = fs; - cx.spawn(async move |cx| { - let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx)); - let (embedding_index, summary_index) = cx - .background_spawn({ - let entries_being_indexed = Arc::clone(&entries_being_indexed); - let db_connection = db_connection.clone(); - async move { - let mut txn = db_connection.write_txn()?; - let embedding_index = { - let db_name = worktree_abs_path.to_string_lossy(); - let db = db_connection.create_database(&mut txn, Some(&db_name))?; - - EmbeddingIndex::new( - worktree_for_index, - embedding_fs, - db_connection.clone(), - db, - language_registry, - embedding_provider, - Arc::clone(&entries_being_indexed), - ) - }; - let summary_index = { - let file_digest_db = { - let db_name = - // Prepend something that wouldn't be found at the beginning of an - // absolute path, so we don't get db key namespace conflicts with - // embeddings, which use the abs path as a key. - format!("digests-{}", worktree_abs_path.to_string_lossy()); - db_connection.create_database(&mut txn, Some(&db_name))? - }; - let summary_db = { - let db_name = - // Prepend something that wouldn't be found at the beginning of an - // absolute path, so we don't get db key namespace conflicts with - // embeddings, which use the abs path as a key. - format!("summaries-{}", worktree_abs_path.to_string_lossy()); - db_connection.create_database(&mut txn, Some(&db_name))? 
- }; - SummaryIndex::new( - worktree_for_summary, - summary_fs, - db_connection.clone(), - file_digest_db, - summary_db, - Arc::clone(&entries_being_indexed), - ) - }; - txn.commit()?; - anyhow::Ok((embedding_index, summary_index)) - } - }) - .await?; - - cx.new(|cx| { - Self::new( - worktree, - db_connection, - embedding_index, - summary_index, - entries_being_indexed, - cx, - ) - }) - }) - } - - pub fn new( - worktree: Entity, - db_connection: heed::Env, - embedding_index: EmbeddingIndex, - summary_index: SummaryIndex, - entry_ids_being_indexed: Arc, - cx: &mut Context, - ) -> Self { - let (updated_entries_tx, updated_entries_rx) = channel::unbounded(); - let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| { - if let worktree::Event::UpdatedEntries(update) = event { - log::debug!("Updating entries..."); - _ = updated_entries_tx.try_send(update.clone()); - } - }); - - Self { - db_connection, - embedding_index, - summary_index, - worktree, - entry_ids_being_indexed, - _index_entries: cx.spawn(async move |this, cx| { - Self::index_entries(this, updated_entries_rx, cx).await - }), - _subscription, - } - } - - pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet { - self.entry_ids_being_indexed.as_ref() - } - - pub fn worktree(&self) -> &Entity { - &self.worktree - } - - pub fn db_connection(&self) -> &heed::Env { - &self.db_connection - } - - pub fn embedding_index(&self) -> &EmbeddingIndex { - &self.embedding_index - } - - pub fn summary_index(&self) -> &SummaryIndex { - &self.summary_index - } - - async fn index_entries( - this: WeakEntity, - updated_entries: channel::Receiver, - cx: &mut AsyncApp, - ) -> Result<()> { - let index = this.update(cx, |this, cx| { - futures::future::try_join( - this.embedding_index.index_entries_changed_on_disk(cx), - this.summary_index.index_entries_changed_on_disk(false, cx), - ) - })?; - index.await.log_err(); - - while let Ok(updated_entries) = updated_entries.recv().await { - let index = this.update(cx, |this, cx| { - futures::future::try_join( - this.embedding_index - .index_updated_entries(updated_entries.clone(), cx), - this.summary_index - .index_updated_entries(updated_entries, false, cx), - ) - })?; - index.await.log_err(); - } - - Ok(()) - } - - #[cfg(test)] - pub fn path_count(&self) -> Result { - use anyhow::Context as _; - - let txn = self - .db_connection - .read_txn() - .context("failed to create read transaction")?; - Ok(self.embedding_index().db().len(&txn)?) - } -}