// project_index.rs

  1use crate::{
  2    embedding::{EmbeddingProvider, TextToEmbed},
  3    summary_index::FileSummary,
  4    worktree_index::{WorktreeIndex, WorktreeIndexHandle},
  5};
  6use anyhow::{anyhow, Context as _, Result};
  7use collections::HashMap;
  8use fs::Fs;
  9use futures::FutureExt;
 10use gpui::{App, Context, Entity, EntityId, EventEmitter, Subscription, Task, WeakEntity};
 11use language::LanguageRegistry;
 12use log;
 13use project::{Project, Worktree, WorktreeId};
 14use serde::{Deserialize, Serialize};
 15use smol::channel;
 16use std::{
 17    cmp::Ordering,
 18    future::Future,
 19    num::NonZeroUsize,
 20    ops::{Range, RangeInclusive},
 21    path::{Path, PathBuf},
 22    sync::Arc,
 23};
 24use util::ResultExt;
 25
 26#[derive(Debug)]
 27pub struct SearchResult {
 28    pub worktree: Entity<Worktree>,
 29    pub path: Arc<Path>,
 30    pub range: Range<usize>,
 31    pub score: f32,
 32    pub query_index: usize,
 33}
 34
 35#[derive(Debug, PartialEq, Eq)]
 36pub struct LoadedSearchResult {
 37    pub path: Arc<Path>,
 38    pub full_path: PathBuf,
 39    pub excerpt_content: String,
 40    pub row_range: RangeInclusive<u32>,
 41    pub query_index: usize,
 42}
 43
 44pub struct WorktreeSearchResult {
 45    pub worktree_id: WorktreeId,
 46    pub path: Arc<Path>,
 47    pub range: Range<usize>,
 48    pub query_index: usize,
 49    pub score: f32,
 50}
 51
 52#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
 53pub enum Status {
 54    Idle,
 55    Loading,
 56    Scanning { remaining_count: NonZeroUsize },
 57}
 58
 59pub struct ProjectIndex {
 60    db_connection: heed::Env,
 61    project: WeakEntity<Project>,
 62    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
 63    language_registry: Arc<LanguageRegistry>,
 64    fs: Arc<dyn Fs>,
 65    last_status: Status,
 66    status_tx: channel::Sender<()>,
 67    embedding_provider: Arc<dyn EmbeddingProvider>,
 68    _maintain_status: Task<()>,
 69    _subscription: Subscription,
 70}
 71
 72impl ProjectIndex {
 73    pub fn new(
 74        project: Entity<Project>,
 75        db_connection: heed::Env,
 76        embedding_provider: Arc<dyn EmbeddingProvider>,
 77        cx: &mut Context<Self>,
 78    ) -> Self {
 79        let language_registry = project.read(cx).languages().clone();
 80        let fs = project.read(cx).fs().clone();
 81        let (status_tx, status_rx) = channel::unbounded();
 82        let mut this = ProjectIndex {
 83            db_connection,
 84            project: project.downgrade(),
 85            worktree_indices: HashMap::default(),
 86            language_registry,
 87            fs,
 88            status_tx,
 89            last_status: Status::Idle,
 90            embedding_provider,
 91            _subscription: cx.subscribe(&project, Self::handle_project_event),
 92            _maintain_status: cx.spawn(|this, mut cx| async move {
 93                while status_rx.recv().await.is_ok() {
 94                    if this
 95                        .update(&mut cx, |this, cx| this.update_status(cx))
 96                        .is_err()
 97                    {
 98                        break;
 99                    }
100                }
101            }),
102        };
103        this.update_worktree_indices(cx);
104        this
105    }
106
107    pub fn status(&self) -> Status {
108        self.last_status
109    }
110
111    pub fn project(&self) -> WeakEntity<Project> {
112        self.project.clone()
113    }
114
115    pub fn fs(&self) -> Arc<dyn Fs> {
116        self.fs.clone()
117    }
118
119    fn handle_project_event(
120        &mut self,
121        _: Entity<Project>,
122        event: &project::Event,
123        cx: &mut Context<Self>,
124    ) {
125        match event {
126            project::Event::WorktreeAdded(_) | project::Event::WorktreeRemoved(_) => {
127                self.update_worktree_indices(cx);
128            }
129            _ => {}
130        }
131    }
132
133    fn update_worktree_indices(&mut self, cx: &mut Context<Self>) {
134        let Some(project) = self.project.upgrade() else {
135            return;
136        };
137
138        let worktrees = project
139            .read(cx)
140            .visible_worktrees(cx)
141            .filter_map(|worktree| {
142                if worktree.read(cx).is_local() {
143                    Some((worktree.entity_id(), worktree))
144                } else {
145                    None
146                }
147            })
148            .collect::<HashMap<_, _>>();
149
150        self.worktree_indices
151            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
152        for (worktree_id, worktree) in worktrees {
153            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
154                let worktree_index = WorktreeIndex::load(
155                    worktree.clone(),
156                    self.db_connection.clone(),
157                    self.language_registry.clone(),
158                    self.fs.clone(),
159                    self.status_tx.clone(),
160                    self.embedding_provider.clone(),
161                    cx,
162                );
163
164                let load_worktree = cx.spawn(|this, mut cx| async move {
165                    let result = match worktree_index.await {
166                        Ok(worktree_index) => {
167                            this.update(&mut cx, |this, _| {
168                                this.worktree_indices.insert(
169                                    worktree_id,
170                                    WorktreeIndexHandle::Loaded {
171                                        index: worktree_index.clone(),
172                                    },
173                                );
174                            })?;
175                            Ok(worktree_index)
176                        }
177                        Err(error) => {
178                            this.update(&mut cx, |this, _cx| {
179                                this.worktree_indices.remove(&worktree_id)
180                            })?;
181                            Err(Arc::new(error))
182                        }
183                    };
184
185                    this.update(&mut cx, |this, cx| this.update_status(cx))?;
186
187                    result
188                });
189
190                WorktreeIndexHandle::Loading {
191                    index: load_worktree.shared(),
192                }
193            });
194        }
195
196        self.update_status(cx);
197    }
198
199    fn update_status(&mut self, cx: &mut Context<Self>) {
200        let mut indexing_count = 0;
201        let mut any_loading = false;
202
203        for index in self.worktree_indices.values_mut() {
204            match index {
205                WorktreeIndexHandle::Loading { .. } => {
206                    any_loading = true;
207                    break;
208                }
209                WorktreeIndexHandle::Loaded { index, .. } => {
210                    indexing_count += index.read(cx).entry_ids_being_indexed().len();
211                }
212            }
213        }
214
215        let status = if any_loading {
216            Status::Loading
217        } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
218            Status::Scanning { remaining_count }
219        } else {
220            Status::Idle
221        };
222
223        if status != self.last_status {
224            self.last_status = status;
225            cx.emit(status);
226        }
227    }
228
229    pub fn search(
230        &self,
231        queries: Vec<String>,
232        limit: usize,
233        cx: &App,
234    ) -> Task<Result<Vec<SearchResult>>> {
235        let (chunks_tx, chunks_rx) = channel::bounded(1024);
236        let mut worktree_scan_tasks = Vec::new();
237        for worktree_index in self.worktree_indices.values() {
238            let worktree_index = worktree_index.clone();
239            let chunks_tx = chunks_tx.clone();
240            worktree_scan_tasks.push(cx.spawn(|cx| async move {
241                let index = match worktree_index {
242                    WorktreeIndexHandle::Loading { index } => {
243                        index.clone().await.map_err(|error| anyhow!(error))?
244                    }
245                    WorktreeIndexHandle::Loaded { index } => index.clone(),
246                };
247
248                index
249                    .read_with(&cx, |index, cx| {
250                        let worktree_id = index.worktree().read(cx).id();
251                        let db_connection = index.db_connection().clone();
252                        let db = *index.embedding_index().db();
253                        cx.background_executor().spawn(async move {
254                            let txn = db_connection
255                                .read_txn()
256                                .context("failed to create read transaction")?;
257                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
258                            for db_entry in db_entries {
259                                let (_key, db_embedded_file) = db_entry?;
260                                for chunk in db_embedded_file.chunks {
261                                    chunks_tx
262                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
263                                        .await?;
264                                }
265                            }
266                            anyhow::Ok(())
267                        })
268                    })?
269                    .await
270            }));
271        }
272        drop(chunks_tx);
273
274        let project = self.project.clone();
275        let embedding_provider = self.embedding_provider.clone();
276        cx.spawn(|cx| async move {
277            #[cfg(debug_assertions)]
278            let embedding_query_start = std::time::Instant::now();
279            log::info!("Searching for {queries:?}");
280            let queries: Vec<TextToEmbed> = queries
281                .iter()
282                .map(|s| TextToEmbed::new(s.as_str()))
283                .collect();
284
285            let query_embeddings = embedding_provider.embed(&queries[..]).await?;
286            if query_embeddings.len() != queries.len() {
287                return Err(anyhow!(
288                    "The number of query embeddings does not match the number of queries"
289                ));
290            }
291
292            let mut results_by_worker = Vec::new();
293            for _ in 0..cx.background_executor().num_cpus() {
294                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
295            }
296
297            #[cfg(debug_assertions)]
298            let search_start = std::time::Instant::now();
299            cx.background_executor()
300                .scoped(|cx| {
301                    for results in results_by_worker.iter_mut() {
302                        cx.spawn(async {
303                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
304                                let (score, query_index) =
305                                    chunk.embedding.similarity(&query_embeddings);
306
307                                let ix = match results.binary_search_by(|probe| {
308                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
309                                }) {
310                                    Ok(ix) | Err(ix) => ix,
311                                };
312                                if ix < limit {
313                                    results.insert(
314                                        ix,
315                                        WorktreeSearchResult {
316                                            worktree_id,
317                                            path: path.clone(),
318                                            range: chunk.chunk.range.clone(),
319                                            query_index,
320                                            score,
321                                        },
322                                    );
323                                    if results.len() > limit {
324                                        results.pop();
325                                    }
326                                }
327                            }
328                        });
329                    }
330                })
331                .await;
332
333            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
334                scan_task.log_err();
335            }
336
337            project.read_with(&cx, |project, cx| {
338                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
339                for worker_results in results_by_worker {
340                    search_results.extend(worker_results.into_iter().filter_map(|result| {
341                        Some(SearchResult {
342                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
343                            path: result.path,
344                            range: result.range,
345                            score: result.score,
346                            query_index: result.query_index,
347                        })
348                    }));
349                }
350                search_results.sort_unstable_by(|a, b| {
351                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
352                });
353                search_results.truncate(limit);
354
355                #[cfg(debug_assertions)]
356                {
357                    let search_elapsed = search_start.elapsed();
358                    log::debug!(
359                        "searched {} entries in {:?}",
360                        search_results.len(),
361                        search_elapsed
362                    );
363                    let embedding_query_elapsed = embedding_query_start.elapsed();
364                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
365                }
366
367                search_results
368            })
369        })
370    }
371
372    #[cfg(test)]
373    pub fn path_count(&self, cx: &App) -> Result<u64> {
374        let mut result = 0;
375        for worktree_index in self.worktree_indices.values() {
376            if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
377                result += index.read(cx).path_count()?;
378            }
379        }
380        Ok(result)
381    }
382
383    pub(crate) fn worktree_index(
384        &self,
385        worktree_id: WorktreeId,
386        cx: &App,
387    ) -> Option<Entity<WorktreeIndex>> {
388        for index in self.worktree_indices.values() {
389            if let WorktreeIndexHandle::Loaded { index, .. } = index {
390                if index.read(cx).worktree().read(cx).id() == worktree_id {
391                    return Some(index.clone());
392                }
393            }
394        }
395        None
396    }
397
398    pub(crate) fn worktree_indices(&self, cx: &App) -> Vec<Entity<WorktreeIndex>> {
399        let mut result = self
400            .worktree_indices
401            .values()
402            .filter_map(|index| {
403                if let WorktreeIndexHandle::Loaded { index, .. } = index {
404                    Some(index.clone())
405                } else {
406                    None
407                }
408            })
409            .collect::<Vec<_>>();
410        result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
411        result
412    }
413
414    pub fn all_summaries(&self, cx: &App) -> Task<Result<Vec<FileSummary>>> {
415        let (summaries_tx, summaries_rx) = channel::bounded(1024);
416        let mut worktree_scan_tasks = Vec::new();
417        for worktree_index in self.worktree_indices.values() {
418            let worktree_index = worktree_index.clone();
419            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
420            worktree_scan_tasks.push(cx.spawn(|cx| async move {
421                let index = match worktree_index {
422                    WorktreeIndexHandle::Loading { index } => {
423                        index.clone().await.map_err(|error| anyhow!(error))?
424                    }
425                    WorktreeIndexHandle::Loaded { index } => index.clone(),
426                };
427
428                index
429                    .read_with(&cx, |index, cx| {
430                        let db_connection = index.db_connection().clone();
431                        let summary_index = index.summary_index();
432                        let file_digest_db = summary_index.file_digest_db();
433                        let summary_db = summary_index.summary_db();
434
435                        cx.background_executor().spawn(async move {
436                            let txn = db_connection
437                                .read_txn()
438                                .context("failed to create db read transaction")?;
439                            let db_entries = file_digest_db
440                                .iter(&txn)
441                                .context("failed to iterate database")?;
442                            for db_entry in db_entries {
443                                let (file_path, db_file) = db_entry?;
444
445                                match summary_db.get(&txn, &db_file.digest) {
446                                    Ok(opt_summary) => {
447                                        // Currently, we only use summaries we already have. If the file hasn't been
448                                        // summarized yet, then we skip it and don't include it in the inferred context.
449                                        // If we want to do just-in-time summarization, this would be the place to do it!
450                                        if let Some(summary) = opt_summary {
451                                            summaries_tx
452                                                .send((file_path.to_string(), summary.to_string()))
453                                                .await?;
454                                        } else {
455                                            log::warn!("No summary found for {:?}", &db_file);
456                                        }
457                                    }
458                                    Err(err) => {
459                                        log::error!(
460                                            "Error reading from summary database: {:?}",
461                                            err
462                                        );
463                                    }
464                                }
465                            }
466                            anyhow::Ok(())
467                        })
468                    })?
469                    .await
470            }));
471        }
472        drop(summaries_tx);
473
474        let project = self.project.clone();
475        cx.spawn(|cx| async move {
476            let mut results_by_worker = Vec::new();
477            for _ in 0..cx.background_executor().num_cpus() {
478                results_by_worker.push(Vec::<FileSummary>::new());
479            }
480
481            cx.background_executor()
482                .scoped(|cx| {
483                    for results in results_by_worker.iter_mut() {
484                        cx.spawn(async {
485                            while let Ok((filename, summary)) = summaries_rx.recv().await {
486                                results.push(FileSummary { filename, summary });
487                            }
488                        });
489                    }
490                })
491                .await;
492
493            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
494                scan_task.log_err();
495            }
496
497            project.read_with(&cx, |_project, _cx| {
498                results_by_worker.into_iter().flatten().collect()
499            })
500        })
501    }
502
503    /// Empty out the backlogs of all the worktrees in the project
504    pub fn flush_summary_backlogs(&self, cx: &App) -> impl Future<Output = ()> {
505        let flush_start = std::time::Instant::now();
506
507        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
508            let worktree_index = worktree_index.clone();
509
510            cx.spawn(|cx| async move {
511                let index = match worktree_index {
512                    WorktreeIndexHandle::Loading { index } => {
513                        index.clone().await.map_err(|error| anyhow!(error))?
514                    }
515                    WorktreeIndexHandle::Loaded { index } => index.clone(),
516                };
517                let worktree_abs_path =
518                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;
519
520                index
521                    .read_with(&cx, |index, cx| {
522                        cx.background_executor()
523                            .spawn(index.summary_index().flush_backlog(worktree_abs_path, cx))
524                    })?
525                    .await
526            })
527        }))
528        .map(move |results| {
529            // Log any errors, but don't block the user. These summaries are supposed to
530            // improve quality by providing extra context, but they aren't hard requirements!
531            for result in results {
532                if let Err(err) = result {
533                    log::error!("Error flushing summary backlog: {:?}", err);
534                }
535            }
536
537            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
538        })
539    }
540
541    pub fn remaining_summaries(&self, cx: &mut Context<Self>) -> usize {
542        self.worktree_indices(cx)
543            .iter()
544            .map(|index| index.read(cx).summary_index().backlog_len())
545            .sum()
546    }
547}
548
549impl EventEmitter<Status> for ProjectIndex {}