worktree_index.rs

  1use crate::embedding::EmbeddingProvider;
  2use crate::embedding_index::EmbeddingIndex;
  3use crate::indexing::IndexingEntrySet;
  4use crate::summary_index::SummaryIndex;
  5use anyhow::Result;
  6use feature_flags::{AutoCommand, FeatureFlagAppExt};
  7use fs::Fs;
  8use futures::future::Shared;
  9use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Subscription, Task, WeakEntity};
 10use language::LanguageRegistry;
 11use log;
 12use project::{UpdatedEntriesSet, Worktree};
 13use smol::channel;
 14use std::sync::Arc;
 15use util::ResultExt;
 16
 17#[derive(Clone)]
 18pub enum WorktreeIndexHandle {
 19    Loading {
 20        index: Shared<Task<Result<Entity<WorktreeIndex>, Arc<anyhow::Error>>>>,
 21    },
 22    Loaded {
 23        index: Entity<WorktreeIndex>,
 24    },
 25}
 26
 27pub struct WorktreeIndex {
 28    worktree: Entity<Worktree>,
 29    db_connection: heed::Env,
 30    embedding_index: EmbeddingIndex,
 31    summary_index: SummaryIndex,
 32    entry_ids_being_indexed: Arc<IndexingEntrySet>,
 33    _index_entries: Task<Result<()>>,
 34    _subscription: Subscription,
 35}
 36
 37impl WorktreeIndex {
 38    pub fn load(
 39        worktree: Entity<Worktree>,
 40        db_connection: heed::Env,
 41        language_registry: Arc<LanguageRegistry>,
 42        fs: Arc<dyn Fs>,
 43        status_tx: channel::Sender<()>,
 44        embedding_provider: Arc<dyn EmbeddingProvider>,
 45        cx: &mut App,
 46    ) -> Task<Result<Entity<Self>>> {
 47        let worktree_for_index = worktree.clone();
 48        let worktree_for_summary = worktree.clone();
 49        let worktree_abs_path = worktree.read(cx).abs_path();
 50        let embedding_fs = Arc::clone(&fs);
 51        let summary_fs = fs;
 52        cx.spawn(async move |cx| {
 53            let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
 54            let (embedding_index, summary_index) = cx
 55                .background_spawn({
 56                    let entries_being_indexed = Arc::clone(&entries_being_indexed);
 57                    let db_connection = db_connection.clone();
 58                    async move {
 59                        let mut txn = db_connection.write_txn()?;
 60                        let embedding_index = {
 61                            let db_name = worktree_abs_path.to_string_lossy();
 62                            let db = db_connection.create_database(&mut txn, Some(&db_name))?;
 63
 64                            EmbeddingIndex::new(
 65                                worktree_for_index,
 66                                embedding_fs,
 67                                db_connection.clone(),
 68                                db,
 69                                language_registry,
 70                                embedding_provider,
 71                                Arc::clone(&entries_being_indexed),
 72                            )
 73                        };
 74                        let summary_index = {
 75                            let file_digest_db = {
 76                                let db_name =
 77                                // Prepend something that wouldn't be found at the beginning of an
 78                                // absolute path, so we don't get db key namespace conflicts with
 79                                // embeddings, which use the abs path as a key.
 80                                format!("digests-{}", worktree_abs_path.to_string_lossy());
 81                                db_connection.create_database(&mut txn, Some(&db_name))?
 82                            };
 83                            let summary_db = {
 84                                let db_name =
 85                                // Prepend something that wouldn't be found at the beginning of an
 86                                // absolute path, so we don't get db key namespace conflicts with
 87                                // embeddings, which use the abs path as a key.
 88                                format!("summaries-{}", worktree_abs_path.to_string_lossy());
 89                                db_connection.create_database(&mut txn, Some(&db_name))?
 90                            };
 91                            SummaryIndex::new(
 92                                worktree_for_summary,
 93                                summary_fs,
 94                                db_connection.clone(),
 95                                file_digest_db,
 96                                summary_db,
 97                                Arc::clone(&entries_being_indexed),
 98                            )
 99                        };
100                        txn.commit()?;
101                        anyhow::Ok((embedding_index, summary_index))
102                    }
103                })
104                .await?;
105
106            cx.new(|cx| {
107                Self::new(
108                    worktree,
109                    db_connection,
110                    embedding_index,
111                    summary_index,
112                    entries_being_indexed,
113                    cx,
114                )
115            })
116        })
117    }
118
119    pub fn new(
120        worktree: Entity<Worktree>,
121        db_connection: heed::Env,
122        embedding_index: EmbeddingIndex,
123        summary_index: SummaryIndex,
124        entry_ids_being_indexed: Arc<IndexingEntrySet>,
125        cx: &mut Context<Self>,
126    ) -> Self {
127        let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
128        let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
129            if let worktree::Event::UpdatedEntries(update) = event {
130                log::debug!("Updating entries...");
131                _ = updated_entries_tx.try_send(update.clone());
132            }
133        });
134
135        Self {
136            db_connection,
137            embedding_index,
138            summary_index,
139            worktree,
140            entry_ids_being_indexed,
141            _index_entries: cx.spawn(async move |this, cx| {
142                Self::index_entries(this, updated_entries_rx, cx).await
143            }),
144            _subscription,
145        }
146    }
147
148    pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
149        self.entry_ids_being_indexed.as_ref()
150    }
151
152    pub fn worktree(&self) -> &Entity<Worktree> {
153        &self.worktree
154    }
155
156    pub fn db_connection(&self) -> &heed::Env {
157        &self.db_connection
158    }
159
160    pub fn embedding_index(&self) -> &EmbeddingIndex {
161        &self.embedding_index
162    }
163
164    pub fn summary_index(&self) -> &SummaryIndex {
165        &self.summary_index
166    }
167
168    async fn index_entries(
169        this: WeakEntity<Self>,
170        updated_entries: channel::Receiver<UpdatedEntriesSet>,
171        cx: &mut AsyncApp,
172    ) -> Result<()> {
173        let is_auto_available = cx.update(|cx| cx.wait_for_flag::<AutoCommand>())?.await;
174        let index = this.update(cx, |this, cx| {
175            futures::future::try_join(
176                this.embedding_index.index_entries_changed_on_disk(cx),
177                this.summary_index
178                    .index_entries_changed_on_disk(is_auto_available, cx),
179            )
180        })?;
181        index.await.log_err();
182
183        while let Ok(updated_entries) = updated_entries.recv().await {
184            let is_auto_available = cx
185                .update(|cx| cx.has_flag::<AutoCommand>())
186                .unwrap_or(false);
187
188            let index = this.update(cx, |this, cx| {
189                futures::future::try_join(
190                    this.embedding_index
191                        .index_updated_entries(updated_entries.clone(), cx),
192                    this.summary_index.index_updated_entries(
193                        updated_entries,
194                        is_auto_available,
195                        cx,
196                    ),
197                )
198            })?;
199            index.await.log_err();
200        }
201
202        Ok(())
203    }
204
205    #[cfg(test)]
206    pub fn path_count(&self) -> Result<u64> {
207        use anyhow::Context as _;
208
209        let txn = self
210            .db_connection
211            .read_txn()
212            .context("failed to create read transaction")?;
213        Ok(self.embedding_index().db().len(&txn)?)
214    }
215}