1use crate::{
2 embedding::{EmbeddingProvider, TextToEmbed},
3 summary_index::FileSummary,
4 worktree_index::{WorktreeIndex, WorktreeIndexHandle},
5};
6use anyhow::{Context as _, Result, anyhow};
7use collections::HashMap;
8use fs::Fs;
9use futures::FutureExt;
10use gpui::{
11 App, AppContext as _, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity,
12};
13use language::LanguageRegistry;
14use log;
15use project::{Project, Worktree, WorktreeId};
16use serde::{Deserialize, Serialize};
17use smol::channel;
18use std::{
19 cmp::Ordering,
20 future::Future,
21 num::NonZeroUsize,
22 ops::{Range, RangeInclusive},
23 path::{Path, PathBuf},
24 sync::Arc,
25};
26use util::ResultExt;
27
/// A single semantic-search hit, resolved to a live worktree entity.
///
/// Produced by `ProjectIndex::search`. `range` is the matched chunk's range
/// within the file — presumably byte offsets; confirm against the chunking
/// code that populates it.
#[derive(Debug)]
pub struct SearchResult {
    /// The worktree containing the matched file.
    pub worktree: Entity<Worktree>,
    /// Path of the matched file.
    pub path: Arc<Path>,
    /// Range of the matched chunk within the file.
    pub range: Range<usize>,
    /// Similarity score; results are sorted by descending score.
    pub score: f32,
    /// Index into the `queries` vector passed to `search`, identifying which
    /// query this chunk matched best.
    pub query_index: usize,
}
36
/// A search result whose excerpt text has been loaded, expressed in row
/// coordinates rather than an offset range.
#[derive(Debug, PartialEq, Eq)]
pub struct LoadedSearchResult {
    /// Path of the matched file.
    pub path: Arc<Path>,
    /// Full path of the matched file — presumably worktree-qualified for
    /// display; confirm at the construction site (not visible in this file).
    pub full_path: PathBuf,
    /// The loaded excerpt text for the matched chunk.
    pub excerpt_content: String,
    /// Inclusive row range covered by `excerpt_content`.
    pub row_range: RangeInclusive<u32>,
    /// Index of the query this result matched (see `SearchResult`).
    pub query_index: usize,
}
45
/// Intermediate per-worker search result used inside `ProjectIndex::search`.
///
/// Background workers only know the `WorktreeId`; the id is resolved to a
/// live `Entity<Worktree>` (producing a `SearchResult`) on the main thread
/// at the end of the search.
pub struct WorktreeSearchResult {
    pub worktree_id: WorktreeId,
    pub path: Arc<Path>,
    pub range: Range<usize>,
    pub query_index: usize,
    pub score: f32,
}
53
/// Aggregate indexing state for the whole project, emitted as an event
/// whenever it changes (see `ProjectIndex::update_status`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    /// No index is loading and nothing is being indexed.
    Idle,
    /// At least one worktree index is still loading.
    Loading,
    /// All indices are loaded; `remaining_count` entries are still being
    /// indexed.
    Scanning { remaining_count: NonZeroUsize },
}
60
/// Maintains one semantic-search index per local, visible worktree of a
/// project, aggregates their status, and answers search queries across all
/// of them.
pub struct ProjectIndex {
    /// LMDB environment shared with every per-worktree index.
    db_connection: heed::Env,
    /// Weak handle so the index does not keep the project alive.
    project: WeakEntity<Project>,
    /// Per-worktree index handles, keyed by the worktree's entity id.
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    language_registry: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    /// Last status emitted; used to suppress duplicate `Status` events.
    last_status: Status,
    /// Handed to each worktree index (see `WorktreeIndex::load` call) so it
    /// can signal that our aggregate status should be recomputed.
    status_tx: channel::Sender<()>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Background task that recomputes the status on each `status_tx` signal.
    _maintain_status: Task<()>,
    /// Keeps the project-event subscription alive.
    _subscription: Subscription,
}
73
74impl ProjectIndex {
    /// Create a `ProjectIndex` for `project`.
    ///
    /// Subscribes to project events (to track worktrees being added or
    /// removed), spawns a background task that recomputes the aggregate
    /// status whenever a worktree index signals on the status channel, and
    /// immediately kicks off index loading for the current worktrees.
    pub fn new(
        project: Entity<Project>,
        db_connection: heed::Env,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        cx: &mut Context<Self>,
    ) -> Self {
        let language_registry = project.read(cx).languages().clone();
        let fs = project.read(cx).fs().clone();
        let (status_tx, status_rx) = channel::unbounded();
        let mut this = ProjectIndex {
            db_connection,
            project: project.downgrade(),
            worktree_indices: HashMap::default(),
            language_registry,
            fs,
            status_tx,
            last_status: Status::Idle,
            embedding_provider,
            _subscription: cx.subscribe(&project, Self::handle_project_event),
            // Loop ends when the channel closes or the entity is dropped
            // (update returns Err).
            _maintain_status: cx.spawn(async move |this, cx| {
                while status_rx.recv().await.is_ok() {
                    if this.update(cx, |this, cx| this.update_status(cx)).is_err() {
                        break;
                    }
                }
            }),
        };
        this.update_worktree_indices(cx);
        this
    }
105
    /// The most recently computed aggregate indexing status.
    pub fn status(&self) -> Status {
        self.last_status
    }
109
    /// Weak handle to the project this index belongs to.
    pub fn project(&self) -> WeakEntity<Project> {
        self.project.clone()
    }
113
    /// The filesystem handle shared with the per-worktree indices.
    pub fn fs(&self) -> Arc<dyn Fs> {
        self.fs.clone()
    }
117
118 fn handle_project_event(
119 &mut self,
120 _: Entity<Project>,
121 event: &project::Event,
122 cx: &mut Context<Self>,
123 ) {
124 match event {
125 project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
126 self.update_worktree_indices(cx);
127 }
128 _ => {}
129 }
130 }
131
    /// Reconcile `worktree_indices` with the project's current set of local,
    /// visible worktrees: drop handles for removed worktrees and start
    /// loading an index for each new one.
    fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
        let Some(project) = self.project.upgrade() else {
            return;
        };

        // Only local, visible worktrees are indexed.
        let worktrees = project
            .read(cx)
            .visible_worktrees(cx)
            .filter_map(|worktree| {
                if worktree.read(cx).is_local() {
                    Some((worktree.entity_id(), worktree))
                } else {
                    None
                }
            })
            .collect::<HashMap<_, _>>();

        // Drop handles for worktrees that no longer exist.
        self.worktree_indices
            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
        for (worktree_id, worktree) in worktrees {
            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
                let worktree_index = WorktreeIndex::load(
                    worktree.clone(),
                    self.db_connection.clone(),
                    self.language_registry.clone(),
                    self.fs.clone(),
                    self.status_tx.clone(),
                    self.embedding_provider.clone(),
                    cx,
                );

                // When loading finishes, swap this worktree's handle from
                // `Loading` to `Loaded` (or remove it on failure), then
                // refresh the aggregate status.
                let load_worktree = cx.spawn(async move |this, cx| {
                    let result = match worktree_index.await {
                        Ok(worktree_index) => {
                            this.update(cx, |this, _| {
                                this.worktree_indices.insert(
                                    worktree_id,
                                    WorktreeIndexHandle::Loaded {
                                        index: worktree_index.clone(),
                                    },
                                );
                            })?;
                            Ok(worktree_index)
                        }
                        Err(error) => {
                            this.update(cx, |this, _cx| {
                                this.worktree_indices.remove(&worktree_id)
                            })?;
                            // Wrapped in `Arc` so the error can be cloned by
                            // every awaiter of the shared future below.
                            Err(Arc::new(error))
                        }
                    };

                    this.update(cx, |this, cx| this.update_status(cx))?;

                    result
                });

                // `shared()` lets multiple callers await the same load task.
                WorktreeIndexHandle::Loading {
                    index: load_worktree.shared(),
                }
            });
        }

        self.update_status(cx);
    }
197
198 fn update_status(&mut self, cx: &mut Context<Self>) {
199 let mut indexing_count = 0;
200 let mut any_loading = false;
201
202 for index in self.worktree_indices.values_mut() {
203 match index {
204 WorktreeIndexHandle::Loading { .. } => {
205 any_loading = true;
206 break;
207 }
208 WorktreeIndexHandle::Loaded { index, .. } => {
209 indexing_count += index.read(cx).entry_ids_being_indexed().len();
210 }
211 }
212 }
213
214 let status = if any_loading {
215 Status::Loading
216 } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
217 Status::Scanning { remaining_count }
218 } else {
219 Status::Idle
220 };
221
222 if status != self.last_status {
223 self.last_status = status;
224 cx.emit(status);
225 }
226 }
227
    /// Semantic search for `queries` across every indexed worktree, returning
    /// at most `limit` results sorted by descending similarity score.
    ///
    /// Pipeline:
    /// 1. One scan task per worktree streams every stored chunk from its
    ///    embedding database into a bounded channel.
    /// 2. The queries are embedded once via `embedding_provider`.
    /// 3. One worker per CPU drains the channel, each keeping a local
    ///    top-`limit` list ordered by score.
    /// 4. Worker results are merged, resolved to live worktree entities,
    ///    sorted, and truncated to `limit`.
    pub fn search(
        &self,
        queries: Vec<String>,
        limit: usize,
        cx: &App,
    ) -> Task<Result<Vec<SearchResult>>> {
        // Bounded so the DB scans can't run unboundedly ahead of the workers.
        let (chunks_tx, chunks_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let chunks_tx = chunks_tx.clone();
            worktree_scan_tasks.push(cx.spawn(async move |cx| {
                // Wait for a still-loading index before scanning it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(cx, |index, cx| {
                        let worktree_id = index.worktree().read(cx).id();
                        let db_connection = index.db_connection().clone();
                        let db = *index.embedding_index().db();
                        cx.background_spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create read transaction")?;
                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
                            // Stream every chunk of every embedded file to
                            // the scoring workers.
                            for db_entry in db_entries {
                                let (_key, db_embedded_file) = db_entry?;
                                for chunk in db_embedded_file.chunks {
                                    chunks_tx
                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
                                        .await?;
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Drop our clone so the channel closes when all scan tasks finish,
        // letting the workers' `recv` loops terminate.
        drop(chunks_tx);

        let project = self.project.clone();
        let embedding_provider = self.embedding_provider.clone();
        cx.spawn(async move |cx| {
            #[cfg(debug_assertions)]
            let embedding_query_start = std::time::Instant::now();
            log::info!("Searching for {queries:?}");
            let queries: Vec<TextToEmbed> = queries
                .iter()
                .map(|s| TextToEmbed::new(s.as_str()))
                .collect();

            let query_embeddings = embedding_provider.embed(&queries[..]).await?;
            anyhow::ensure!(
                query_embeddings.len() == queries.len(),
                "The number of query embeddings does not match the number of queries"
            );

            // One local result list per CPU; merged after the scoped workers
            // finish.
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
            }

            #[cfg(debug_assertions)]
            let search_start = std::time::Instant::now();
            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
                                // Best (score, query index) pair for this
                                // chunk across all query embeddings.
                                let (score, query_index) =
                                    chunk.embedding.similarity(&query_embeddings);

                                // `results` is kept sorted by descending
                                // score; find the insertion point for this
                                // chunk's score.
                                let ix = match results.binary_search_by(|probe| {
                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
                                }) {
                                    Ok(ix) | Err(ix) => ix,
                                };
                                if ix < limit {
                                    results.insert(
                                        ix,
                                        WorktreeSearchResult {
                                            worktree_id,
                                            path: path.clone(),
                                            range: chunk.chunk.range.clone(),
                                            query_index,
                                            score,
                                        },
                                    );
                                    // Cap each worker's list at `limit`.
                                    if results.len() > limit {
                                        results.pop();
                                    }
                                }
                            }
                        });
                    }
                })
                .await;

            // Log (but don't propagate) scan errors; partial results are
            // still returned.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            project.read_with(cx, |project, cx| {
                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
                for worker_results in results_by_worker {
                    search_results.extend(worker_results.into_iter().filter_map(|result| {
                        Some(SearchResult {
                            // Drops results whose worktree has since been
                            // removed from the project.
                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
                            path: result.path,
                            range: result.range,
                            score: result.score,
                            query_index: result.query_index,
                        })
                    }));
                }
                search_results.sort_unstable_by(|a, b| {
                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
                });
                search_results.truncate(limit);

                #[cfg(debug_assertions)]
                {
                    let search_elapsed = search_start.elapsed();
                    log::debug!(
                        "searched {} entries in {:?}",
                        search_results.len(),
                        search_elapsed
                    );
                    let embedding_query_elapsed = embedding_query_start.elapsed();
                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
                }

                search_results
            })
        })
    }
369
370 #[cfg(test)]
371 pub fn path_count(&self, cx: &App) -> Result<u64> {
372 let mut result = 0;
373 for worktree_index in self.worktree_indices.values() {
374 if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
375 result += index.read(cx).path_count()?;
376 }
377 }
378 Ok(result)
379 }
380
381 pub(crate) fn worktree_index(
382 &self,
383 worktree_id: WorktreeId,
384 cx: &App,
385 ) -> Option<Entity<WorktreeIndex>> {
386 for index in self.worktree_indices.values() {
387 if let WorktreeIndexHandle::Loaded { index, .. } = index
388 && index.read(cx).worktree().read(cx).id() == worktree_id {
389 return Some(index.clone());
390 }
391 }
392 None
393 }
394
395 pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
396 let mut result = self
397 .worktree_indices
398 .values()
399 .filter_map(|index| {
400 if let WorktreeIndexHandle::Loaded { index, .. } = index {
401 Some(index.clone())
402 } else {
403 None
404 }
405 })
406 .collect::<Vec<_>>();
407 result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
408 result
409 }
410
    /// Collect the stored summaries of all files across all worktrees.
    ///
    /// Only summaries already present in the database are returned; files
    /// without a stored summary are skipped (see the inline comment about
    /// just-in-time summarization).
    pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
        // Bounded so the DB scans apply backpressure to the collector tasks.
        let (summaries_tx, summaries_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
            worktree_scan_tasks.push(cx.spawn(async move |cx| {
                // Wait for a still-loading index before reading from it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(cx, |index, cx| {
                        let db_connection = index.db_connection().clone();
                        let summary_index = index.summary_index();
                        let file_digest_db = summary_index.file_digest_db();
                        let summary_db = summary_index.summary_db();

                        cx.background_spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create db read transaction")?;
                            let db_entries = file_digest_db
                                .iter(&txn)
                                .context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (file_path, db_file) = db_entry?;

                                // Summaries are looked up by the file's
                                // stored digest.
                                match summary_db.get(&txn, &db_file.digest) {
                                    Ok(opt_summary) => {
                                        // Currently, we only use summaries we already have. If the file hasn't been
                                        // summarized yet, then we skip it and don't include it in the inferred context.
                                        // If we want to do just-in-time summarization, this would be the place to do it!
                                        if let Some(summary) = opt_summary {
                                            summaries_tx
                                                .send((file_path.to_string(), summary.to_string()))
                                                .await?;
                                        } else {
                                            log::warn!("No summary found for {:?}", &db_file);
                                        }
                                    }
                                    Err(err) => {
                                        // Log and continue so one bad entry
                                        // doesn't abort the whole scan.
                                        log::error!(
                                            "Error reading from summary database: {:?}",
                                            err
                                        );
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Close the channel once all scan tasks finish so collectors exit.
        drop(summaries_tx);

        let project = self.project.clone();
        cx.spawn(async move |cx| {
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<FileSummary>::new());
            }

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((filename, summary)) = summaries_rx.recv().await {
                                results.push(FileSummary { filename, summary });
                            }
                        });
                    }
                })
                .await;

            // Log (but don't propagate) scan errors; partial results are
            // still returned.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            // `read_with` fails if the project was dropped; its contents are
            // not otherwise used here.
            project.read_with(cx, |_project, _cx| {
                results_by_worker.into_iter().flatten().collect()
            })
        })
    }
499
    /// Empty out the backlogs of all the worktrees in the project
    ///
    /// Waits for each worktree index (including ones still loading) and
    /// flushes its summary backlog. Errors are logged, never propagated.
    pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
        let flush_start = std::time::Instant::now();

        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
            let worktree_index = worktree_index.clone();

            cx.spawn(async move |cx| {
                // Wait for a still-loading index before flushing it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };
                let worktree_abs_path =
                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;

                index
                    .read_with(cx, |index, cx| {
                        cx.background_spawn(
                            index.summary_index().flush_backlog(worktree_abs_path, cx),
                        )
                    })?
                    .await
            })
        }))
        .map(move |results| {
            // Log any errors, but don't block the user. These summaries are supposed to
            // improve quality by providing extra context, but they aren't hard requirements!
            for result in results {
                if let Err(err) = result {
                    log::error!("Error flushing summary backlog: {:?}", err);
                }
            }

            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
        })
    }
538
539 pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
540 self.worktree_indices(cx)
541 .iter()
542 .map(|index| index.read(cx).summary_index().backlog_len())
543 .sum()
544 }
545}
546
// Subscribers to a `ProjectIndex` entity receive a `Status` value whenever
// the aggregate indexing status changes (emitted from `update_status`).
impl EventEmitter<Status> for ProjectIndex {}