worktree_index.rs

  1use crate::embedding::EmbeddingProvider;
  2use crate::embedding_index::EmbeddingIndex;
  3use crate::indexing::IndexingEntrySet;
  4use crate::summary_index::SummaryIndex;
  5use anyhow::Result;
  6use fs::Fs;
  7use futures::future::Shared;
  8use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Subscription, Task, WeakEntity};
  9use language::LanguageRegistry;
 10use log;
 11use project::{UpdatedEntriesSet, Worktree};
 12use smol::channel;
 13use std::sync::Arc;
 14use util::ResultExt;
 15
 16#[derive(Clone)]
 17pub enum WorktreeIndexHandle {
 18    Loading {
 19        index: Shared<Task<Result<Entity<WorktreeIndex>, Arc<anyhow::Error>>>>,
 20    },
 21    Loaded {
 22        index: Entity<WorktreeIndex>,
 23    },
 24}
 25
 26pub struct WorktreeIndex {
 27    worktree: Entity<Worktree>,
 28    db_connection: heed::Env,
 29    embedding_index: EmbeddingIndex,
 30    summary_index: SummaryIndex,
 31    entry_ids_being_indexed: Arc<IndexingEntrySet>,
 32    _index_entries: Task<Result<()>>,
 33    _subscription: Subscription,
 34}
 35
 36impl WorktreeIndex {
 37    pub fn load(
 38        worktree: Entity<Worktree>,
 39        db_connection: heed::Env,
 40        language_registry: Arc<LanguageRegistry>,
 41        fs: Arc<dyn Fs>,
 42        status_tx: channel::Sender<()>,
 43        embedding_provider: Arc<dyn EmbeddingProvider>,
 44        cx: &mut App,
 45    ) -> Task<Result<Entity<Self>>> {
 46        let worktree_for_index = worktree.clone();
 47        let worktree_for_summary = worktree.clone();
 48        let worktree_abs_path = worktree.read(cx).abs_path();
 49        let embedding_fs = Arc::clone(&fs);
 50        let summary_fs = fs;
 51        cx.spawn(async move |cx| {
 52            let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
 53            let (embedding_index, summary_index) = cx
 54                .background_spawn({
 55                    let entries_being_indexed = Arc::clone(&entries_being_indexed);
 56                    let db_connection = db_connection.clone();
 57                    async move {
 58                        let mut txn = db_connection.write_txn()?;
 59                        let embedding_index = {
 60                            let db_name = worktree_abs_path.to_string_lossy();
 61                            let db = db_connection.create_database(&mut txn, Some(&db_name))?;
 62
 63                            EmbeddingIndex::new(
 64                                worktree_for_index,
 65                                embedding_fs,
 66                                db_connection.clone(),
 67                                db,
 68                                language_registry,
 69                                embedding_provider,
 70                                Arc::clone(&entries_being_indexed),
 71                            )
 72                        };
 73                        let summary_index = {
 74                            let file_digest_db = {
 75                                let db_name =
 76                                // Prepend something that wouldn't be found at the beginning of an
 77                                // absolute path, so we don't get db key namespace conflicts with
 78                                // embeddings, which use the abs path as a key.
 79                                format!("digests-{}", worktree_abs_path.to_string_lossy());
 80                                db_connection.create_database(&mut txn, Some(&db_name))?
 81                            };
 82                            let summary_db = {
 83                                let db_name =
 84                                // Prepend something that wouldn't be found at the beginning of an
 85                                // absolute path, so we don't get db key namespace conflicts with
 86                                // embeddings, which use the abs path as a key.
 87                                format!("summaries-{}", worktree_abs_path.to_string_lossy());
 88                                db_connection.create_database(&mut txn, Some(&db_name))?
 89                            };
 90                            SummaryIndex::new(
 91                                worktree_for_summary,
 92                                summary_fs,
 93                                db_connection.clone(),
 94                                file_digest_db,
 95                                summary_db,
 96                                Arc::clone(&entries_being_indexed),
 97                            )
 98                        };
 99                        txn.commit()?;
100                        anyhow::Ok((embedding_index, summary_index))
101                    }
102                })
103                .await?;
104
105            cx.new(|cx| {
106                Self::new(
107                    worktree,
108                    db_connection,
109                    embedding_index,
110                    summary_index,
111                    entries_being_indexed,
112                    cx,
113                )
114            })
115        })
116    }
117
118    pub fn new(
119        worktree: Entity<Worktree>,
120        db_connection: heed::Env,
121        embedding_index: EmbeddingIndex,
122        summary_index: SummaryIndex,
123        entry_ids_being_indexed: Arc<IndexingEntrySet>,
124        cx: &mut Context<Self>,
125    ) -> Self {
126        let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
127        let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
128            if let worktree::Event::UpdatedEntries(update) = event {
129                log::debug!("Updating entries...");
130                _ = updated_entries_tx.try_send(update.clone());
131            }
132        });
133
134        Self {
135            db_connection,
136            embedding_index,
137            summary_index,
138            worktree,
139            entry_ids_being_indexed,
140            _index_entries: cx.spawn(async move |this, cx| {
141                Self::index_entries(this, updated_entries_rx, cx).await
142            }),
143            _subscription,
144        }
145    }
146
147    pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
148        self.entry_ids_being_indexed.as_ref()
149    }
150
151    pub fn worktree(&self) -> &Entity<Worktree> {
152        &self.worktree
153    }
154
155    pub fn db_connection(&self) -> &heed::Env {
156        &self.db_connection
157    }
158
159    pub fn embedding_index(&self) -> &EmbeddingIndex {
160        &self.embedding_index
161    }
162
163    pub fn summary_index(&self) -> &SummaryIndex {
164        &self.summary_index
165    }
166
167    async fn index_entries(
168        this: WeakEntity<Self>,
169        updated_entries: channel::Receiver<UpdatedEntriesSet>,
170        cx: &mut AsyncApp,
171    ) -> Result<()> {
172        let index = this.update(cx, |this, cx| {
173            futures::future::try_join(
174                this.embedding_index.index_entries_changed_on_disk(cx),
175                this.summary_index.index_entries_changed_on_disk(false, cx),
176            )
177        })?;
178        index.await.log_err();
179
180        while let Ok(updated_entries) = updated_entries.recv().await {
181            let index = this.update(cx, |this, cx| {
182                futures::future::try_join(
183                    this.embedding_index
184                        .index_updated_entries(updated_entries.clone(), cx),
185                    this.summary_index
186                        .index_updated_entries(updated_entries, false, cx),
187                )
188            })?;
189            index.await.log_err();
190        }
191
192        Ok(())
193    }
194
195    #[cfg(test)]
196    pub fn path_count(&self) -> Result<u64> {
197        use anyhow::Context as _;
198
199        let txn = self
200            .db_connection
201            .read_txn()
202            .context("failed to create read transaction")?;
203        Ok(self.embedding_index().db().len(&txn)?)
204    }
205}