1use crate::{
2 embedding::{EmbeddingProvider, TextToEmbed},
3 summary_index::FileSummary,
4 worktree_index::{WorktreeIndex, WorktreeIndexHandle},
5};
6use anyhow::{anyhow, Context as _, Result};
7use collections::HashMap;
8use fs::Fs;
9use futures::FutureExt;
10use gpui::{
11 App, AppContext as _, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity,
12};
13use language::LanguageRegistry;
14use log;
15use project::{Project, Worktree, WorktreeId};
16use serde::{Deserialize, Serialize};
17use smol::channel;
18use std::{
19 cmp::Ordering,
20 future::Future,
21 num::NonZeroUsize,
22 ops::{Range, RangeInclusive},
23 path::{Path, PathBuf},
24 sync::Arc,
25};
26use util::ResultExt;
27
/// A single search hit, resolved against a live worktree entity.
///
/// Produced by `ProjectIndex::search`, which returns these sorted by descending `score`.
#[derive(Debug)]
pub struct SearchResult {
    /// The worktree that contains the matched file.
    pub worktree: Entity<Worktree>,
    /// Path of the matched file as stored in the embedding database.
    /// NOTE(review): presumably relative to the worktree root — confirm.
    pub path: Arc<Path>,
    /// Range of the matched chunk within the file.
    /// NOTE(review): presumably byte offsets — confirm against the chunker.
    pub range: Range<usize>,
    /// Similarity score of the chunk against the queries; higher scores rank first.
    pub score: f32,
    /// Index of the query (in the `queries` passed to `search`) that the similarity
    /// computation reported alongside `score`.
    pub query_index: usize,
}
36
/// A search result together with excerpt text.
///
/// NOTE(review): this type is not constructed in this file; the field descriptions
/// below are inferred from the field names and should be confirmed against the code
/// that builds it.
#[derive(Debug, PartialEq, Eq)]
pub struct LoadedSearchResult {
    /// Path of the file the excerpt came from (presumably worktree-relative — confirm).
    pub path: Arc<Path>,
    /// Full path of the file (presumably including the worktree root — confirm).
    pub full_path: PathBuf,
    /// The text of the excerpt.
    pub excerpt_content: String,
    /// Rows covered by the excerpt, inclusive on both ends.
    pub row_range: RangeInclusive<u32>,
    /// Index of the query that produced this result.
    pub query_index: usize,
}
45
/// Intermediate search hit produced by the scoring workers in `ProjectIndex::search`.
///
/// It identifies its worktree by `WorktreeId` only. It is converted into a
/// [`SearchResult`] once the worktree entity has been looked up in the project; hits
/// whose worktree can no longer be found are dropped at that point.
pub struct WorktreeSearchResult {
    /// Id of the worktree containing the matched file.
    pub worktree_id: WorktreeId,
    /// Path of the matched file as stored in the embedding database.
    pub path: Arc<Path>,
    /// Range of the matched chunk within the file.
    pub range: Range<usize>,
    /// Index of the query reported alongside `score` by the similarity computation.
    pub query_index: usize,
    /// Similarity score; workers keep their lists sorted by descending score.
    pub score: f32,
}
53
/// Aggregate indexing status of a `ProjectIndex`, recomputed by `update_status`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    /// Every worktree index is loaded and none has entries being indexed.
    Idle,
    /// At least one worktree index is still loading.
    Loading,
    /// All worktree indices are loaded, and `remaining_count` entries are being indexed
    /// across them (summed over each index's `entry_ids_being_indexed()`).
    Scanning { remaining_count: NonZeroUsize },
}
60
/// Semantic index over all visible, local worktrees of a project.
pub struct ProjectIndex {
    /// Database environment shared with every worktree index.
    db_connection: heed::Env,
    /// The project being indexed; held weakly so the index does not keep it alive.
    project: WeakEntity<Project>,
    /// One handle per indexed worktree, keyed by the worktree's entity id.
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    language_registry: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    /// The most recently emitted status; used to emit only on changes.
    last_status: Status,
    /// Sender handed to each worktree index; every message triggers a status recompute.
    status_tx: channel::Sender<()>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Task that recomputes the status whenever a message arrives on the status channel.
    _maintain_status: Task<()>,
    /// Subscription to project events (worktree added/removed).
    _subscription: Subscription,
}
73
74impl ProjectIndex {
75 pub fn new(
76 project: Entity<Project>,
77 db_connection: heed::Env,
78 embedding_provider: Arc<dyn EmbeddingProvider>,
79 cx: &mut Context<Self>,
80 ) -> Self {
81 let language_registry = project.read(cx).languages().clone();
82 let fs = project.read(cx).fs().clone();
83 let (status_tx, status_rx) = channel::unbounded();
84 let mut this = ProjectIndex {
85 db_connection,
86 project: project.downgrade(),
87 worktree_indices: HashMap::default(),
88 language_registry,
89 fs,
90 status_tx,
91 last_status: Status::Idle,
92 embedding_provider,
93 _subscription: cx.subscribe(&project, Self::handle_project_event),
94 _maintain_status: cx.spawn(|this, mut cx| async move {
95 while status_rx.recv().await.is_ok() {
96 if this
97 .update(&mut cx, |this, cx| this.update_status(cx))
98 .is_err()
99 {
100 break;
101 }
102 }
103 }),
104 };
105 this.update_worktree_indices(cx);
106 this
107 }
108
109 pub fn status(&self) -> Status {
110 self.last_status
111 }
112
113 pub fn project(&self) -> WeakEntity<Project> {
114 self.project.clone()
115 }
116
117 pub fn fs(&self) -> Arc<dyn Fs> {
118 self.fs.clone()
119 }
120
121 fn handle_project_event(
122 &mut self,
123 _: Entity<Project>,
124 event: &project::Event,
125 cx: &mut Context<Self>,
126 ) {
127 match event {
128 project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
129 self.update_worktree_indices(cx);
130 }
131 _ => {}
132 }
133 }
134
135 fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
136 let Some(project) = self.project.upgrade() else {
137 return;
138 };
139
140 let worktrees = project
141 .read(cx)
142 .visible_worktrees(cx)
143 .filter_map(|worktree| {
144 if worktree.read(cx).is_local() {
145 Some((worktree.entity_id(), worktree))
146 } else {
147 None
148 }
149 })
150 .collect::<HashMap<_, _>>();
151
152 self.worktree_indices
153 .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
154 for (worktree_id, worktree) in worktrees {
155 self.worktree_indices.entry(worktree_id).or_insert_with(|| {
156 let worktree_index = WorktreeIndex::load(
157 worktree.clone(),
158 self.db_connection.clone(),
159 self.language_registry.clone(),
160 self.fs.clone(),
161 self.status_tx.clone(),
162 self.embedding_provider.clone(),
163 cx,
164 );
165
166 let load_worktree = cx.spawn(|this, mut cx| async move {
167 let result = match worktree_index.await {
168 Ok(worktree_index) => {
169 this.update(&mut cx, |this, _| {
170 this.worktree_indices.insert(
171 worktree_id,
172 WorktreeIndexHandle::Loaded {
173 index: worktree_index.clone(),
174 },
175 );
176 })?;
177 Ok(worktree_index)
178 }
179 Err(error) => {
180 this.update(&mut cx, |this, _cx| {
181 this.worktree_indices.remove(&worktree_id)
182 })?;
183 Err(Arc::new(error))
184 }
185 };
186
187 this.update(&mut cx, |this, cx| this.update_status(cx))?;
188
189 result
190 });
191
192 WorktreeIndexHandle::Loading {
193 index: load_worktree.shared(),
194 }
195 });
196 }
197
198 self.update_status(cx);
199 }
200
201 fn update_status(&mut self, cx: &mut Context<Self>) {
202 let mut indexing_count = 0;
203 let mut any_loading = false;
204
205 for index in self.worktree_indices.values_mut() {
206 match index {
207 WorktreeIndexHandle::Loading { .. } => {
208 any_loading = true;
209 break;
210 }
211 WorktreeIndexHandle::Loaded { index, .. } => {
212 indexing_count += index.read(cx).entry_ids_being_indexed().len();
213 }
214 }
215 }
216
217 let status = if any_loading {
218 Status::Loading
219 } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
220 Status::Scanning { remaining_count }
221 } else {
222 Status::Idle
223 };
224
225 if status != self.last_status {
226 self.last_status = status;
227 cx.emit(status);
228 }
229 }
230
231 pub fn search(
232 &self,
233 queries: Vec<String>,
234 limit: usize,
235 cx: &App,
236 ) -> Task<Result<Vec<SearchResult>>> {
237 let (chunks_tx, chunks_rx) = channel::bounded(1024);
238 let mut worktree_scan_tasks = Vec::new();
239 for worktree_index in self.worktree_indices.values() {
240 let worktree_index = worktree_index.clone();
241 let chunks_tx = chunks_tx.clone();
242 worktree_scan_tasks.push(cx.spawn(|cx| async move {
243 let index = match worktree_index {
244 WorktreeIndexHandle::Loading { index } => {
245 index.clone().await.map_err(|error| anyhow!(error))?
246 }
247 WorktreeIndexHandle::Loaded { index } => index.clone(),
248 };
249
250 index
251 .read_with(&cx, |index, cx| {
252 let worktree_id = index.worktree().read(cx).id();
253 let db_connection = index.db_connection().clone();
254 let db = *index.embedding_index().db();
255 cx.background_spawn(async move {
256 let txn = db_connection
257 .read_txn()
258 .context("failed to create read transaction")?;
259 let db_entries = db.iter(&txn).context("failed to iterate database")?;
260 for db_entry in db_entries {
261 let (_key, db_embedded_file) = db_entry?;
262 for chunk in db_embedded_file.chunks {
263 chunks_tx
264 .send((worktree_id, db_embedded_file.path.clone(), chunk))
265 .await?;
266 }
267 }
268 anyhow::Ok(())
269 })
270 })?
271 .await
272 }));
273 }
274 drop(chunks_tx);
275
276 let project = self.project.clone();
277 let embedding_provider = self.embedding_provider.clone();
278 cx.spawn(|cx| async move {
279 #[cfg(debug_assertions)]
280 let embedding_query_start = std::time::Instant::now();
281 log::info!("Searching for {queries:?}");
282 let queries: Vec<TextToEmbed> = queries
283 .iter()
284 .map(|s| TextToEmbed::new(s.as_str()))
285 .collect();
286
287 let query_embeddings = embedding_provider.embed(&queries[..]).await?;
288 if query_embeddings.len() != queries.len() {
289 return Err(anyhow!(
290 "The number of query embeddings does not match the number of queries"
291 ));
292 }
293
294 let mut results_by_worker = Vec::new();
295 for _ in 0..cx.background_executor().num_cpus() {
296 results_by_worker.push(Vec::<WorktreeSearchResult>::new());
297 }
298
299 #[cfg(debug_assertions)]
300 let search_start = std::time::Instant::now();
301 cx.background_executor()
302 .scoped(|cx| {
303 for results in results_by_worker.iter_mut() {
304 cx.spawn(async {
305 while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
306 let (score, query_index) =
307 chunk.embedding.similarity(&query_embeddings);
308
309 let ix = match results.binary_search_by(|probe| {
310 score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
311 }) {
312 Ok(ix) | Err(ix) => ix,
313 };
314 if ix < limit {
315 results.insert(
316 ix,
317 WorktreeSearchResult {
318 worktree_id,
319 path: path.clone(),
320 range: chunk.chunk.range.clone(),
321 query_index,
322 score,
323 },
324 );
325 if results.len() > limit {
326 results.pop();
327 }
328 }
329 }
330 });
331 }
332 })
333 .await;
334
335 for scan_task in futures::future::join_all(worktree_scan_tasks).await {
336 scan_task.log_err();
337 }
338
339 project.read_with(&cx, |project, cx| {
340 let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
341 for worker_results in results_by_worker {
342 search_results.extend(worker_results.into_iter().filter_map(|result| {
343 Some(SearchResult {
344 worktree: project.worktree_for_id(result.worktree_id, cx)?,
345 path: result.path,
346 range: result.range,
347 score: result.score,
348 query_index: result.query_index,
349 })
350 }));
351 }
352 search_results.sort_unstable_by(|a, b| {
353 b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
354 });
355 search_results.truncate(limit);
356
357 #[cfg(debug_assertions)]
358 {
359 let search_elapsed = search_start.elapsed();
360 log::debug!(
361 "searched {} entries in {:?}",
362 search_results.len(),
363 search_elapsed
364 );
365 let embedding_query_elapsed = embedding_query_start.elapsed();
366 log::debug!("embedding query took {:?}", embedding_query_elapsed);
367 }
368
369 search_results
370 })
371 })
372 }
373
374 #[cfg(test)]
375 pub fn path_count(&self, cx: &App) -> Result<u64> {
376 let mut result = 0;
377 for worktree_index in self.worktree_indices.values() {
378 if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
379 result += index.read(cx).path_count()?;
380 }
381 }
382 Ok(result)
383 }
384
385 pub(crate) fn worktree_index(
386 &self,
387 worktree_id: WorktreeId,
388 cx: &App,
389 ) -> Option<Entity<WorktreeIndex>> {
390 for index in self.worktree_indices.values() {
391 if let WorktreeIndexHandle::Loaded { index, .. } = index {
392 if index.read(cx).worktree().read(cx).id() == worktree_id {
393 return Some(index.clone());
394 }
395 }
396 }
397 None
398 }
399
400 pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
401 let mut result = self
402 .worktree_indices
403 .values()
404 .filter_map(|index| {
405 if let WorktreeIndexHandle::Loaded { index, .. } = index {
406 Some(index.clone())
407 } else {
408 None
409 }
410 })
411 .collect::<Vec<_>>();
412 result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
413 result
414 }
415
416 pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
417 let (summaries_tx, summaries_rx) = channel::bounded(1024);
418 let mut worktree_scan_tasks = Vec::new();
419 for worktree_index in self.worktree_indices.values() {
420 let worktree_index = worktree_index.clone();
421 let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
422 worktree_scan_tasks.push(cx.spawn(|cx| async move {
423 let index = match worktree_index {
424 WorktreeIndexHandle::Loading { index } => {
425 index.clone().await.map_err(|error| anyhow!(error))?
426 }
427 WorktreeIndexHandle::Loaded { index } => index.clone(),
428 };
429
430 index
431 .read_with(&cx, |index, cx| {
432 let db_connection = index.db_connection().clone();
433 let summary_index = index.summary_index();
434 let file_digest_db = summary_index.file_digest_db();
435 let summary_db = summary_index.summary_db();
436
437 cx.background_spawn(async move {
438 let txn = db_connection
439 .read_txn()
440 .context("failed to create db read transaction")?;
441 let db_entries = file_digest_db
442 .iter(&txn)
443 .context("failed to iterate database")?;
444 for db_entry in db_entries {
445 let (file_path, db_file) = db_entry?;
446
447 match summary_db.get(&txn, &db_file.digest) {
448 Ok(opt_summary) => {
449 // Currently, we only use summaries we already have. If the file hasn't been
450 // summarized yet, then we skip it and don't include it in the inferred context.
451 // If we want to do just-in-time summarization, this would be the place to do it!
452 if let Some(summary) = opt_summary {
453 summaries_tx
454 .send((file_path.to_string(), summary.to_string()))
455 .await?;
456 } else {
457 log::warn!("No summary found for {:?}", &db_file);
458 }
459 }
460 Err(err) => {
461 log::error!(
462 "Error reading from summary database: {:?}",
463 err
464 );
465 }
466 }
467 }
468 anyhow::Ok(())
469 })
470 })?
471 .await
472 }));
473 }
474 drop(summaries_tx);
475
476 let project = self.project.clone();
477 cx.spawn(|cx| async move {
478 let mut results_by_worker = Vec::new();
479 for _ in 0..cx.background_executor().num_cpus() {
480 results_by_worker.push(Vec::<FileSummary>::new());
481 }
482
483 cx.background_executor()
484 .scoped(|cx| {
485 for results in results_by_worker.iter_mut() {
486 cx.spawn(async {
487 while let Ok((filename, summary)) = summaries_rx.recv().await {
488 results.push(FileSummary { filename, summary });
489 }
490 });
491 }
492 })
493 .await;
494
495 for scan_task in futures::future::join_all(worktree_scan_tasks).await {
496 scan_task.log_err();
497 }
498
499 project.read_with(&cx, |_project, _cx| {
500 results_by_worker.into_iter().flatten().collect()
501 })
502 })
503 }
504
505 /// Empty out the backlogs of all the worktrees in the project
506 pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
507 let flush_start = std::time::Instant::now();
508
509 futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
510 let worktree_index = worktree_index.clone();
511
512 cx.spawn(|cx| async move {
513 let index = match worktree_index {
514 WorktreeIndexHandle::Loading { index } => {
515 index.clone().await.map_err(|error| anyhow!(error))?
516 }
517 WorktreeIndexHandle::Loaded { index } => index.clone(),
518 };
519 let worktree_abs_path =
520 cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;
521
522 index
523 .read_with(&cx, |index, cx| {
524 cx.background_spawn(
525 index.summary_index().flush_backlog(worktree_abs_path, cx),
526 )
527 })?
528 .await
529 })
530 }))
531 .map(move |results| {
532 // Log any errors, but don't block the user. These summaries are supposed to
533 // improve quality by providing extra context, but they aren't hard requirements!
534 for result in results {
535 if let Err(err) = result {
536 log::error!("Error flushing summary backlog: {:?}", err);
537 }
538 }
539
540 log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
541 })
542 }
543
544 pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
545 self.worktree_indices(cx)
546 .iter()
547 .map(|index| index.read(cx).summary_index().backlog_len())
548 .sum()
549 }
550}
551
// `ProjectIndex` emits its new `Status` from `update_status` whenever the computed status
// differs from the last one emitted.
impl EventEmitter<Status> for ProjectIndex {}