1use crate::embedding::EmbeddingProvider;
2use crate::embedding_index::EmbeddingIndex;
3use crate::indexing::IndexingEntrySet;
4use crate::summary_index::SummaryIndex;
5use anyhow::Result;
6use feature_flags::{AutoCommand, FeatureFlagAppExt};
7use fs::Fs;
8use futures::future::Shared;
9use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Subscription, Task, WeakEntity};
10use language::LanguageRegistry;
11use log;
12use project::{UpdatedEntriesSet, Worktree};
13use smol::channel;
14use std::sync::Arc;
15use util::ResultExt;
16
17#[derive(Clone)]
18pub enum WorktreeIndexHandle {
19 Loading {
20 index: Shared<Task<Result<Entity<WorktreeIndex>, Arc<anyhow::Error>>>>,
21 },
22 Loaded {
23 index: Entity<WorktreeIndex>,
24 },
25}
26
/// Semantic-index state for a single [`Worktree`].
///
/// Holds the embedding index, the summary index, the `heed` environment both were built
/// from, and the background machinery (task + event subscription) that feeds worktree
/// changes into the two indices.
pub struct WorktreeIndex {
    /// The worktree whose entries are indexed.
    worktree: Entity<Worktree>,
    /// Database environment; clones of it are handed to both indices when they are built.
    db_connection: heed::Env,
    /// Embedding index for this worktree's entries.
    embedding_index: EmbeddingIndex,
    /// Summary index for this worktree's entries.
    summary_index: SummaryIndex,
    /// Set of entries currently being indexed; the same `Arc` is shared with both indices.
    entry_ids_being_indexed: Arc<IndexingEntrySet>,
    /// Task running `index_entries`, spawned in `new`. It is never read, only held so the
    /// task lives as long as this struct.
    /// NOTE(review): relies on gpui tasks being cancelled on drop — confirm.
    _index_entries: Task<Result<()>>,
    /// Worktree event subscription that forwards `UpdatedEntries` to `_index_entries`.
    /// Held to keep the subscription active.
    /// NOTE(review): assumes dropping a gpui `Subscription` unsubscribes.
    _subscription: Subscription,
}
36
37impl WorktreeIndex {
38 pub fn load(
39 worktree: Entity<Worktree>,
40 db_connection: heed::Env,
41 language_registry: Arc<LanguageRegistry>,
42 fs: Arc<dyn Fs>,
43 status_tx: channel::Sender<()>,
44 embedding_provider: Arc<dyn EmbeddingProvider>,
45 cx: &mut App,
46 ) -> Task<Result<Entity<Self>>> {
47 let worktree_for_index = worktree.clone();
48 let worktree_for_summary = worktree.clone();
49 let worktree_abs_path = worktree.read(cx).abs_path();
50 let embedding_fs = Arc::clone(&fs);
51 let summary_fs = fs;
52 cx.spawn(|mut cx| async move {
53 let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
54 let (embedding_index, summary_index) = cx
55 .background_executor()
56 .spawn({
57 let entries_being_indexed = Arc::clone(&entries_being_indexed);
58 let db_connection = db_connection.clone();
59 async move {
60 let mut txn = db_connection.write_txn()?;
61 let embedding_index = {
62 let db_name = worktree_abs_path.to_string_lossy();
63 let db = db_connection.create_database(&mut txn, Some(&db_name))?;
64
65 EmbeddingIndex::new(
66 worktree_for_index,
67 embedding_fs,
68 db_connection.clone(),
69 db,
70 language_registry,
71 embedding_provider,
72 Arc::clone(&entries_being_indexed),
73 )
74 };
75 let summary_index = {
76 let file_digest_db = {
77 let db_name =
78 // Prepend something that wouldn't be found at the beginning of an
79 // absolute path, so we don't get db key namespace conflicts with
80 // embeddings, which use the abs path as a key.
81 format!("digests-{}", worktree_abs_path.to_string_lossy());
82 db_connection.create_database(&mut txn, Some(&db_name))?
83 };
84 let summary_db = {
85 let db_name =
86 // Prepend something that wouldn't be found at the beginning of an
87 // absolute path, so we don't get db key namespace conflicts with
88 // embeddings, which use the abs path as a key.
89 format!("summaries-{}", worktree_abs_path.to_string_lossy());
90 db_connection.create_database(&mut txn, Some(&db_name))?
91 };
92 SummaryIndex::new(
93 worktree_for_summary,
94 summary_fs,
95 db_connection.clone(),
96 file_digest_db,
97 summary_db,
98 Arc::clone(&entries_being_indexed),
99 )
100 };
101 txn.commit()?;
102 anyhow::Ok((embedding_index, summary_index))
103 }
104 })
105 .await?;
106
107 cx.new(|cx| {
108 Self::new(
109 worktree,
110 db_connection,
111 embedding_index,
112 summary_index,
113 entries_being_indexed,
114 cx,
115 )
116 })
117 })
118 }
119
120 #[allow(clippy::too_many_arguments)]
121 pub fn new(
122 worktree: Entity<Worktree>,
123 db_connection: heed::Env,
124 embedding_index: EmbeddingIndex,
125 summary_index: SummaryIndex,
126 entry_ids_being_indexed: Arc<IndexingEntrySet>,
127 cx: &mut Context<Self>,
128 ) -> Self {
129 let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
130 let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
131 if let worktree::Event::UpdatedEntries(update) = event {
132 log::debug!("Updating entries...");
133 _ = updated_entries_tx.try_send(update.clone());
134 }
135 });
136
137 Self {
138 db_connection,
139 embedding_index,
140 summary_index,
141 worktree,
142 entry_ids_being_indexed,
143 _index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)),
144 _subscription,
145 }
146 }
147
148 pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
149 self.entry_ids_being_indexed.as_ref()
150 }
151
152 pub fn worktree(&self) -> &Entity<Worktree> {
153 &self.worktree
154 }
155
156 pub fn db_connection(&self) -> &heed::Env {
157 &self.db_connection
158 }
159
160 pub fn embedding_index(&self) -> &EmbeddingIndex {
161 &self.embedding_index
162 }
163
164 pub fn summary_index(&self) -> &SummaryIndex {
165 &self.summary_index
166 }
167
168 async fn index_entries(
169 this: WeakEntity<Self>,
170 updated_entries: channel::Receiver<UpdatedEntriesSet>,
171 mut cx: AsyncApp,
172 ) -> Result<()> {
173 let is_auto_available = cx.update(|cx| cx.wait_for_flag::<AutoCommand>())?.await;
174 let index = this.update(&mut cx, |this, cx| {
175 futures::future::try_join(
176 this.embedding_index.index_entries_changed_on_disk(cx),
177 this.summary_index
178 .index_entries_changed_on_disk(is_auto_available, cx),
179 )
180 })?;
181 index.await.log_err();
182
183 while let Ok(updated_entries) = updated_entries.recv().await {
184 let is_auto_available = cx
185 .update(|cx| cx.has_flag::<AutoCommand>())
186 .unwrap_or(false);
187
188 let index = this.update(&mut cx, |this, cx| {
189 futures::future::try_join(
190 this.embedding_index
191 .index_updated_entries(updated_entries.clone(), cx),
192 this.summary_index.index_updated_entries(
193 updated_entries,
194 is_auto_available,
195 cx,
196 ),
197 )
198 })?;
199 index.await.log_err();
200 }
201
202 Ok(())
203 }
204
205 #[cfg(test)]
206 pub fn path_count(&self) -> Result<u64> {
207 use anyhow::Context as _;
208
209 let txn = self
210 .db_connection
211 .read_txn()
212 .context("failed to create read transaction")?;
213 Ok(self.embedding_index().db().len(&txn)?)
214 }
215}