1use crate::embedding::EmbeddingProvider;
2use crate::embedding_index::EmbeddingIndex;
3use crate::indexing::IndexingEntrySet;
4use crate::summary_index::SummaryIndex;
5use anyhow::Result;
6use fs::Fs;
7use futures::future::Shared;
8use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Subscription, Task, WeakEntity};
9use language::LanguageRegistry;
10use log;
11use project::{UpdatedEntriesSet, Worktree};
12use smol::channel;
13use std::sync::Arc;
14use util::ResultExt;
15
16#[derive(Clone)]
17pub enum WorktreeIndexHandle {
18 Loading {
19 index: Shared<Task<Result<Entity<WorktreeIndex>, Arc<anyhow::Error>>>>,
20 },
21 Loaded {
22 index: Entity<WorktreeIndex>,
23 },
24}
25
26pub struct WorktreeIndex {
27 worktree: Entity<Worktree>,
28 db_connection: heed::Env,
29 embedding_index: EmbeddingIndex,
30 summary_index: SummaryIndex,
31 entry_ids_being_indexed: Arc<IndexingEntrySet>,
32 _index_entries: Task<Result<()>>,
33 _subscription: Subscription,
34}
35
36impl WorktreeIndex {
37 pub fn load(
38 worktree: Entity<Worktree>,
39 db_connection: heed::Env,
40 language_registry: Arc<LanguageRegistry>,
41 fs: Arc<dyn Fs>,
42 status_tx: channel::Sender<()>,
43 embedding_provider: Arc<dyn EmbeddingProvider>,
44 cx: &mut App,
45 ) -> Task<Result<Entity<Self>>> {
46 let worktree_for_index = worktree.clone();
47 let worktree_for_summary = worktree.clone();
48 let worktree_abs_path = worktree.read(cx).abs_path();
49 let embedding_fs = Arc::clone(&fs);
50 let summary_fs = fs;
51 cx.spawn(async move |cx| {
52 let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
53 let (embedding_index, summary_index) = cx
54 .background_spawn({
55 let entries_being_indexed = Arc::clone(&entries_being_indexed);
56 let db_connection = db_connection.clone();
57 async move {
58 let mut txn = db_connection.write_txn()?;
59 let embedding_index = {
60 let db_name = worktree_abs_path.to_string_lossy();
61 let db = db_connection.create_database(&mut txn, Some(&db_name))?;
62
63 EmbeddingIndex::new(
64 worktree_for_index,
65 embedding_fs,
66 db_connection.clone(),
67 db,
68 language_registry,
69 embedding_provider,
70 Arc::clone(&entries_being_indexed),
71 )
72 };
73 let summary_index = {
74 let file_digest_db = {
75 let db_name =
76 // Prepend something that wouldn't be found at the beginning of an
77 // absolute path, so we don't get db key namespace conflicts with
78 // embeddings, which use the abs path as a key.
79 format!("digests-{}", worktree_abs_path.to_string_lossy());
80 db_connection.create_database(&mut txn, Some(&db_name))?
81 };
82 let summary_db = {
83 let db_name =
84 // Prepend something that wouldn't be found at the beginning of an
85 // absolute path, so we don't get db key namespace conflicts with
86 // embeddings, which use the abs path as a key.
87 format!("summaries-{}", worktree_abs_path.to_string_lossy());
88 db_connection.create_database(&mut txn, Some(&db_name))?
89 };
90 SummaryIndex::new(
91 worktree_for_summary,
92 summary_fs,
93 db_connection.clone(),
94 file_digest_db,
95 summary_db,
96 Arc::clone(&entries_being_indexed),
97 )
98 };
99 txn.commit()?;
100 anyhow::Ok((embedding_index, summary_index))
101 }
102 })
103 .await?;
104
105 cx.new(|cx| {
106 Self::new(
107 worktree,
108 db_connection,
109 embedding_index,
110 summary_index,
111 entries_being_indexed,
112 cx,
113 )
114 })
115 })
116 }
117
118 pub fn new(
119 worktree: Entity<Worktree>,
120 db_connection: heed::Env,
121 embedding_index: EmbeddingIndex,
122 summary_index: SummaryIndex,
123 entry_ids_being_indexed: Arc<IndexingEntrySet>,
124 cx: &mut Context<Self>,
125 ) -> Self {
126 let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
127 let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
128 if let worktree::Event::UpdatedEntries(update) = event {
129 log::debug!("Updating entries...");
130 _ = updated_entries_tx.try_send(update.clone());
131 }
132 });
133
134 Self {
135 db_connection,
136 embedding_index,
137 summary_index,
138 worktree,
139 entry_ids_being_indexed,
140 _index_entries: cx.spawn(async move |this, cx| {
141 Self::index_entries(this, updated_entries_rx, cx).await
142 }),
143 _subscription,
144 }
145 }
146
147 pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
148 self.entry_ids_being_indexed.as_ref()
149 }
150
151 pub fn worktree(&self) -> &Entity<Worktree> {
152 &self.worktree
153 }
154
155 pub fn db_connection(&self) -> &heed::Env {
156 &self.db_connection
157 }
158
159 pub fn embedding_index(&self) -> &EmbeddingIndex {
160 &self.embedding_index
161 }
162
163 pub fn summary_index(&self) -> &SummaryIndex {
164 &self.summary_index
165 }
166
167 async fn index_entries(
168 this: WeakEntity<Self>,
169 updated_entries: channel::Receiver<UpdatedEntriesSet>,
170 cx: &mut AsyncApp,
171 ) -> Result<()> {
172 let index = this.update(cx, |this, cx| {
173 futures::future::try_join(
174 this.embedding_index.index_entries_changed_on_disk(cx),
175 this.summary_index.index_entries_changed_on_disk(false, cx),
176 )
177 })?;
178 index.await.log_err();
179
180 while let Ok(updated_entries) = updated_entries.recv().await {
181 let index = this.update(cx, |this, cx| {
182 futures::future::try_join(
183 this.embedding_index
184 .index_updated_entries(updated_entries.clone(), cx),
185 this.summary_index
186 .index_updated_entries(updated_entries, false, cx),
187 )
188 })?;
189 index.await.log_err();
190 }
191
192 Ok(())
193 }
194
195 #[cfg(test)]
196 pub fn path_count(&self) -> Result<u64> {
197 use anyhow::Context as _;
198
199 let txn = self
200 .db_connection
201 .read_txn()
202 .context("failed to create read transaction")?;
203 Ok(self.embedding_index().db().len(&txn)?)
204 }
205}