project_index.rs

  1use crate::{
  2    embedding::{EmbeddingProvider, TextToEmbed},
  3    summary_index::FileSummary,
  4    worktree_index::{WorktreeIndex, WorktreeIndexHandle},
  5};
  6use anyhow::{anyhow, Context as _, Result};
  7use collections::HashMap;
  8use fs::Fs;
  9use futures::FutureExt;
 10use gpui::{
 11    App, AppContext as _, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity,
 12};
 13use language::LanguageRegistry;
 14use log;
 15use project::{Project, Worktree, WorktreeId};
 16use serde::{Deserialize, Serialize};
 17use smol::channel;
 18use std::{
 19    cmp::Ordering,
 20    future::Future,
 21    num::NonZeroUsize,
 22    ops::{Range, RangeInclusive},
 23    path::{Path, PathBuf},
 24    sync::Arc,
 25};
 26use util::ResultExt;
 27
/// A chunk of a file that matched one of the queries passed to [`ProjectIndex::search`].
#[derive(Debug)]
pub struct SearchResult {
    /// Worktree containing the matched file.
    pub worktree: Entity<Worktree>,
    /// Path of the matched file as stored in the embedding database
    /// (presumably relative to the worktree root — TODO confirm).
    pub path: Arc<Path>,
    /// Range of the matched chunk within the file
    /// (assumed to be byte offsets — confirm against the chunker).
    pub range: Range<usize>,
    /// Similarity score reported for this chunk; `search` returns results sorted by
    /// descending score.
    pub score: f32,
    /// Index into the `queries` given to `search` of the query that `score` refers to.
    pub query_index: usize,
}
 36
/// A search result together with the text of its excerpt.
///
/// NOTE(review): not constructed in this file; the description above is inferred from the
/// field names — verify against the code that builds it.
#[derive(Debug, PartialEq, Eq)]
pub struct LoadedSearchResult {
    /// Path of the matched file (presumably the same value as [`SearchResult::path`]).
    pub path: Arc<Path>,
    /// Full path of the matched file (presumably including the worktree root — TODO confirm).
    pub full_path: PathBuf,
    /// Text of the matched excerpt.
    pub excerpt_content: String,
    /// Inclusive range of rows covered by the excerpt (zero- vs one-based is not visible here).
    pub row_range: RangeInclusive<u32>,
    /// Index of the query that produced this result, as in [`SearchResult::query_index`].
    pub query_index: usize,
}
 45
 46pub struct WorktreeSearchResult {
 47    pub worktree_id: WorktreeId,
 48    pub path: Arc<Path>,
 49    pub range: Range<usize>,
 50    pub query_index: usize,
 51    pub score: f32,
 52}
 53
/// Aggregate indexing status of a [`ProjectIndex`], recomputed by `update_status` and
/// emitted as an event whenever it changes.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    /// No worktree index is loading and no entries are being indexed.
    Idle,
    /// At least one worktree index is still loading.
    Loading,
    /// All worktree indices are loaded and `remaining_count` entries are still being indexed.
    Scanning { remaining_count: NonZeroUsize },
}
 60
/// Semantic index over all visible local worktrees of a project.
///
/// Keeps one worktree index per worktree, tracks their aggregate [`Status`], and answers
/// embedding-based searches across all of them.
pub struct ProjectIndex {
    /// Database environment handed to every worktree index when it is loaded.
    db_connection: heed::Env,
    /// The indexed project; held weakly so the index does not keep the project alive.
    project: WeakEntity<Project>,
    /// One handle per visible local worktree, keyed by the worktree entity's id.
    /// A handle stays `Loading` until its worktree index has finished loading.
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    /// Language registry handed to each worktree index.
    language_registry: Arc<LanguageRegistry>,
    /// File system handed to each worktree index.
    fs: Arc<dyn Fs>,
    /// Most recently computed (and emitted) status.
    last_status: Status,
    /// Cloned into each worktree index; every message makes `_maintain_status`
    /// recompute the status.
    status_tx: channel::Sender<()>,
    /// Provider handed to each worktree index and used by `search` to embed queries.
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Task that recomputes the status each time a message arrives on `status_tx`.
    _maintain_status: Task<()>,
    /// Keeps the project-event subscription alive so worktree additions and removals
    /// update `worktree_indices`.
    _subscription: Subscription,
}
 73
 74impl ProjectIndex {
 75    pub fn new(
 76        project: Entity<Project>,
 77        db_connection: heed::Env,
 78        embedding_provider: Arc<dyn EmbeddingProvider>,
 79        cx: &mut Context<Self>,
 80    ) -> Self {
 81        let language_registry = project.read(cx).languages().clone();
 82        let fs = project.read(cx).fs().clone();
 83        let (status_tx, status_rx) = channel::unbounded();
 84        let mut this = ProjectIndex {
 85            db_connection,
 86            project: project.downgrade(),
 87            worktree_indices: HashMap::default(),
 88            language_registry,
 89            fs,
 90            status_tx,
 91            last_status: Status::Idle,
 92            embedding_provider,
 93            _subscription: cx.subscribe(&project, Self::handle_project_event),
 94            _maintain_status: cx.spawn(|this, mut cx| async move {
 95                while status_rx.recv().await.is_ok() {
 96                    if this
 97                        .update(&mut cx, |this, cx| this.update_status(cx))
 98                        .is_err()
 99                    {
100                        break;
101                    }
102                }
103            }),
104        };
105        this.update_worktree_indices(cx);
106        this
107    }
108
109    pub fn status(&self) -> Status {
110        self.last_status
111    }
112
113    pub fn project(&self) -> WeakEntity<Project> {
114        self.project.clone()
115    }
116
117    pub fn fs(&self) -> Arc<dyn Fs> {
118        self.fs.clone()
119    }
120
121    fn handle_project_event(
122        &mut self,
123        _: Entity<Project>,
124        event: &project::Event,
125        cx: &mut Context<Self>,
126    ) {
127        match event {
128            project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
129                self.update_worktree_indices(cx);
130            }
131            _ => {}
132        }
133    }
134
135    fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
136        let Some(project) = self.project.upgrade() else {
137            return;
138        };
139
140        let worktrees = project
141            .read(cx)
142            .visible_worktrees(cx)
143            .filter_map(|worktree| {
144                if worktree.read(cx).is_local() {
145                    Some((worktree.entity_id(), worktree))
146                } else {
147                    None
148                }
149            })
150            .collect::<HashMap<_, _>>();
151
152        self.worktree_indices
153            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
154        for (worktree_id, worktree) in worktrees {
155            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
156                let worktree_index = WorktreeIndex::load(
157                    worktree.clone(),
158                    self.db_connection.clone(),
159                    self.language_registry.clone(),
160                    self.fs.clone(),
161                    self.status_tx.clone(),
162                    self.embedding_provider.clone(),
163                    cx,
164                );
165
166                let load_worktree = cx.spawn(|this, mut cx| async move {
167                    let result = match worktree_index.await {
168                        Ok(worktree_index) => {
169                            this.update(&mut cx, |this, _| {
170                                this.worktree_indices.insert(
171                                    worktree_id,
172                                    WorktreeIndexHandle::Loaded {
173                                        index: worktree_index.clone(),
174                                    },
175                                );
176                            })?;
177                            Ok(worktree_index)
178                        }
179                        Err(error) => {
180                            this.update(&mut cx, |this, _cx| {
181                                this.worktree_indices.remove(&worktree_id)
182                            })?;
183                            Err(Arc::new(error))
184                        }
185                    };
186
187                    this.update(&mut cx, |this, cx| this.update_status(cx))?;
188
189                    result
190                });
191
192                WorktreeIndexHandle::Loading {
193                    index: load_worktree.shared(),
194                }
195            });
196        }
197
198        self.update_status(cx);
199    }
200
201    fn update_status(&mut self, cx: &mut Context<Self>) {
202        let mut indexing_count = 0;
203        let mut any_loading = false;
204
205        for index in self.worktree_indices.values_mut() {
206            match index {
207                WorktreeIndexHandle::Loading { .. } => {
208                    any_loading = true;
209                    break;
210                }
211                WorktreeIndexHandle::Loaded { index, .. } => {
212                    indexing_count += index.read(cx).entry_ids_being_indexed().len();
213                }
214            }
215        }
216
217        let status = if any_loading {
218            Status::Loading
219        } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
220            Status::Scanning { remaining_count }
221        } else {
222            Status::Idle
223        };
224
225        if status != self.last_status {
226            self.last_status = status;
227            cx.emit(status);
228        }
229    }
230
231    pub fn search(
232        &self,
233        queries: Vec<String>,
234        limit: usize,
235        cx: &App,
236    ) -> Task<Result<Vec<SearchResult>>> {
237        let (chunks_tx, chunks_rx) = channel::bounded(1024);
238        let mut worktree_scan_tasks = Vec::new();
239        for worktree_index in self.worktree_indices.values() {
240            let worktree_index = worktree_index.clone();
241            let chunks_tx = chunks_tx.clone();
242            worktree_scan_tasks.push(cx.spawn(|cx| async move {
243                let index = match worktree_index {
244                    WorktreeIndexHandle::Loading { index } => {
245                        index.clone().await.map_err(|error| anyhow!(error))?
246                    }
247                    WorktreeIndexHandle::Loaded { index } => index.clone(),
248                };
249
250                index
251                    .read_with(&cx, |index, cx| {
252                        let worktree_id = index.worktree().read(cx).id();
253                        let db_connection = index.db_connection().clone();
254                        let db = *index.embedding_index().db();
255                        cx.background_spawn(async move {
256                            let txn = db_connection
257                                .read_txn()
258                                .context("failed to create read transaction")?;
259                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
260                            for db_entry in db_entries {
261                                let (_key, db_embedded_file) = db_entry?;
262                                for chunk in db_embedded_file.chunks {
263                                    chunks_tx
264                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
265                                        .await?;
266                                }
267                            }
268                            anyhow::Ok(())
269                        })
270                    })?
271                    .await
272            }));
273        }
274        drop(chunks_tx);
275
276        let project = self.project.clone();
277        let embedding_provider = self.embedding_provider.clone();
278        cx.spawn(|cx| async move {
279            #[cfg(debug_assertions)]
280            let embedding_query_start = std::time::Instant::now();
281            log::info!("Searching for {queries:?}");
282            let queries: Vec<TextToEmbed> = queries
283                .iter()
284                .map(|s| TextToEmbed::new(s.as_str()))
285                .collect();
286
287            let query_embeddings = embedding_provider.embed(&queries[..]).await?;
288            if query_embeddings.len() != queries.len() {
289                return Err(anyhow!(
290                    "The number of query embeddings does not match the number of queries"
291                ));
292            }
293
294            let mut results_by_worker = Vec::new();
295            for _ in 0..cx.background_executor().num_cpus() {
296                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
297            }
298
299            #[cfg(debug_assertions)]
300            let search_start = std::time::Instant::now();
301            cx.background_executor()
302                .scoped(|cx| {
303                    for results in results_by_worker.iter_mut() {
304                        cx.spawn(async {
305                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
306                                let (score, query_index) =
307                                    chunk.embedding.similarity(&query_embeddings);
308
309                                let ix = match results.binary_search_by(|probe| {
310                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
311                                }) {
312                                    Ok(ix) | Err(ix) => ix,
313                                };
314                                if ix < limit {
315                                    results.insert(
316                                        ix,
317                                        WorktreeSearchResult {
318                                            worktree_id,
319                                            path: path.clone(),
320                                            range: chunk.chunk.range.clone(),
321                                            query_index,
322                                            score,
323                                        },
324                                    );
325                                    if results.len() > limit {
326                                        results.pop();
327                                    }
328                                }
329                            }
330                        });
331                    }
332                })
333                .await;
334
335            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
336                scan_task.log_err();
337            }
338
339            project.read_with(&cx, |project, cx| {
340                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
341                for worker_results in results_by_worker {
342                    search_results.extend(worker_results.into_iter().filter_map(|result| {
343                        Some(SearchResult {
344                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
345                            path: result.path,
346                            range: result.range,
347                            score: result.score,
348                            query_index: result.query_index,
349                        })
350                    }));
351                }
352                search_results.sort_unstable_by(|a, b| {
353                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
354                });
355                search_results.truncate(limit);
356
357                #[cfg(debug_assertions)]
358                {
359                    let search_elapsed = search_start.elapsed();
360                    log::debug!(
361                        "searched {} entries in {:?}",
362                        search_results.len(),
363                        search_elapsed
364                    );
365                    let embedding_query_elapsed = embedding_query_start.elapsed();
366                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
367                }
368
369                search_results
370            })
371        })
372    }
373
374    #[cfg(test)]
375    pub fn path_count(&self, cx: &App) -> Result<u64> {
376        let mut result = 0;
377        for worktree_index in self.worktree_indices.values() {
378            if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
379                result += index.read(cx).path_count()?;
380            }
381        }
382        Ok(result)
383    }
384
385    pub(crate) fn worktree_index(
386        &self,
387        worktree_id: WorktreeId,
388        cx: &App,
389    ) -> Option<Entity<WorktreeIndex>> {
390        for index in self.worktree_indices.values() {
391            if let WorktreeIndexHandle::Loaded { index, .. } = index {
392                if index.read(cx).worktree().read(cx).id() == worktree_id {
393                    return Some(index.clone());
394                }
395            }
396        }
397        None
398    }
399
400    pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
401        let mut result = self
402            .worktree_indices
403            .values()
404            .filter_map(|index| {
405                if let WorktreeIndexHandle::Loaded { index, .. } = index {
406                    Some(index.clone())
407                } else {
408                    None
409                }
410            })
411            .collect::<Vec<_>>();
412        result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
413        result
414    }
415
416    pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
417        let (summaries_tx, summaries_rx) = channel::bounded(1024);
418        let mut worktree_scan_tasks = Vec::new();
419        for worktree_index in self.worktree_indices.values() {
420            let worktree_index = worktree_index.clone();
421            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
422            worktree_scan_tasks.push(cx.spawn(|cx| async move {
423                let index = match worktree_index {
424                    WorktreeIndexHandle::Loading { index } => {
425                        index.clone().await.map_err(|error| anyhow!(error))?
426                    }
427                    WorktreeIndexHandle::Loaded { index } => index.clone(),
428                };
429
430                index
431                    .read_with(&cx, |index, cx| {
432                        let db_connection = index.db_connection().clone();
433                        let summary_index = index.summary_index();
434                        let file_digest_db = summary_index.file_digest_db();
435                        let summary_db = summary_index.summary_db();
436
437                        cx.background_spawn(async move {
438                            let txn = db_connection
439                                .read_txn()
440                                .context("failed to create db read transaction")?;
441                            let db_entries = file_digest_db
442                                .iter(&txn)
443                                .context("failed to iterate database")?;
444                            for db_entry in db_entries {
445                                let (file_path, db_file) = db_entry?;
446
447                                match summary_db.get(&txn, &db_file.digest) {
448                                    Ok(opt_summary) => {
449                                        // Currently, we only use summaries we already have. If the file hasn't been
450                                        // summarized yet, then we skip it and don't include it in the inferred context.
451                                        // If we want to do just-in-time summarization, this would be the place to do it!
452                                        if let Some(summary) = opt_summary {
453                                            summaries_tx
454                                                .send((file_path.to_string(), summary.to_string()))
455                                                .await?;
456                                        } else {
457                                            log::warn!("No summary found for {:?}", &db_file);
458                                        }
459                                    }
460                                    Err(err) => {
461                                        log::error!(
462                                            "Error reading from summary database: {:?}",
463                                            err
464                                        );
465                                    }
466                                }
467                            }
468                            anyhow::Ok(())
469                        })
470                    })?
471                    .await
472            }));
473        }
474        drop(summaries_tx);
475
476        let project = self.project.clone();
477        cx.spawn(|cx| async move {
478            let mut results_by_worker = Vec::new();
479            for _ in 0..cx.background_executor().num_cpus() {
480                results_by_worker.push(Vec::<FileSummary>::new());
481            }
482
483            cx.background_executor()
484                .scoped(|cx| {
485                    for results in results_by_worker.iter_mut() {
486                        cx.spawn(async {
487                            while let Ok((filename, summary)) = summaries_rx.recv().await {
488                                results.push(FileSummary { filename, summary });
489                            }
490                        });
491                    }
492                })
493                .await;
494
495            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
496                scan_task.log_err();
497            }
498
499            project.read_with(&cx, |_project, _cx| {
500                results_by_worker.into_iter().flatten().collect()
501            })
502        })
503    }
504
505    /// Empty out the backlogs of all the worktrees in the project
506    pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
507        let flush_start = std::time::Instant::now();
508
509        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
510            let worktree_index = worktree_index.clone();
511
512            cx.spawn(|cx| async move {
513                let index = match worktree_index {
514                    WorktreeIndexHandle::Loading { index } => {
515                        index.clone().await.map_err(|error| anyhow!(error))?
516                    }
517                    WorktreeIndexHandle::Loaded { index } => index.clone(),
518                };
519                let worktree_abs_path =
520                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;
521
522                index
523                    .read_with(&cx, |index, cx| {
524                        cx.background_spawn(
525                            index.summary_index().flush_backlog(worktree_abs_path, cx),
526                        )
527                    })?
528                    .await
529            })
530        }))
531        .map(move |results| {
532            // Log any errors, but don't block the user. These summaries are supposed to
533            // improve quality by providing extra context, but they aren't hard requirements!
534            for result in results {
535                if let Err(err) = result {
536                    log::error!("Error flushing summary backlog: {:?}", err);
537                }
538            }
539
540            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
541        })
542    }
543
544    pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
545        self.worktree_indices(cx)
546            .iter()
547            .map(|index| index.read(cx).summary_index().backlog_len())
548            .sum()
549    }
550}
551
// `ProjectIndex` emits a `Status` event from `update_status` whenever the aggregate
// indexing status changes.
impl EventEmitter<Status> for ProjectIndex {}