// project_index.rs — aggregates per-worktree semantic indices for a project.

  1use crate::{
  2    embedding::{EmbeddingProvider, TextToEmbed},
  3    summary_index::FileSummary,
  4    worktree_index::{WorktreeIndex, WorktreeIndexHandle},
  5};
  6use anyhow::{Context as _, Result, anyhow};
  7use collections::HashMap;
  8use fs::Fs;
  9use futures::FutureExt;
 10use gpui::{
 11    App, AppContext as _, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity,
 12};
 13use language::LanguageRegistry;
 14use log;
 15use project::{Project, Worktree, WorktreeId};
 16use serde::{Deserialize, Serialize};
 17use smol::channel;
 18use std::{
 19    cmp::Ordering,
 20    future::Future,
 21    num::NonZeroUsize,
 22    ops::{Range, RangeInclusive},
 23    path::{Path, PathBuf},
 24    sync::Arc,
 25};
 26use util::ResultExt;
 27
/// A single semantic-search hit, resolved back to a live worktree entity.
#[derive(Debug)]
pub struct SearchResult {
    /// The worktree containing the matched file.
    pub worktree: Entity<Worktree>,
    /// Path of the matched file (assumed worktree-relative — matches the path
    /// stored alongside each embedded chunk; verify against the indexer).
    pub path: Arc<Path>,
    /// Range of the matching chunk within the file.
    pub range: Range<usize>,
    /// Similarity score; results are sorted so higher scores rank first.
    pub score: f32,
    /// Index into the original query list identifying which query matched best.
    pub query_index: usize,
}
 36
/// A search result whose excerpt text has already been loaded from disk,
/// ready for display without further file I/O.
#[derive(Debug, PartialEq, Eq)]
pub struct LoadedSearchResult {
    /// Path of the matched file (same form as `SearchResult::path`).
    pub path: Arc<Path>,
    /// Full path suitable for presenting to the user.
    pub full_path: PathBuf,
    /// The loaded text of the matching excerpt.
    pub excerpt_content: String,
    /// Inclusive row range the excerpt spans within the file.
    pub row_range: RangeInclusive<u32>,
    /// Index into the original query list identifying which query matched best.
    pub query_index: usize,
}
 45
/// Intermediate per-worker search result that records a `WorktreeId` instead
/// of a live `Entity<Worktree>`; converted into `SearchResult` once all
/// workers finish and the project can resolve the id.
pub struct WorktreeSearchResult {
    /// Id of the worktree the chunk came from.
    pub worktree_id: WorktreeId,
    /// Path of the matched file.
    pub path: Arc<Path>,
    /// Range of the matching chunk within the file.
    pub range: Range<usize>,
    /// Index into the original query list identifying which query matched best.
    pub query_index: usize,
    /// Similarity score; per-worker result lists are kept sorted descending.
    pub score: f32,
}
 53
/// Aggregate indexing status across all worktrees; emitted as a GPUI event
/// whenever it changes (see `update_status`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    /// No worktree index is loading and nothing is being indexed.
    Idle,
    /// At least one worktree index is still loading from the database.
    Loading,
    /// All indices are loaded; `remaining_count` entries are still being indexed.
    Scanning { remaining_count: NonZeroUsize },
}
 60
/// Maintains one `WorktreeIndex` per visible local worktree of a project and
/// exposes semantic search and file-summary queries over all of them.
pub struct ProjectIndex {
    /// Shared LMDB environment handed to every worktree index.
    db_connection: heed::Env,
    /// Weak handle back to the project; upgraded on demand so the index
    /// doesn't keep the project alive.
    project: WeakEntity<Project>,
    /// Per-worktree indices keyed by the worktree entity's id; each is either
    /// still loading or fully loaded.
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    language_registry: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    /// Most recently computed aggregate status; used to de-duplicate events.
    last_status: Status,
    /// Cloned into each worktree index so it can nudge `_maintain_status`
    /// whenever its indexing state changes.
    status_tx: channel::Sender<()>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Background task that recomputes status on each `status_tx` ping.
    _maintain_status: Task<()>,
    /// Keeps the subscription to project events (worktree add/remove) alive.
    _subscription: Subscription,
}
 73
impl ProjectIndex {
    /// Creates the index for `project`, subscribes to worktree add/remove
    /// events, spawns the status-maintenance task, and immediately builds
    /// indices for the project's current local worktrees.
    pub fn new(
        project: Entity<Project>,
        db_connection: heed::Env,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        cx: &mut Context<Self>,
    ) -> Self {
        let language_registry = project.read(cx).languages().clone();
        let fs = project.read(cx).fs().clone();
        // Worktree indices ping this channel whenever their indexing state
        // changes so we can recompute the aggregate `Status`.
        let (status_tx, status_rx) = channel::unbounded();
        let mut this = ProjectIndex {
            db_connection,
            project: project.downgrade(),
            worktree_indices: HashMap::default(),
            language_registry,
            fs,
            status_tx,
            last_status: Status::Idle,
            embedding_provider,
            _subscription: cx.subscribe(&project, Self::handle_project_event),
            _maintain_status: cx.spawn(async move |this, cx| {
                // Recompute status on every ping; exit once this entity has
                // been dropped (update returns Err).
                while status_rx.recv().await.is_ok() {
                    if this.update(cx, |this, cx| this.update_status(cx)).is_err() {
                        break;
                    }
                }
            }),
        };
        this.update_worktree_indices(cx);
        this
    }

    /// Returns the last aggregate status computed by `update_status`.
    pub fn status(&self) -> Status {
        self.last_status
    }

    /// Returns a weak handle to the indexed project.
    pub fn project(&self) -> WeakEntity<Project> {
        self.project.clone()
    }

    /// Returns the filesystem abstraction shared with the worktree indices.
    pub fn fs(&self) -> Arc<dyn Fs> {
        self.fs.clone()
    }

    /// Keeps `worktree_indices` in sync when worktrees are added to or
    /// removed from the project; all other project events are ignored.
    fn handle_project_event(
        &mut self,
        _: Entity<Project>,
        event: &project::Event,
        cx: &mut Context<Self>,
    ) {
        match event {
            project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
                self.update_worktree_indices(cx);
            }
            _ => {}
        }
    }

    /// Reconciles `worktree_indices` with the project's current set of
    /// visible local worktrees: drops indices for vanished worktrees and
    /// kicks off an asynchronous `WorktreeIndex::load` for new ones.
    fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
        let Some(project) = self.project.upgrade() else {
            return;
        };

        // Only local worktrees are indexed; remote ones are filtered out.
        let worktrees = project
            .read(cx)
            .visible_worktrees(cx)
            .filter_map(|worktree| {
                if worktree.read(cx).is_local() {
                    Some((worktree.entity_id(), worktree))
                } else {
                    None
                }
            })
            .collect::<HashMap<_, _>>();

        self.worktree_indices
            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
        for (worktree_id, worktree) in worktrees {
            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
                let worktree_index = WorktreeIndex::load(
                    worktree.clone(),
                    self.db_connection.clone(),
                    self.language_registry.clone(),
                    self.fs.clone(),
                    self.status_tx.clone(),
                    self.embedding_provider.clone(),
                    cx,
                );

                let load_worktree = cx.spawn(async move |this, cx| {
                    // On success, promote the handle from Loading to Loaded;
                    // on failure, forget the worktree entirely. Either way,
                    // refresh the aggregate status afterwards.
                    let result = match worktree_index.await {
                        Ok(worktree_index) => {
                            this.update(cx, |this, _| {
                                this.worktree_indices.insert(
                                    worktree_id,
                                    WorktreeIndexHandle::Loaded {
                                        index: worktree_index.clone(),
                                    },
                                );
                            })?;
                            Ok(worktree_index)
                        }
                        Err(error) => {
                            this.update(cx, |this, _cx| {
                                this.worktree_indices.remove(&worktree_id)
                            })?;
                            Err(Arc::new(error))
                        }
                    };

                    this.update(cx, |this, cx| this.update_status(cx))?;

                    result
                });

                // The shared future lets `search`/`all_summaries` await the
                // same in-flight load from multiple places.
                WorktreeIndexHandle::Loading {
                    index: load_worktree.shared(),
                }
            });
        }

        self.update_status(cx);
    }

    /// Recomputes the aggregate status from the per-worktree handles and
    /// emits it as an event if it changed. `Loading` wins over `Scanning`,
    /// which wins over `Idle`.
    fn update_status(&mut self, cx: &mut Context<Self>) {
        let mut indexing_count = 0;
        let mut any_loading = false;

        for index in self.worktree_indices.values_mut() {
            match index {
                WorktreeIndexHandle::Loading { .. } => {
                    // One loading worktree is enough to report `Loading`;
                    // no need to keep counting.
                    any_loading = true;
                    break;
                }
                WorktreeIndexHandle::Loaded { index, .. } => {
                    indexing_count += index.read(cx).entry_ids_being_indexed().len();
                }
            }
        }

        let status = if any_loading {
            Status::Loading
        } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
            Status::Scanning { remaining_count }
        } else {
            Status::Idle
        };

        // Only emit on transitions so listeners aren't spammed.
        if status != self.last_status {
            self.last_status = status;
            cx.emit(status);
        }
    }

    /// Embeds `queries` and returns up to `limit` best-scoring chunks across
    /// all worktrees. Producer tasks stream every stored chunk through a
    /// bounded channel; one consumer per CPU keeps a sorted top-`limit` list,
    /// and the partial lists are merged and truncated at the end.
    pub fn search(
        &self,
        queries: Vec<String>,
        limit: usize,
        cx: &App,
    ) -> Task<Result<Vec<SearchResult>>> {
        let (chunks_tx, chunks_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let chunks_tx = chunks_tx.clone();
            worktree_scan_tasks.push(cx.spawn(async move |cx| {
                // Wait for a still-loading index to finish before scanning it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(cx, |index, cx| {
                        let worktree_id = index.worktree().read(cx).id();
                        let db_connection = index.db_connection().clone();
                        let db = *index.embedding_index().db();
                        // Iterate the embedding database off the main thread,
                        // feeding every chunk to the consumers below.
                        cx.background_spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create read transaction")?;
                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (_key, db_embedded_file) = db_entry?;
                                for chunk in db_embedded_file.chunks {
                                    chunks_tx
                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
                                        .await?;
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Drop our sender so the channel closes once every producer finishes,
        // which lets the consumer loops below terminate.
        drop(chunks_tx);

        let project = self.project.clone();
        cx.spawn(async move |cx| {
            #[cfg(debug_assertions)]
            let embedding_query_start = std::time::Instant::now();
            log::info!("Searching for {queries:?}");
            let queries: Vec<TextToEmbed> = queries
                .iter()
                .map(|s| TextToEmbed::new(s.as_str()))
                .collect();

            let query_embeddings = embedding_provider.embed(&queries[..]).await?;
            anyhow::ensure!(
                query_embeddings.len() == queries.len(),
                "The number of query embeddings does not match the number of queries"
            );

            // One result list per CPU; each scoped worker below owns one.
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
            }

            #[cfg(debug_assertions)]
            let search_start = std::time::Instant::now();
            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
                                // `similarity` returns the best score over all
                                // queries plus which query achieved it.
                                let (score, query_index) =
                                    chunk.embedding.similarity(&query_embeddings);

                                // `results` is kept sorted descending by score,
                                // so this binary search finds the insert slot.
                                let ix = match results.binary_search_by(|probe| {
                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
                                }) {
                                    Ok(ix) | Err(ix) => ix,
                                };
                                if ix < limit {
                                    results.insert(
                                        ix,
                                        WorktreeSearchResult {
                                            worktree_id,
                                            path: path.clone(),
                                            range: chunk.chunk.range.clone(),
                                            query_index,
                                            score,
                                        },
                                    );
                                    // Keep at most `limit` entries per worker.
                                    if results.len() > limit {
                                        results.pop();
                                    }
                                }
                            }
                        });
                    }
                })
                .await;

            // Surface (but don't propagate) any per-worktree scan failures;
            // partial results are still useful.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            project.read_with(cx, |project, cx| {
                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
                for worker_results in results_by_worker {
                    // Skip results whose worktree has disappeared since the scan.
                    search_results.extend(worker_results.into_iter().filter_map(|result| {
                        Some(SearchResult {
                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
                            path: result.path,
                            range: result.range,
                            score: result.score,
                            query_index: result.query_index,
                        })
                    }));
                }
                // Merge the per-worker top-k lists into a single global top-k.
                search_results.sort_unstable_by(|a, b| {
                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
                });
                search_results.truncate(limit);

                #[cfg(debug_assertions)]
                {
                    let search_elapsed = search_start.elapsed();
                    log::debug!(
                        "searched {} entries in {:?}",
                        search_results.len(),
                        search_elapsed
                    );
                    let embedding_query_elapsed = embedding_query_start.elapsed();
                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
                }

                search_results
            })
        })
    }

    /// Test helper: total number of indexed paths across all loaded worktrees.
    #[cfg(test)]
    pub fn path_count(&self, cx: &App) -> Result<u64> {
        let mut result = 0;
        for worktree_index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
                result += index.read(cx).path_count()?;
            }
        }
        Ok(result)
    }

    /// Returns the loaded index for `worktree_id`, or `None` if that worktree
    /// has no index yet or its index is still loading.
    pub(crate) fn worktree_index(
        &self,
        worktree_id: WorktreeId,
        cx: &App,
    ) -> Option<Entity<WorktreeIndex>> {
        for index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = index
                && index.read(cx).worktree().read(cx).id() == worktree_id
            {
                return Some(index.clone());
            }
        }
        None
    }

    /// Returns all fully loaded worktree indices, sorted by worktree id for a
    /// deterministic order (the backing map iterates in arbitrary order).
    pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
        let mut result = self
            .worktree_indices
            .values()
            .filter_map(|index| {
                if let WorktreeIndexHandle::Loaded { index, .. } = index {
                    Some(index.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
        result
    }

    /// Collects every already-computed file summary across all worktrees.
    /// Mirrors the producer/consumer structure of `search`: background tasks
    /// stream `(filename, summary)` pairs through a bounded channel into
    /// per-CPU collector lists, which are flattened at the end.
    pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
        let (summaries_tx, summaries_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
            worktree_scan_tasks.push(cx.spawn(async move |cx| {
                // Wait for a still-loading index to finish before reading it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(cx, |index, cx| {
                        let db_connection = index.db_connection().clone();
                        let summary_index = index.summary_index();
                        let file_digest_db = summary_index.file_digest_db();
                        let summary_db = summary_index.summary_db();

                        // Walk file digests and look each one up in the
                        // summary database, off the main thread.
                        cx.background_spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create db read transaction")?;
                            let db_entries = file_digest_db
                                .iter(&txn)
                                .context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (file_path, db_file) = db_entry?;

                                match summary_db.get(&txn, &db_file.digest) {
                                    Ok(opt_summary) => {
                                        // Currently, we only use summaries we already have. If the file hasn't been
                                        // summarized yet, then we skip it and don't include it in the inferred context.
                                        // If we want to do just-in-time summarization, this would be the place to do it!
                                        if let Some(summary) = opt_summary {
                                            summaries_tx
                                                .send((file_path.to_string(), summary.to_string()))
                                                .await?;
                                        } else {
                                            log::warn!("No summary found for {:?}", &db_file);
                                        }
                                    }
                                    Err(err) => {
                                        // Log and continue; one bad row
                                        // shouldn't abort the whole scan.
                                        log::error!(
                                            "Error reading from summary database: {:?}",
                                            err
                                        );
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Close the channel once all producers finish so consumers terminate.
        drop(summaries_tx);

        let project = self.project.clone();
        cx.spawn(async move |cx| {
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<FileSummary>::new());
            }

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((filename, summary)) = summaries_rx.recv().await {
                                results.push(FileSummary { filename, summary });
                            }
                        });
                    }
                })
                .await;

            // Log (but don't propagate) per-worktree failures.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            // `read_with` is used here only to fail gracefully if the project
            // has been dropped; the project itself isn't read.
            project.read_with(cx, |_project, _cx| {
                results_by_worker.into_iter().flatten().collect()
            })
        })
    }

    /// Empty out the backlogs of all the worktrees in the project
    pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
        let flush_start = std::time::Instant::now();

        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
            let worktree_index = worktree_index.clone();

            cx.spawn(async move |cx| {
                // Wait for a still-loading index before flushing its backlog.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };
                let worktree_abs_path =
                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;

                index
                    .read_with(cx, |index, cx| {
                        cx.background_spawn(
                            index.summary_index().flush_backlog(worktree_abs_path, cx),
                        )
                    })?
                    .await
            })
        }))
        .map(move |results| {
            // Log any errors, but don't block the user. These summaries are supposed to
            // improve quality by providing extra context, but they aren't hard requirements!
            for result in results {
                if let Err(err) = result {
                    log::error!("Error flushing summary backlog: {:?}", err);
                }
            }

            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
        })
    }

    /// Total number of files still awaiting summarization across all loaded
    /// worktree indices.
    pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
        self.worktree_indices(cx)
            .iter()
            .map(|index| index.read(cx).summary_index().backlog_len())
            .sum()
    }
}
547
// `ProjectIndex` emits its new `Status` as an event whenever the aggregate
// status changes (see `update_status`).
impl EventEmitter<Status> for ProjectIndex {}