use crate::{
    embedding::{EmbeddingProvider, TextToEmbed},
    summary_index::FileSummary,
    worktree_index::{WorktreeIndex, WorktreeIndexHandle},
};
use anyhow::{Context as _, Result, anyhow};
use collections::HashMap;
use fs::Fs;
use futures::FutureExt;
use gpui::{
    App, AppContext as _, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity,
};
use language::LanguageRegistry;
use log;
use project::{Project, Worktree, WorktreeId};
use serde::{Deserialize, Serialize};
use smol::channel;
use std::{
    cmp::Ordering,
    future::Future,
    num::NonZeroUsize,
    ops::{Range, RangeInclusive},
    path::{Path, PathBuf},
    sync::Arc,
};
use util::ResultExt;

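/// A single semantic-search hit: a byte range within one file of one worktree,
/// along with its similarity score and the index of the query that matched it.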
#[derive(Debug)]
pub struct SearchResult {
    pub worktree: Entity<Worktree>,
    pub path: Arc<Path>,
    pub range: Range<usize>,
    pub score: f32,
    pub query_index: usize,
}

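/// A [`SearchResult`] whose excerpt has been loaded from disk, with the byte
/// range resolved to a row range for display.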
#[derive(Debug, PartialEq, Eq)]
pub struct LoadedSearchResult {
    pub path: Arc<Path>,
    pub full_path: PathBuf,
    pub excerpt_content: String,
    pub row_range: RangeInclusive<u32>,
    pub query_index: usize,
}

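/// An intermediate search result produced on a background thread, identifying
/// its worktree by id so it can be resolved to an [`Entity<Worktree>`] later.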
pub struct WorktreeSearchResult {
    pub worktree_id: WorktreeId,
    pub path: Arc<Path>,
    pub range: Range<usize>,
    pub query_index: usize,
    pub score: f32,
}

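/// The aggregate indexing state of a [`ProjectIndex`], emitted as a gpui event
/// whenever it changes.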
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    Idle,
    Loading,
    Scanning { remaining_count: NonZeroUsize },
}

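/// A semantic index over an entire project, maintained as one [`WorktreeIndex`]
/// per local, visible worktree. Worktree indices are (re)created as worktrees
/// come and go, and their combined progress is reported via [`Status`] events.
///
/// A minimal usage sketch (not compiled here; assumes you already have a
/// `Project`, an open `heed::Env`, and an `EmbeddingProvider` implementation):
///
/// ```ignore
/// let project_index = cx.new(|cx| {
///     ProjectIndex::new(project.clone(), db_env.clone(), embedding_provider.clone(), cx)
/// });
/// cx.subscribe(&project_index, |_index, status: &Status, _cx| {
///     log::info!("indexing status: {status:?}");
/// })
/// .detach();
/// ```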
pub struct ProjectIndex {
    db_connection: heed::Env,
    project: WeakEntity<Project>,
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    language_registry: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    last_status: Status,
    status_tx: channel::Sender<()>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    _maintain_status: Task<()>,
    _subscription: Subscription,
}

impl ProjectIndex {
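    /// Creates an index for `project` and immediately begins loading a
    /// [`WorktreeIndex`] for each of its local, visible worktrees. A background
    /// task folds per-worktree progress reports into a single [`Status`].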
    pub fn new(
        project: Entity<Project>,
        db_connection: heed::Env,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        cx: &mut Context<Self>,
    ) -> Self {
        let language_registry = project.read(cx).languages().clone();
        let fs = project.read(cx).fs().clone();
        let (status_tx, status_rx) = channel::unbounded();
        let mut this = ProjectIndex {
            db_connection,
            project: project.downgrade(),
            worktree_indices: HashMap::default(),
            language_registry,
            fs,
            status_tx,
            last_status: Status::Idle,
            embedding_provider,
            _subscription: cx.subscribe(&project, Self::handle_project_event),
            _maintain_status: cx.spawn(async move |this, cx| {
                while status_rx.recv().await.is_ok() {
                    if this.update(cx, |this, cx| this.update_status(cx)).is_err() {
                        break;
                    }
                }
            }),
        };
        this.update_worktree_indices(cx);
        this
    }

    pub fn status(&self) -> Status {
        self.last_status
    }

    pub fn project(&self) -> WeakEntity<Project> {
        self.project.clone()
    }

    pub fn fs(&self) -> Arc<dyn Fs> {
        self.fs.clone()
    }

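    /// Keeps the set of worktree indices in sync as worktrees are added to and
    /// removed from the project.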
    fn handle_project_event(
        &mut self,
        _: Entity<Project>,
        event: &project::Event,
        cx: &mut Context<Self>,
    ) {
        match event {
            project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
                self.update_worktree_indices(cx);
            }
            _ => {}
        }
    }

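    /// Drops indices for worktrees that no longer exist and starts loading
    /// indices for new ones. While a load is in flight the handle is
    /// [`WorktreeIndexHandle::Loading`]; on success it is swapped for
    /// [`WorktreeIndexHandle::Loaded`], and on failure it is removed so a
    /// later call can retry.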
    fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
        let Some(project) = self.project.upgrade() else {
            return;
        };

        let worktrees = project
            .read(cx)
            .visible_worktrees(cx)
            .filter_map(|worktree| {
                if worktree.read(cx).is_local() {
                    Some((worktree.entity_id(), worktree))
                } else {
                    None
                }
            })
            .collect::<HashMap<_, _>>();

        self.worktree_indices
            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
        for (worktree_id, worktree) in worktrees {
            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
                let worktree_index = WorktreeIndex::load(
                    worktree.clone(),
                    self.db_connection.clone(),
                    self.language_registry.clone(),
                    self.fs.clone(),
                    self.status_tx.clone(),
                    self.embedding_provider.clone(),
                    cx,
                );

                let load_worktree = cx.spawn(async move |this, cx| {
                    let result = match worktree_index.await {
                        Ok(worktree_index) => {
                            this.update(cx, |this, _| {
                                this.worktree_indices.insert(
                                    worktree_id,
                                    WorktreeIndexHandle::Loaded {
                                        index: worktree_index.clone(),
                                    },
                                );
                            })?;
                            Ok(worktree_index)
                        }
                        Err(error) => {
                            this.update(cx, |this, _cx| {
                                this.worktree_indices.remove(&worktree_id)
                            })?;
                            Err(Arc::new(error))
                        }
                    };

                    this.update(cx, |this, cx| this.update_status(cx))?;

                    result
                });

                WorktreeIndexHandle::Loading {
                    index: load_worktree.shared(),
                }
            });
        }

        self.update_status(cx);
    }

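    /// Recomputes the aggregate [`Status`]: `Loading` if any worktree index is
    /// still being loaded, otherwise `Scanning` with the number of entries
    /// still being indexed, otherwise `Idle`. Emits the status only when it
    /// actually changes.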
    fn update_status(&mut self, cx: &mut Context<Self>) {
        let mut indexing_count = 0;
        let mut any_loading = false;

        for index in self.worktree_indices.values_mut() {
            match index {
                WorktreeIndexHandle::Loading { .. } => {
                    any_loading = true;
                    break;
                }
                WorktreeIndexHandle::Loaded { index, .. } => {
                    indexing_count += index.read(cx).entry_ids_being_indexed().len();
                }
            }
        }

        let status = if any_loading {
            Status::Loading
        } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
            Status::Scanning { remaining_count }
        } else {
            Status::Idle
        };

        if status != self.last_status {
            self.last_status = status;
            cx.emit(status);
        }
    }

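    /// Runs a semantic search over every worktree in the project, returning at
    /// most `limit` results ordered by descending similarity score.
    ///
    /// Embedded chunks are streamed out of each worktree's database and scored
    /// against the query embeddings on background threads, so the returned
    /// task can be awaited without blocking the main thread.
    ///
    /// A minimal usage sketch (not compiled here; assumes an
    /// `Entity<ProjectIndex>` named `project_index` and an `&mut App`):
    ///
    /// ```ignore
    /// let results = project_index.read(cx).search(
    ///     vec!["where do we parse markdown?".to_string()],
    ///     16,
    ///     cx,
    /// );
    /// cx.spawn(async move |_cx| {
    ///     for result in results.await? {
    ///         println!("{:?} (score {:.3})", result.path, result.score);
    ///     }
    ///     anyhow::Ok(())
    /// })
    /// .detach();
    /// ```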
    pub fn search(
        &self,
        queries: Vec<String>,
        limit: usize,
        cx: &App,
    ) -> Task<Result<Vec<SearchResult>>> {
        let (chunks_tx, chunks_rx) = channel::bounded(1024);
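        // Fan chunks out of each worktree's database into a bounded channel;
        // the bound applies backpressure so the database scans can't outrun
        // the scoring workers spawned below.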
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let chunks_tx = chunks_tx.clone();
            worktree_scan_tasks.push(cx.spawn(async move |cx| {
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(cx, |index, cx| {
                        let worktree_id = index.worktree().read(cx).id();
                        let db_connection = index.db_connection().clone();
                        let db = *index.embedding_index().db();
                        cx.background_spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create read transaction")?;
                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (_key, db_embedded_file) = db_entry?;
                                for chunk in db_embedded_file.chunks {
                                    chunks_tx
                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
                                        .await?;
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
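        // Drop the original sender so the channel closes once all scan tasks
        // finish; without this the scoring workers below would never exit.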
        drop(chunks_tx);

        let project = self.project.clone();
        let embedding_provider = self.embedding_provider.clone();
        cx.spawn(async move |cx| {
            #[cfg(debug_assertions)]
            let embedding_query_start = std::time::Instant::now();
            log::info!("Searching for {queries:?}");
            let queries: Vec<TextToEmbed> = queries
                .iter()
                .map(|s| TextToEmbed::new(s.as_str()))
                .collect();

            let query_embeddings = embedding_provider.embed(&queries[..]).await?;
            anyhow::ensure!(
                query_embeddings.len() == queries.len(),
                "The number of query embeddings does not match the number of queries"
            );

            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
            }

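            // Each worker maintains its own sorted top-`limit` list; the lists
            // are merged and truncated once all chunks have been scored.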
            #[cfg(debug_assertions)]
            let search_start = std::time::Instant::now();
            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
                                let (score, query_index) =
                                    chunk.embedding.similarity(&query_embeddings);

                                let ix = match results.binary_search_by(|probe| {
                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
                                }) {
                                    Ok(ix) | Err(ix) => ix,
                                };
                                if ix < limit {
                                    results.insert(
                                        ix,
                                        WorktreeSearchResult {
                                            worktree_id,
                                            path: path.clone(),
                                            range: chunk.chunk.range.clone(),
                                            query_index,
                                            score,
                                        },
                                    );
                                    if results.len() > limit {
                                        results.pop();
                                    }
                                }
                            }
                        });
                    }
                })
                .await;

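            // The channel is closed by now, so every scan task has finished
            // sending; log scan errors instead of failing the whole search.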
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

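            // Merge the per-worker lists into a single top-`limit` list,
            // dropping hits whose worktree has since been removed.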
            project.read_with(cx, |project, cx| {
                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
                for worker_results in results_by_worker {
                    search_results.extend(worker_results.into_iter().filter_map(|result| {
                        Some(SearchResult {
                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
                            path: result.path,
                            range: result.range,
                            score: result.score,
                            query_index: result.query_index,
                        })
                    }));
                }
                search_results.sort_unstable_by(|a, b| {
                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
                });
                search_results.truncate(limit);

                #[cfg(debug_assertions)]
                {
                    let search_elapsed = search_start.elapsed();
                    log::debug!(
                        "searched {} entries in {:?}",
                        search_results.len(),
                        search_elapsed
                    );
                    let embedding_query_elapsed = embedding_query_start.elapsed();
                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
                }

                search_results
            })
        })
    }

    #[cfg(test)]
    pub fn path_count(&self, cx: &App) -> Result<u64> {
        let mut result = 0;
        for worktree_index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
                result += index.read(cx).path_count()?;
            }
        }
        Ok(result)
    }

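    /// Returns the loaded index for the given worktree, if one exists.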
    pub(crate) fn worktree_index(
        &self,
        worktree_id: WorktreeId,
        cx: &App,
    ) -> Option<Entity<WorktreeIndex>> {
        for index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = index
                && index.read(cx).worktree().read(cx).id() == worktree_id
            {
                return Some(index.clone());
            }
        }
        None
    }

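    /// Returns every loaded worktree index, ordered by worktree id so callers
    /// iterate deterministically.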
    pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
        let mut result = self
            .worktree_indices
            .values()
            .filter_map(|index| {
                if let WorktreeIndexHandle::Loaded { index, .. } = index {
                    Some(index.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
        result
    }

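    /// Collects the stored summary of every file that has already been
    /// summarized, across all worktrees. Files with no stored summary are
    /// skipped rather than summarized on demand; see the inline comment below.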
    pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
        let (summaries_tx, summaries_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
            worktree_scan_tasks.push(cx.spawn(async move |cx| {
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(cx, |index, cx| {
                        let db_connection = index.db_connection().clone();
                        let summary_index = index.summary_index();
                        let file_digest_db = summary_index.file_digest_db();
                        let summary_db = summary_index.summary_db();

                        cx.background_spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create db read transaction")?;
                            let db_entries = file_digest_db
                                .iter(&txn)
                                .context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (file_path, db_file) = db_entry?;

                                match summary_db.get(&txn, &db_file.digest) {
                                    Ok(opt_summary) => {
                                        // Currently, we only use summaries we already have. If the file hasn't been
                                        // summarized yet, then we skip it and don't include it in the inferred context.
                                        // If we want to do just-in-time summarization, this would be the place to do it!
                                        if let Some(summary) = opt_summary {
                                            summaries_tx
                                                .send((file_path.to_string(), summary.to_string()))
                                                .await?;
                                        } else {
                                            log::warn!("No summary found for {:?}", &db_file);
                                        }
                                    }
                                    Err(err) => {
                                        log::error!(
                                            "Error reading from summary database: {:?}",
                                            err
                                        );
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
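        // As in `search`, drop the original sender so the channel closes once
        // the scan tasks finish and the collection workers can exit.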
        drop(summaries_tx);

        let project = self.project.clone();
        cx.spawn(async move |cx| {
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<FileSummary>::new());
            }

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((filename, summary)) = summaries_rx.recv().await {
                                results.push(FileSummary { filename, summary });
                            }
                        });
                    }
                })
                .await;

            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            project.read_with(cx, |_project, _cx| {
                results_by_worker.into_iter().flatten().collect()
            })
        })
    }

    /// Empty out the backlogs of all the worktrees in the project.
    pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
        let flush_start = std::time::Instant::now();

        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
            let worktree_index = worktree_index.clone();

            cx.spawn(async move |cx| {
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };
                let worktree_abs_path =
                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;

                index
                    .read_with(cx, |index, cx| {
                        cx.background_spawn(
                            index.summary_index().flush_backlog(worktree_abs_path, cx),
                        )
                    })?
                    .await
            })
        }))
        .map(move |results| {
            // Log any errors, but don't block the user. These summaries are supposed to
            // improve quality by providing extra context, but they aren't hard requirements!
            for result in results {
                if let Err(err) = result {
                    log::error!("Error flushing summary backlog: {:?}", err);
                }
            }

            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
        })
    }

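    /// The number of files still waiting to be summarized across all loaded
    /// worktree indices.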
    pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
        self.worktree_indices(cx)
            .iter()
            .map(|index| index.read(cx).summary_index().backlog_len())
            .sum()
    }
}

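// `ProjectIndex` broadcasts its indexing progress as `Status` events.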
impl EventEmitter<Status> for ProjectIndex {}