// worktree_index.rs

use crate::embedding::EmbeddingProvider;
use crate::embedding_index::EmbeddingIndex;
use crate::indexing::IndexingEntrySet;
use crate::summary_index::SummaryIndex;
use anyhow::Result;
use feature_flags::{AutoCommand, FeatureFlagAppExt};
use fs::Fs;
use futures::future::Shared;
use gpui::{
    AppContext, AsyncAppContext, Context, Model, ModelContext, Subscription, Task, WeakModel,
};
use language::LanguageRegistry;
use log;
use project::{UpdatedEntriesSet, Worktree};
use smol::channel;
use std::sync::Arc;
use util::ResultExt;

 19#[derive(Clone)]
 20pub enum WorktreeIndexHandle {
 21    Loading {
 22        index: Shared<Task<Result<Model<WorktreeIndex>, Arc<anyhow::Error>>>>,
 23    },
 24    Loaded {
 25        index: Model<WorktreeIndex>,
 26    },
 27}
 28
 29pub struct WorktreeIndex {
 30    worktree: Model<Worktree>,
 31    db_connection: heed::Env,
 32    embedding_index: EmbeddingIndex,
 33    summary_index: SummaryIndex,
 34    entry_ids_being_indexed: Arc<IndexingEntrySet>,
 35    _index_entries: Task<Result<()>>,
 36    _subscription: Subscription,
 37}
 38
 39impl WorktreeIndex {
 40    pub fn load(
 41        worktree: Model<Worktree>,
 42        db_connection: heed::Env,
 43        language_registry: Arc<LanguageRegistry>,
 44        fs: Arc<dyn Fs>,
 45        status_tx: channel::Sender<()>,
 46        embedding_provider: Arc<dyn EmbeddingProvider>,
 47        cx: &mut AppContext,
 48    ) -> Task<Result<Model<Self>>> {
 49        let worktree_for_index = worktree.clone();
 50        let worktree_for_summary = worktree.clone();
 51        let worktree_abs_path = worktree.read(cx).abs_path();
 52        let embedding_fs = Arc::clone(&fs);
 53        let summary_fs = fs;
 54        cx.spawn(|mut cx| async move {
 55            let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
 56            let (embedding_index, summary_index) = cx
 57                .background_executor()
 58                .spawn({
 59                    let entries_being_indexed = Arc::clone(&entries_being_indexed);
 60                    let db_connection = db_connection.clone();
 61                    async move {
 62                        let mut txn = db_connection.write_txn()?;
 63                        let embedding_index = {
 64                            let db_name = worktree_abs_path.to_string_lossy();
 65                            let db = db_connection.create_database(&mut txn, Some(&db_name))?;
 66
 67                            EmbeddingIndex::new(
 68                                worktree_for_index,
 69                                embedding_fs,
 70                                db_connection.clone(),
 71                                db,
 72                                language_registry,
 73                                embedding_provider,
 74                                Arc::clone(&entries_being_indexed),
 75                            )
 76                        };
 77                        let summary_index = {
 78                            let file_digest_db = {
 79                                let db_name =
 80                                // Prepend something that wouldn't be found at the beginning of an
 81                                // absolute path, so we don't get db key namespace conflicts with
 82                                // embeddings, which use the abs path as a key.
 83                                format!("digests-{}", worktree_abs_path.to_string_lossy());
 84                                db_connection.create_database(&mut txn, Some(&db_name))?
 85                            };
 86                            let summary_db = {
 87                                let db_name =
 88                                // Prepend something that wouldn't be found at the beginning of an
 89                                // absolute path, so we don't get db key namespace conflicts with
 90                                // embeddings, which use the abs path as a key.
 91                                format!("summaries-{}", worktree_abs_path.to_string_lossy());
 92                                db_connection.create_database(&mut txn, Some(&db_name))?
 93                            };
 94                            SummaryIndex::new(
 95                                worktree_for_summary,
 96                                summary_fs,
 97                                db_connection.clone(),
 98                                file_digest_db,
 99                                summary_db,
100                                Arc::clone(&entries_being_indexed),
101                            )
102                        };
103                        txn.commit()?;
104                        anyhow::Ok((embedding_index, summary_index))
105                    }
106                })
107                .await?;
108
109            cx.new_model(|cx| {
110                Self::new(
111                    worktree,
112                    db_connection,
113                    embedding_index,
114                    summary_index,
115                    entries_being_indexed,
116                    cx,
117                )
118            })
119        })
120    }
121
122    #[allow(clippy::too_many_arguments)]
123    pub fn new(
124        worktree: Model<Worktree>,
125        db_connection: heed::Env,
126        embedding_index: EmbeddingIndex,
127        summary_index: SummaryIndex,
128        entry_ids_being_indexed: Arc<IndexingEntrySet>,
129        cx: &mut ModelContext<Self>,
130    ) -> Self {
131        let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
132        let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
133            if let worktree::Event::UpdatedEntries(update) = event {
134                log::debug!("Updating entries...");
135                _ = updated_entries_tx.try_send(update.clone());
136            }
137        });
138
139        Self {
140            db_connection,
141            embedding_index,
142            summary_index,
143            worktree,
144            entry_ids_being_indexed,
145            _index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)),
146            _subscription,
147        }
148    }
149
150    pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
151        self.entry_ids_being_indexed.as_ref()
152    }
153
154    pub fn worktree(&self) -> &Model<Worktree> {
155        &self.worktree
156    }
157
158    pub fn db_connection(&self) -> &heed::Env {
159        &self.db_connection
160    }
161
162    pub fn embedding_index(&self) -> &EmbeddingIndex {
163        &self.embedding_index
164    }
165
166    pub fn summary_index(&self) -> &SummaryIndex {
167        &self.summary_index
168    }
169
170    async fn index_entries(
171        this: WeakModel<Self>,
172        updated_entries: channel::Receiver<UpdatedEntriesSet>,
173        mut cx: AsyncAppContext,
174    ) -> Result<()> {
175        let is_auto_available = cx.update(|cx| cx.wait_for_flag::<AutoCommand>())?.await;
176        let index = this.update(&mut cx, |this, cx| {
177            futures::future::try_join(
178                this.embedding_index.index_entries_changed_on_disk(cx),
179                this.summary_index
180                    .index_entries_changed_on_disk(is_auto_available, cx),
181            )
182        })?;
183        index.await.log_err();
184
185        while let Ok(updated_entries) = updated_entries.recv().await {
186            let is_auto_available = cx
187                .update(|cx| cx.has_flag::<AutoCommand>())
188                .unwrap_or(false);
189
190            let index = this.update(&mut cx, |this, cx| {
191                futures::future::try_join(
192                    this.embedding_index
193                        .index_updated_entries(updated_entries.clone(), cx),
194                    this.summary_index.index_updated_entries(
195                        updated_entries,
196                        is_auto_available,
197                        cx,
198                    ),
199                )
200            })?;
201            index.await.log_err();
202        }
203
204        Ok(())
205    }
206
207    #[cfg(test)]
208    pub fn path_count(&self) -> Result<u64> {
209        use anyhow::Context;
210
211        let txn = self
212            .db_connection
213            .read_txn()
214            .context("failed to create read transaction")?;
215        Ok(self.embedding_index().db().len(&txn)?)
216    }
217}