1use crate::embedding::EmbeddingProvider;
2use crate::embedding_index::EmbeddingIndex;
3use crate::indexing::IndexingEntrySet;
4use crate::summary_index::SummaryIndex;
5use anyhow::Result;
6use feature_flags::{AutoCommand, FeatureFlagAppExt};
7use fs::Fs;
8use futures::future::Shared;
9use gpui::{
10 AppContext, AsyncAppContext, Context, Model, ModelContext, Subscription, Task, WeakModel,
11};
12use language::LanguageRegistry;
13use log;
14use project::{UpdatedEntriesSet, Worktree};
15use smol::channel;
16use std::sync::Arc;
17use util::ResultExt;
18
19#[derive(Clone)]
20pub enum WorktreeIndexHandle {
21 Loading {
22 index: Shared<Task<Result<Model<WorktreeIndex>, Arc<anyhow::Error>>>>,
23 },
24 Loaded {
25 index: Model<WorktreeIndex>,
26 },
27}
28
/// Per-worktree index that owns an embedding index and a summary index for a single worktree.
///
/// NOTE: fields are dropped in declaration order, so the `_index_entries` task is dropped
/// before `_subscription`; keep this order in mind when rearranging fields.
pub struct WorktreeIndex {
    /// The worktree whose entries are indexed.
    worktree: Model<Worktree>,
    /// Database environment; clones of it are also handed to both sub-indexes in `load`.
    db_connection: heed::Env,
    /// Embedding index for this worktree's entries.
    embedding_index: EmbeddingIndex,
    /// Summary index for this worktree's entries.
    summary_index: SummaryIndex,
    /// Set of entries currently being indexed, shared (via `Arc`) with both sub-indexes.
    entry_ids_being_indexed: Arc<IndexingEntrySet>,
    /// Task running `index_entries`; never read, only stored so it is kept alive with the model.
    _index_entries: Task<Result<()>>,
    /// Subscription to worktree events; never read, only stored so it is kept alive with the model.
    _subscription: Subscription,
}
38
39impl WorktreeIndex {
40 pub fn load(
41 worktree: Model<Worktree>,
42 db_connection: heed::Env,
43 language_registry: Arc<LanguageRegistry>,
44 fs: Arc<dyn Fs>,
45 status_tx: channel::Sender<()>,
46 embedding_provider: Arc<dyn EmbeddingProvider>,
47 cx: &mut AppContext,
48 ) -> Task<Result<Model<Self>>> {
49 let worktree_for_index = worktree.clone();
50 let worktree_for_summary = worktree.clone();
51 let worktree_abs_path = worktree.read(cx).abs_path();
52 let embedding_fs = Arc::clone(&fs);
53 let summary_fs = fs;
54 cx.spawn(|mut cx| async move {
55 let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
56 let (embedding_index, summary_index) = cx
57 .background_executor()
58 .spawn({
59 let entries_being_indexed = Arc::clone(&entries_being_indexed);
60 let db_connection = db_connection.clone();
61 async move {
62 let mut txn = db_connection.write_txn()?;
63 let embedding_index = {
64 let db_name = worktree_abs_path.to_string_lossy();
65 let db = db_connection.create_database(&mut txn, Some(&db_name))?;
66
67 EmbeddingIndex::new(
68 worktree_for_index,
69 embedding_fs,
70 db_connection.clone(),
71 db,
72 language_registry,
73 embedding_provider,
74 Arc::clone(&entries_being_indexed),
75 )
76 };
77 let summary_index = {
78 let file_digest_db = {
79 let db_name =
80 // Prepend something that wouldn't be found at the beginning of an
81 // absolute path, so we don't get db key namespace conflicts with
82 // embeddings, which use the abs path as a key.
83 format!("digests-{}", worktree_abs_path.to_string_lossy());
84 db_connection.create_database(&mut txn, Some(&db_name))?
85 };
86 let summary_db = {
87 let db_name =
88 // Prepend something that wouldn't be found at the beginning of an
89 // absolute path, so we don't get db key namespace conflicts with
90 // embeddings, which use the abs path as a key.
91 format!("summaries-{}", worktree_abs_path.to_string_lossy());
92 db_connection.create_database(&mut txn, Some(&db_name))?
93 };
94 SummaryIndex::new(
95 worktree_for_summary,
96 summary_fs,
97 db_connection.clone(),
98 file_digest_db,
99 summary_db,
100 Arc::clone(&entries_being_indexed),
101 )
102 };
103 txn.commit()?;
104 anyhow::Ok((embedding_index, summary_index))
105 }
106 })
107 .await?;
108
109 cx.new_model(|cx| {
110 Self::new(
111 worktree,
112 db_connection,
113 embedding_index,
114 summary_index,
115 entries_being_indexed,
116 cx,
117 )
118 })
119 })
120 }
121
122 #[allow(clippy::too_many_arguments)]
123 pub fn new(
124 worktree: Model<Worktree>,
125 db_connection: heed::Env,
126 embedding_index: EmbeddingIndex,
127 summary_index: SummaryIndex,
128 entry_ids_being_indexed: Arc<IndexingEntrySet>,
129 cx: &mut ModelContext<Self>,
130 ) -> Self {
131 let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
132 let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
133 if let worktree::Event::UpdatedEntries(update) = event {
134 log::debug!("Updating entries...");
135 _ = updated_entries_tx.try_send(update.clone());
136 }
137 });
138
139 Self {
140 db_connection,
141 embedding_index,
142 summary_index,
143 worktree,
144 entry_ids_being_indexed,
145 _index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)),
146 _subscription,
147 }
148 }
149
150 pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
151 self.entry_ids_being_indexed.as_ref()
152 }
153
154 pub fn worktree(&self) -> &Model<Worktree> {
155 &self.worktree
156 }
157
158 pub fn db_connection(&self) -> &heed::Env {
159 &self.db_connection
160 }
161
162 pub fn embedding_index(&self) -> &EmbeddingIndex {
163 &self.embedding_index
164 }
165
166 pub fn summary_index(&self) -> &SummaryIndex {
167 &self.summary_index
168 }
169
170 async fn index_entries(
171 this: WeakModel<Self>,
172 updated_entries: channel::Receiver<UpdatedEntriesSet>,
173 mut cx: AsyncAppContext,
174 ) -> Result<()> {
175 let is_auto_available = cx.update(|cx| cx.wait_for_flag::<AutoCommand>())?.await;
176 let index = this.update(&mut cx, |this, cx| {
177 futures::future::try_join(
178 this.embedding_index.index_entries_changed_on_disk(cx),
179 this.summary_index
180 .index_entries_changed_on_disk(is_auto_available, cx),
181 )
182 })?;
183 index.await.log_err();
184
185 while let Ok(updated_entries) = updated_entries.recv().await {
186 let is_auto_available = cx
187 .update(|cx| cx.has_flag::<AutoCommand>())
188 .unwrap_or(false);
189
190 let index = this.update(&mut cx, |this, cx| {
191 futures::future::try_join(
192 this.embedding_index
193 .index_updated_entries(updated_entries.clone(), cx),
194 this.summary_index.index_updated_entries(
195 updated_entries,
196 is_auto_available,
197 cx,
198 ),
199 )
200 })?;
201 index.await.log_err();
202 }
203
204 Ok(())
205 }
206
207 #[cfg(test)]
208 pub fn path_count(&self) -> Result<u64> {
209 use anyhow::Context;
210
211 let txn = self
212 .db_connection
213 .read_txn()
214 .context("failed to create read transaction")?;
215 Ok(self.embedding_index().db().len(&txn)?)
216 }
217}