// project_index.rs

  1use crate::{
  2    embedding::{EmbeddingProvider, TextToEmbed},
  3    summary_index::FileSummary,
  4    worktree_index::{WorktreeIndex, WorktreeIndexHandle},
  5};
  6use anyhow::{anyhow, Context, Result};
  7use collections::HashMap;
  8use fs::Fs;
  9use futures::{stream::StreamExt, FutureExt};
 10use gpui::{
 11    AppContext, Entity, EntityId, EventEmitter, Model, ModelContext, Subscription, Task, WeakModel,
 12};
 13use language::LanguageRegistry;
 14use log;
 15use project::{Project, Worktree, WorktreeId};
 16use serde::{Deserialize, Serialize};
 17use smol::channel;
 18use std::{
 19    cmp::Ordering,
 20    future::Future,
 21    num::NonZeroUsize,
 22    ops::{Range, RangeInclusive},
 23    path::{Path, PathBuf},
 24    sync::Arc,
 25};
 26use util::ResultExt;
 27
/// A single semantic-search hit produced by [`ProjectIndex::search`].
#[derive(Debug)]
pub struct SearchResult {
    /// The worktree containing the matching file.
    pub worktree: Model<Worktree>,
    /// Worktree-relative path of the matching file.
    pub path: Arc<Path>,
    /// Range of the matching chunk within the file (presumably byte offsets —
    /// confirm against the chunking code).
    pub range: Range<usize>,
    /// Similarity score of this chunk's embedding against the query embedding;
    /// higher is more similar.
    pub score: f32,
}
 35
/// A search result whose file contents have been loaded into memory.
pub struct LoadedSearchResult {
    /// Worktree-relative path of the file.
    pub path: Arc<Path>,
    /// Range of the matching chunk within `file_content`.
    pub range: Range<usize>,
    /// Full path of the file — presumably the worktree root joined with
    /// `path`; confirm against the code that constructs this type.
    pub full_path: PathBuf,
    /// The file's full text.
    pub file_content: String,
    /// Inclusive range of rows (lines) covered by the match.
    pub row_range: RangeInclusive<u32>,
}
 43
/// Intermediate search result that identifies its worktree by id rather than
/// by model handle; resolved into a [`SearchResult`] at the end of
/// [`ProjectIndex::search`] via `Project::worktree_for_id`.
pub struct WorktreeSearchResult {
    /// Id of the worktree containing the matching file.
    pub worktree_id: WorktreeId,
    /// Worktree-relative path of the matching file.
    pub path: Arc<Path>,
    /// Range of the matching chunk within the file.
    pub range: Range<usize>,
    /// Similarity score against the query embedding; higher is more similar.
    pub score: f32,
}
 50
/// Aggregate indexing status for the whole project, emitted as a model event
/// whenever it changes (see `ProjectIndex::update_status`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    /// No loading or indexing work is in progress.
    Idle,
    /// At least one worktree index is still being loaded.
    Loading,
    /// All indices are loaded and `remaining_count` entries are currently
    /// being indexed.
    Scanning { remaining_count: NonZeroUsize },
}
 57
/// Maintains an embedding-based (semantic) index for every visible local
/// worktree of a [`Project`] and answers similarity searches against it.
pub struct ProjectIndex {
    // LMDB environment shared by the per-worktree index databases.
    db_connection: heed::Env,
    // Weak handle so the index doesn't keep the project alive.
    project: WeakModel<Project>,
    // One index handle per worktree, keyed by the worktree model's entity id;
    // each handle is either still loading or fully loaded.
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    language_registry: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    // Most recently computed aggregate status; used to suppress duplicate
    // status events.
    last_status: Status,
    // Cloned into each worktree index so it can signal indexing progress.
    status_tx: channel::Sender<()>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    // Background task that recomputes `last_status` when `status_tx` fires.
    _maintain_status: Task<()>,
    // Keeps `handle_project_event` subscribed to project events.
    _subscription: Subscription,
}
 70
 71impl ProjectIndex {
    /// Creates a `ProjectIndex` for `project` and immediately starts loading
    /// a `WorktreeIndex` for each of the project's visible local worktrees.
    ///
    /// Also installs:
    /// - a subscription to project events so the set of worktree indices
    ///   tracks worktrees being added and removed, and
    /// - a background task that recomputes the aggregate [`Status`] whenever
    ///   a worktree index signals progress on `status_tx`.
    pub fn new(
        project: Model<Project>,
        db_connection: heed::Env,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let language_registry = project.read(cx).languages().clone();
        let fs = project.read(cx).fs().clone();
        let (status_tx, mut status_rx) = channel::unbounded();
        let mut this = ProjectIndex {
            db_connection,
            project: project.downgrade(),
            worktree_indices: HashMap::default(),
            language_registry,
            fs,
            status_tx,
            last_status: Status::Idle,
            embedding_provider,
            _subscription: cx.subscribe(&project, Self::handle_project_event),
            _maintain_status: cx.spawn(|this, mut cx| async move {
                // Drain status notifications until the model is dropped
                // (`update` errors once the weak handle can't be upgraded).
                while status_rx.next().await.is_some() {
                    if this
                        .update(&mut cx, |this, cx| this.update_status(cx))
                        .is_err()
                    {
                        break;
                    }
                }
            }),
        };
        this.update_worktree_indices(cx);
        this
    }
105
    /// The most recently computed aggregate indexing [`Status`].
    pub fn status(&self) -> Status {
        self.last_status
    }
109
    /// A weak handle to the project this index belongs to.
    pub fn project(&self) -> WeakModel<Project> {
        self.project.clone()
    }
113
    /// The filesystem implementation shared with the indexed project.
    pub fn fs(&self) -> Arc<dyn Fs> {
        self.fs.clone()
    }
117
118    fn handle_project_event(
119        &mut self,
120        _: Model<Project>,
121        event: &project::Event,
122        cx: &mut ModelContext<Self>,
123    ) {
124        match event {
125            project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => {
126                self.update_worktree_indices(cx);
127            }
128            _ => {}
129        }
130    }
131
    /// Reconciles `self.worktree_indices` with the project's current set of
    /// visible local worktrees: drops indices whose worktrees went away and
    /// starts loading a `WorktreeIndex` for any new ones.
    fn update_worktree_indices(&mut self, cx: &mut ModelContext<Self>) {
        let Some(project) = self.project.upgrade() else {
            return;
        };

        // Only local worktrees are indexed; remote ones are skipped.
        let worktrees = project
            .read(cx)
            .visible_worktrees(cx)
            .filter_map(|worktree| {
                if worktree.read(cx).is_local() {
                    Some((worktree.entity_id(), worktree))
                } else {
                    None
                }
            })
            .collect::<HashMap<_, _>>();

        // Drop indices for worktrees no longer part of the project.
        self.worktree_indices
            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
        for (worktree_id, worktree) in worktrees {
            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
                let worktree_index = WorktreeIndex::load(
                    worktree.clone(),
                    self.db_connection.clone(),
                    self.language_registry.clone(),
                    self.fs.clone(),
                    self.status_tx.clone(),
                    self.embedding_provider.clone(),
                    cx,
                );

                let load_worktree = cx.spawn(|this, mut cx| async move {
                    let result = match worktree_index.await {
                        Ok(worktree_index) => {
                            // Promote the handle from `Loading` to `Loaded`.
                            this.update(&mut cx, |this, _| {
                                this.worktree_indices.insert(
                                    worktree_id,
                                    WorktreeIndexHandle::Loaded {
                                        index: worktree_index.clone(),
                                    },
                                );
                            })?;
                            Ok(worktree_index)
                        }
                        Err(error) => {
                            // Loading failed: remove the handle so a later
                            // reconciliation can retry from scratch.
                            this.update(&mut cx, |this, _cx| {
                                this.worktree_indices.remove(&worktree_id)
                            })?;
                            Err(Arc::new(error))
                        }
                    };

                    this.update(&mut cx, |this, cx| this.update_status(cx))?;

                    result
                });

                // `.shared()` lets multiple callers await the same load.
                WorktreeIndexHandle::Loading {
                    index: load_worktree.shared(),
                }
            });
        }

        self.update_status(cx);
    }
197
198    fn update_status(&mut self, cx: &mut ModelContext<Self>) {
199        let mut indexing_count = 0;
200        let mut any_loading = false;
201
202        for index in self.worktree_indices.values_mut() {
203            match index {
204                WorktreeIndexHandle::Loading { .. } => {
205                    any_loading = true;
206                    break;
207                }
208                WorktreeIndexHandle::Loaded { index, .. } => {
209                    indexing_count += index.read(cx).entry_ids_being_indexed().len();
210                }
211            }
212        }
213
214        let status = if any_loading {
215            Status::Loading
216        } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
217            Status::Scanning { remaining_count }
218        } else {
219            Status::Idle
220        };
221
222        if status != self.last_status {
223            self.last_status = status;
224            cx.emit(status);
225        }
226    }
227
    /// Performs a semantic search for `query` across every worktree index,
    /// returning at most `limit` results ordered by descending similarity.
    ///
    /// Pipeline: one task per worktree streams `(worktree_id, path, chunk)`
    /// tuples out of the embedding database into a bounded channel, while a
    /// pool of background workers (one per CPU) scores each chunk against the
    /// query embedding and keeps its own sorted top-`limit` list; the
    /// per-worker lists are then merged, re-sorted, and truncated.
    pub fn search(
        &self,
        query: String,
        limit: usize,
        cx: &AppContext,
    ) -> Task<Result<Vec<SearchResult>>> {
        let (chunks_tx, chunks_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let chunks_tx = chunks_tx.clone();
            worktree_scan_tasks.push(cx.spawn(|cx| async move {
                // A still-loading index is awaited before scanning it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(&cx, |index, cx| {
                        let worktree_id = index.worktree().read(cx).id();
                        let db_connection = index.db_connection().clone();
                        let db = *index.embedding_index().db();
                        cx.background_executor().spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create read transaction")?;
                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (_key, db_embedded_file) = db_entry?;
                                for chunk in db_embedded_file.chunks {
                                    chunks_tx
                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
                                        .await?;
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Drop the original sender so the channel closes (and the workers
        // below terminate) once every scan task's clone is dropped.
        drop(chunks_tx);

        let project = self.project.clone();
        let embedding_provider = self.embedding_provider.clone();
        cx.spawn(|cx| async move {
            #[cfg(debug_assertions)]
            let embedding_query_start = std::time::Instant::now();
            log::info!("Searching for {query}");

            let query_embeddings = embedding_provider
                .embed(&[TextToEmbed::new(&query)])
                .await?;
            let query_embedding = query_embeddings
                .into_iter()
                .next()
                .ok_or_else(|| anyhow!("no embedding for query"))?;

            // One result list per CPU; each worker keeps its own top-`limit`
            // slice so no cross-worker synchronization is needed.
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
            }

            #[cfg(debug_assertions)]
            let search_start = std::time::Instant::now();

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
                                let score = chunk.embedding.similarity(&query_embedding);
                                // Binary-insert in descending-score order;
                                // NaN/incomparable scores compare as equal.
                                let ix = match results.binary_search_by(|probe| {
                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
                                }) {
                                    Ok(ix) | Err(ix) => ix,
                                };
                                results.insert(
                                    ix,
                                    WorktreeSearchResult {
                                        worktree_id,
                                        path: path.clone(),
                                        range: chunk.chunk.range.clone(),
                                        score,
                                    },
                                );
                                results.truncate(limit);
                            }
                        });
                    }
                })
                .await;

            // Log scan failures without failing the overall search.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            project.read_with(&cx, |project, cx| {
                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
                for worker_results in results_by_worker {
                    search_results.extend(worker_results.into_iter().filter_map(|result| {
                        // Drops results whose worktree has since been removed
                        // from the project (`worktree_for_id` returns None).
                        Some(SearchResult {
                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
                            path: result.path,
                            range: result.range,
                            score: result.score,
                        })
                    }));
                }
                search_results.sort_unstable_by(|a, b| {
                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
                });
                search_results.truncate(limit);

                #[cfg(debug_assertions)]
                {
                    let search_elapsed = search_start.elapsed();
                    log::debug!(
                        "searched {} entries in {:?}",
                        search_results.len(),
                        search_elapsed
                    );
                    let embedding_query_elapsed = embedding_query_start.elapsed();
                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
                }

                search_results
            })
        })
    }
360
361    #[cfg(test)]
362    pub fn path_count(&self, cx: &AppContext) -> Result<u64> {
363        let mut result = 0;
364        for worktree_index in self.worktree_indices.values() {
365            if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
366                result += index.read(cx).path_count()?;
367            }
368        }
369        Ok(result)
370    }
371
372    pub(crate) fn worktree_index(
373        &self,
374        worktree_id: WorktreeId,
375        cx: &AppContext,
376    ) -> Option<Model<WorktreeIndex>> {
377        for index in self.worktree_indices.values() {
378            if let WorktreeIndexHandle::Loaded { index, .. } = index {
379                if index.read(cx).worktree().read(cx).id() == worktree_id {
380                    return Some(index.clone());
381                }
382            }
383        }
384        None
385    }
386
387    pub(crate) fn worktree_indices(&self, cx: &AppContext) -> Vec<Model<WorktreeIndex>> {
388        let mut result = self
389            .worktree_indices
390            .values()
391            .filter_map(|index| {
392                if let WorktreeIndexHandle::Loaded { index, .. } = index {
393                    Some(index.clone())
394                } else {
395                    None
396                }
397            })
398            .collect::<Vec<_>>();
399        result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
400        result
401    }
402
    /// Collects all previously computed file summaries from every worktree's
    /// summary database.
    ///
    /// Only summaries that already exist are returned; files that haven't
    /// been summarized yet are skipped (no just-in-time summarization).
    /// Structure mirrors [`Self::search`]: per-worktree scan tasks feed a
    /// bounded channel consumed by one worker per CPU.
    pub fn all_summaries(&self, cx: &AppContext) -> Task<Result<Vec<FileSummary>>> {
        let (summaries_tx, summaries_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
            worktree_scan_tasks.push(cx.spawn(|cx| async move {
                // A still-loading index is awaited before scanning it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(&cx, |index, cx| {
                        let db_connection = index.db_connection().clone();
                        let summary_index = index.summary_index();
                        let file_digest_db = summary_index.file_digest_db();
                        let summary_db = summary_index.summary_db();

                        cx.background_executor().spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create db read transaction")?;
                            let db_entries = file_digest_db
                                .iter(&txn)
                                .context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (file_path, db_file) = db_entry?;

                                // Summaries are looked up by content digest.
                                match summary_db.get(&txn, &db_file.digest) {
                                    Ok(opt_summary) => {
                                        // Currently, we only use summaries we already have. If the file hasn't been
                                        // summarized yet, then we skip it and don't include it in the inferred context.
                                        // If we want to do just-in-time summarization, this would be the place to do it!
                                        if let Some(summary) = opt_summary {
                                            summaries_tx
                                                .send((file_path.to_string(), summary.to_string()))
                                                .await?;
                                        } else {
                                            log::warn!("No summary found for {:?}", &db_file);
                                        }
                                    }
                                    Err(err) => {
                                        // Read errors are logged but don't
                                        // abort the scan of other files.
                                        log::error!(
                                            "Error reading from summary database: {:?}",
                                            err
                                        );
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Drop the original sender so workers terminate once all scan tasks'
        // clones are dropped.
        drop(summaries_tx);

        let project = self.project.clone();
        cx.spawn(|cx| async move {
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<FileSummary>::new());
            }

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((filename, summary)) = summaries_rx.recv().await {
                                results.push(FileSummary { filename, summary });
                            }
                        });
                    }
                })
                .await;

            // Log scan failures without failing the overall collection.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            // Reading through the project propagates an error if the project
            // has been dropped, matching `search`'s failure mode.
            project.read_with(&cx, |_project, _cx| {
                results_by_worker.into_iter().flatten().collect()
            })
        })
    }
491
    /// Empty out the backlogs of all the worktrees in the project
    ///
    /// Flushes every worktree's summary backlog concurrently. Per-worktree
    /// failures are logged but not propagated; total elapsed time is logged
    /// when all flushes have completed.
    pub fn flush_summary_backlogs(&self, cx: &AppContext) -> impl Future<Output = ()> {
        let flush_start = std::time::Instant::now();

        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
            let worktree_index = worktree_index.clone();

            cx.spawn(|cx| async move {
                // A still-loading index is awaited before flushing it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };
                let worktree_abs_path =
                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;

                index
                    .read_with(&cx, |index, cx| {
                        cx.background_executor()
                            .spawn(index.summary_index().flush_backlog(worktree_abs_path, cx))
                    })?
                    .await
            })
        }))
        .map(move |results| {
            // Log any errors, but don't block the user. These summaries are supposed to
            // improve quality by providing extra context, but they aren't hard requirements!
            for result in results {
                if let Err(err) = result {
                    log::error!("Error flushing summary backlog: {:?}", err);
                }
            }

            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
        })
    }
529
530    pub fn remaining_summaries(&self, cx: &mut ModelContext<Self>) -> usize {
531        self.worktree_indices(cx)
532            .iter()
533            .map(|index| index.read(cx).summary_index().backlog_len())
534            .sum()
535    }
536}
537
// `ProjectIndex` emits its aggregate `Status` as a model event whenever it
// changes (see `update_status`).
impl EventEmitter<Status> for ProjectIndex {}