1use crate::{
2 embedding::{EmbeddingProvider, TextToEmbed},
3 summary_index::FileSummary,
4 worktree_index::{WorktreeIndex, WorktreeIndexHandle},
5};
6use anyhow::{anyhow, Context as _, Result};
7use collections::HashMap;
8use fs::Fs;
9use futures::FutureExt;
10use gpui::{App, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity};
11use language::LanguageRegistry;
12use log;
13use project::{Project, Worktree, WorktreeId};
14use serde::{Deserialize, Serialize};
15use smol::channel;
16use std::{
17 cmp::Ordering,
18 future::Future,
19 num::NonZeroUsize,
20 ops::{Range, RangeInclusive},
21 path::{Path, PathBuf},
22 sync::Arc,
23};
24use util::ResultExt;
25
/// A single semantic-search hit, resolved against a live worktree entity.
///
/// Produced by [`ProjectIndex::search`] on the foreground after worker results
/// (see `WorktreeSearchResult`) have been matched back to their worktrees.
#[derive(Debug)]
pub struct SearchResult {
    /// The worktree containing the matched file.
    pub worktree: Entity<Worktree>,
    /// Worktree-relative path of the matched file.
    pub path: Arc<Path>,
    /// Byte range of the matched chunk within the file.
    pub range: Range<usize>,
    /// Similarity score for this chunk (higher is better; results are sorted descending).
    pub score: f32,
    /// Index into the original `queries` vector of the query this chunk matched best.
    pub query_index: usize,
}
34
/// A search result whose excerpt text has been loaded from disk.
///
/// Unlike [`SearchResult`], this carries the excerpt content and a row (line)
/// range rather than a byte range, and no longer references a worktree entity.
#[derive(Debug, PartialEq, Eq)]
pub struct LoadedSearchResult {
    /// Worktree-relative path of the matched file.
    pub path: Arc<Path>,
    /// Full filesystem path of the matched file.
    pub full_path: PathBuf,
    /// The loaded excerpt text for the matched range.
    pub excerpt_content: String,
    /// Inclusive range of rows (0-based lines) covered by the excerpt.
    pub row_range: RangeInclusive<u32>,
    /// Index of the query (in the original query list) this result matched.
    pub query_index: usize,
}
43
/// Intermediate search result produced by background search workers.
///
/// Identifies the worktree by [`WorktreeId`] instead of an entity handle so it
/// can be built off the main thread; [`ProjectIndex::search`] later resolves it
/// into a [`SearchResult`] via `Project::worktree_for_id`.
pub struct WorktreeSearchResult {
    /// Id of the worktree containing the matched file.
    pub worktree_id: WorktreeId,
    /// Worktree-relative path of the matched file.
    pub path: Arc<Path>,
    /// Byte range of the matched chunk within the file.
    pub range: Range<usize>,
    /// Index of the best-matching query for this chunk.
    pub query_index: usize,
    /// Similarity score for this chunk.
    pub score: f32,
}
51
/// Indexing status of a [`ProjectIndex`], emitted as a gpui event whenever it changes.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    /// No worktree index is loading and nothing is being indexed.
    Idle,
    /// At least one worktree index is still loading from the database.
    Loading,
    /// Worktree indices are loaded but entries are still being indexed.
    /// `remaining_count` is the total number of entries currently in flight
    /// across all worktrees (guaranteed non-zero by construction).
    Scanning { remaining_count: NonZeroUsize },
}
58
/// Maintains one [`WorktreeIndex`] per visible local worktree of a project,
/// providing project-wide semantic search and file summaries.
pub struct ProjectIndex {
    // Shared LMDB environment handed to each worktree index.
    db_connection: heed::Env,
    // Weak handle so the index does not keep the project alive.
    project: WeakEntity<Project>,
    // Per-worktree indices, keyed by the worktree entity's id. Each entry is
    // either still `Loading` or fully `Loaded`.
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    language_registry: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    // Last status that was emitted; used to avoid emitting duplicate events.
    last_status: Status,
    // Worktree indices signal on this channel whenever their indexing progress
    // changes; the `_maintain_status` task listens on the receiving end.
    status_tx: channel::Sender<()>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    // Background task that recomputes `last_status` on every status signal.
    _maintain_status: Task<()>,
    // Keeps the subscription to project worktree add/remove events alive.
    _subscription: Subscription,
}
71
impl ProjectIndex {
    /// Creates a project index backed by `db_connection`, subscribes to the
    /// project's worktree add/remove events, spawns a background task that
    /// refreshes the status whenever a worktree index signals progress, and
    /// kicks off indexing of all currently-visible local worktrees.
    pub fn new(
        project: Entity<Project>,
        db_connection: heed::Env,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        cx: &mut Context<Self>,
    ) -> Self {
        let language_registry = project.read(cx).languages().clone();
        let fs = project.read(cx).fs().clone();
        let (status_tx, status_rx) = channel::unbounded();
        let mut this = ProjectIndex {
            db_connection,
            project: project.downgrade(),
            worktree_indices: HashMap::default(),
            language_registry,
            fs,
            status_tx,
            last_status: Status::Idle,
            embedding_provider,
            _subscription: cx.subscribe(&project, Self::handle_project_event),
            _maintain_status: cx.spawn(|this, mut cx| async move {
                // Each message on the channel triggers a status recomputation.
                // The loop exits once the entity has been dropped (update fails)
                // or all senders are gone (recv fails).
                while status_rx.recv().await.is_ok() {
                    if this
                        .update(&mut cx, |this, cx| this.update_status(cx))
                        .is_err()
                    {
                        break;
                    }
                }
            }),
        };
        this.update_worktree_indices(cx);
        this
    }

    /// Returns the most recently computed indexing status.
    pub fn status(&self) -> Status {
        self.last_status
    }

    /// Returns a weak handle to the project this index belongs to.
    pub fn project(&self) -> WeakEntity<Project> {
        self.project.clone()
    }

    /// Returns the filesystem abstraction shared with the worktree indices.
    pub fn fs(&self) -> Arc<dyn Fs> {
        self.fs.clone()
    }

    /// Reacts to project events: re-syncs the set of worktree indices when a
    /// worktree is added or removed; all other events are ignored.
    fn handle_project_event(
        &mut self,
        _: Entity<Project>,
        event: &project::Event,
        cx: &mut Context<Self>,
    ) {
        match event {
            project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
                self.update_worktree_indices(cx);
            }
            _ => {}
        }
    }

    /// Brings `worktree_indices` in sync with the project's current set of
    /// visible, local worktrees: drops indices for vanished worktrees and
    /// starts loading an index for each new one. Remote worktrees are skipped.
    fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
        // If the project is gone there is nothing to sync against.
        let Some(project) = self.project.upgrade() else {
            return;
        };

        let worktrees = project
            .read(cx)
            .visible_worktrees(cx)
            .filter_map(|worktree| {
                // Only local worktrees can be indexed from this process.
                if worktree.read(cx).is_local() {
                    Some((worktree.entity_id(), worktree))
                } else {
                    None
                }
            })
            .collect::<HashMap<_, _>>();

        // Drop indices whose worktrees no longer exist.
        self.worktree_indices
            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
        for (worktree_id, worktree) in worktrees {
            // Existing entries (loading or loaded) are left untouched.
            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
                let worktree_index = WorktreeIndex::load(
                    worktree.clone(),
                    self.db_connection.clone(),
                    self.language_registry.clone(),
                    self.fs.clone(),
                    self.status_tx.clone(),
                    self.embedding_provider.clone(),
                    cx,
                );

                // Once loading finishes, swap the handle from `Loading` to
                // `Loaded` (or remove it entirely on failure), then refresh
                // the aggregate status.
                let load_worktree = cx.spawn(|this, mut cx| async move {
                    let result = match worktree_index.await {
                        Ok(worktree_index) => {
                            this.update(&mut cx, |this, _| {
                                this.worktree_indices.insert(
                                    worktree_id,
                                    WorktreeIndexHandle::Loaded {
                                        index: worktree_index.clone(),
                                    },
                                );
                            })?;
                            Ok(worktree_index)
                        }
                        Err(error) => {
                            this.update(&mut cx, |this, _cx| {
                                this.worktree_indices.remove(&worktree_id)
                            })?;
                            // Arc so the error can be shared by all clones of
                            // the `Shared` loading future below.
                            Err(Arc::new(error))
                        }
                    };

                    this.update(&mut cx, |this, cx| this.update_status(cx))?;

                    result
                });

                // `shared()` lets multiple callers (e.g. concurrent searches)
                // await the same in-flight load.
                WorktreeIndexHandle::Loading {
                    index: load_worktree.shared(),
                }
            });
        }

        self.update_status(cx);
    }

    /// Recomputes the aggregate status from the worktree indices and emits it
    /// as an event if it changed. `Loading` takes precedence over `Scanning`,
    /// which takes precedence over `Idle`.
    fn update_status(&mut self, cx: &mut Context<Self>) {
        let mut indexing_count = 0;
        let mut any_loading = false;

        for index in self.worktree_indices.values_mut() {
            match index {
                WorktreeIndexHandle::Loading { .. } => {
                    // A single loading index determines the status; no need to
                    // keep counting.
                    any_loading = true;
                    break;
                }
                WorktreeIndexHandle::Loaded { index, .. } => {
                    indexing_count += index.read(cx).entry_ids_being_indexed().len();
                }
            }
        }

        let status = if any_loading {
            Status::Loading
        } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
            Status::Scanning { remaining_count }
        } else {
            Status::Idle
        };

        // Only emit on actual transitions to avoid redundant events.
        if status != self.last_status {
            self.last_status = status;
            cx.emit(status);
        }
    }

    /// Searches all worktree indices for the chunks most similar to `queries`,
    /// returning at most `limit` results sorted by descending score.
    ///
    /// Pipeline: per-worktree scan tasks stream `(worktree_id, path, chunk)`
    /// tuples from the embedding database into a bounded channel; the queries
    /// are embedded once; then one worker per CPU drains the channel, each
    /// maintaining its own top-`limit` list; finally the per-worker lists are
    /// merged, resolved to live worktrees, re-sorted, and truncated.
    pub fn search(
        &self,
        queries: Vec<String>,
        limit: usize,
        cx: &App,
    ) -> Task<Result<Vec<SearchResult>>> {
        // Bounded channel provides backpressure between DB iteration and the
        // scoring workers.
        let (chunks_tx, chunks_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let chunks_tx = chunks_tx.clone();
            worktree_scan_tasks.push(cx.spawn(|cx| async move {
                // Wait for a still-loading index to finish before scanning it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(&cx, |index, cx| {
                        let worktree_id = index.worktree().read(cx).id();
                        let db_connection = index.db_connection().clone();
                        let db = *index.embedding_index().db();
                        cx.background_executor().spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create read transaction")?;
                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (_key, db_embedded_file) = db_entry?;
                                for chunk in db_embedded_file.chunks {
                                    chunks_tx
                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
                                        .await?;
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Drop our clone of the sender so workers see the channel close once
        // every scan task finishes.
        drop(chunks_tx);

        let project = self.project.clone();
        let embedding_provider = self.embedding_provider.clone();
        cx.spawn(|cx| async move {
            #[cfg(debug_assertions)]
            let embedding_query_start = std::time::Instant::now();
            log::info!("Searching for {queries:?}");
            let queries: Vec<TextToEmbed> = queries
                .iter()
                .map(|s| TextToEmbed::new(s.as_str()))
                .collect();

            let query_embeddings = embedding_provider.embed(&queries[..]).await?;
            if query_embeddings.len() != queries.len() {
                return Err(anyhow!(
                    "The number of query embeddings does not match the number of queries"
                ));
            }

            // One result buffer per CPU so workers never contend on a lock.
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
            }

            #[cfg(debug_assertions)]
            let search_start = std::time::Instant::now();
            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
                                // `similarity` scores the chunk against every
                                // query embedding and reports the best match.
                                let (score, query_index) =
                                    chunk.embedding.similarity(&query_embeddings);

                                // Binary search keeps `results` sorted in
                                // descending score order; reversing the
                                // comparison operands achieves that.
                                let ix = match results.binary_search_by(|probe| {
                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
                                }) {
                                    Ok(ix) | Err(ix) => ix,
                                };
                                if ix < limit {
                                    results.insert(
                                        ix,
                                        WorktreeSearchResult {
                                            worktree_id,
                                            path: path.clone(),
                                            range: chunk.chunk.range.clone(),
                                            query_index,
                                            score,
                                        },
                                    );
                                    // Keep each per-worker list capped at `limit`.
                                    if results.len() > limit {
                                        results.pop();
                                    }
                                }
                            }
                        });
                    }
                })
                .await;

            // Surface (but don't propagate) scan errors; partial results are
            // still returned.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            project.read_with(&cx, |project, cx| {
                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
                for worker_results in results_by_worker {
                    search_results.extend(worker_results.into_iter().filter_map(|result| {
                        // Drop results whose worktree has disappeared since the
                        // scan started.
                        Some(SearchResult {
                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
                            path: result.path,
                            range: result.range,
                            score: result.score,
                            query_index: result.query_index,
                        })
                    }));
                }
                // Merge the per-worker top-K lists into a single descending
                // ranking, then keep only the global top `limit`.
                search_results.sort_unstable_by(|a, b| {
                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
                });
                search_results.truncate(limit);

                #[cfg(debug_assertions)]
                {
                    let search_elapsed = search_start.elapsed();
                    log::debug!(
                        "searched {} entries in {:?}",
                        search_results.len(),
                        search_elapsed
                    );
                    let embedding_query_elapsed = embedding_query_start.elapsed();
                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
                }

                search_results
            })
        })
    }

    /// Test helper: sums the indexed path counts across all loaded worktree
    /// indices. Worktrees still loading contribute nothing.
    #[cfg(test)]
    pub fn path_count(&self, cx: &App) -> Result<u64> {
        let mut result = 0;
        for worktree_index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
                result += index.read(cx).path_count()?;
            }
        }
        Ok(result)
    }

    /// Returns the loaded index for the worktree with the given id, or `None`
    /// if that worktree is unknown or its index is still loading.
    pub(crate) fn worktree_index(
        &self,
        worktree_id: WorktreeId,
        cx: &App,
    ) -> Option<Entity<WorktreeIndex>> {
        for index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = index {
                if index.read(cx).worktree().read(cx).id() == worktree_id {
                    return Some(index.clone());
                }
            }
        }
        None
    }

    /// Returns all loaded worktree indices, sorted by worktree id for a
    /// deterministic order (the backing map is unordered).
    pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
        let mut result = self
            .worktree_indices
            .values()
            .filter_map(|index| {
                if let WorktreeIndexHandle::Loaded { index, .. } = index {
                    Some(index.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
        result
    }

    /// Collects every stored file summary across all worktrees.
    ///
    /// Mirrors the `search` pipeline: per-worktree scan tasks stream
    /// `(filename, summary)` pairs from the summary database into a bounded
    /// channel, and one collector per CPU drains them into per-worker buffers
    /// that are flattened at the end.
    pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
        let (summaries_tx, summaries_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
            worktree_scan_tasks.push(cx.spawn(|cx| async move {
                // Wait for a still-loading index to finish before reading it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(&cx, |index, cx| {
                        let db_connection = index.db_connection().clone();
                        let summary_index = index.summary_index();
                        let file_digest_db = summary_index.file_digest_db();
                        let summary_db = summary_index.summary_db();

                        cx.background_executor().spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create db read transaction")?;
                            let db_entries = file_digest_db
                                .iter(&txn)
                                .context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (file_path, db_file) = db_entry?;

                                // Summaries are keyed by content digest, so a
                                // stale digest simply yields no summary.
                                match summary_db.get(&txn, &db_file.digest) {
                                    Ok(opt_summary) => {
                                        // Currently, we only use summaries we already have. If the file hasn't been
                                        // summarized yet, then we skip it and don't include it in the inferred context.
                                        // If we want to do just-in-time summarization, this would be the place to do it!
                                        if let Some(summary) = opt_summary {
                                            summaries_tx
                                                .send((file_path.to_string(), summary.to_string()))
                                                .await?;
                                        } else {
                                            log::warn!("No summary found for {:?}", &db_file);
                                        }
                                    }
                                    Err(err) => {
                                        // Read errors are logged but don't abort
                                        // the scan of the remaining entries.
                                        log::error!(
                                            "Error reading from summary database: {:?}",
                                            err
                                        );
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Close our sender so collectors terminate once all scans finish.
        drop(summaries_tx);

        let project = self.project.clone();
        cx.spawn(|cx| async move {
            // One buffer per CPU so collectors never contend on a lock.
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<FileSummary>::new());
            }

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((filename, summary)) = summaries_rx.recv().await {
                                results.push(FileSummary { filename, summary });
                            }
                        });
                    }
                })
                .await;

            // Surface (but don't propagate) scan errors; partial results are
            // still returned.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            // NOTE(review): reading the project here appears to serve only to
            // fail if the project was dropped; its contents are unused.
            project.read_with(&cx, |_project, _cx| {
                results_by_worker.into_iter().flatten().collect()
            })
        })
    }

    /// Empty out the backlogs of all the worktrees in the project
    ///
    /// Waits for every worktree's summary backlog to flush (loading indices
    /// are awaited first). Errors are logged, never propagated — summaries are
    /// best-effort context, not a hard requirement.
    pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
        let flush_start = std::time::Instant::now();

        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
            let worktree_index = worktree_index.clone();

            cx.spawn(|cx| async move {
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };
                let worktree_abs_path =
                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;

                index
                    .read_with(&cx, |index, cx| {
                        cx.background_executor()
                            .spawn(index.summary_index().flush_backlog(worktree_abs_path, cx))
                    })?
                    .await
            })
        }))
        .map(move |results| {
            // Log any errors, but don't block the user. These summaries are supposed to
            // improve quality by providing extra context, but they aren't hard requirements!
            for result in results {
                if let Err(err) = result {
                    log::error!("Error flushing summary backlog: {:?}", err);
                }
            }

            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
        })
    }

    /// Total number of files still awaiting summarization across all loaded
    /// worktree indices.
    pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
        self.worktree_indices(cx)
            .iter()
            .map(|index| index.read(cx).summary_index().backlog_len())
            .sum()
    }
}
548
// `ProjectIndex` emits `Status` events whenever its aggregate indexing status
// changes (see `update_status`).
impl EventEmitter<Status> for ProjectIndex {}