1use crate::{
2 embedding::{EmbeddingProvider, TextToEmbed},
3 summary_index::FileSummary,
4 worktree_index::{WorktreeIndex, WorktreeIndexHandle},
5};
6use anyhow::{anyhow, Context, Result};
7use collections::HashMap;
8use fs::Fs;
9use futures::{stream::StreamExt, FutureExt};
10use gpui::{
11 AppContext, Entity, EntityId, EventEmitter, Model, ModelContext, Subscription, Task, WeakModel,
12};
13use language::LanguageRegistry;
14use log;
15use project::{Project, Worktree, WorktreeId};
16use serde::{Deserialize, Serialize};
17use smol::channel;
18use std::{
19 cmp::Ordering,
20 future::Future,
21 num::NonZeroUsize,
22 ops::{Range, RangeInclusive},
23 path::{Path, PathBuf},
24 sync::Arc,
25};
26use util::ResultExt;
27
/// A single semantic-search hit, resolved to a live worktree handle.
#[derive(Debug)]
pub struct SearchResult {
    /// The worktree containing the matched file.
    pub worktree: Model<Worktree>,
    /// Worktree-relative path of the matched file.
    pub path: Arc<Path>,
    /// Byte range of the matched chunk within the file.
    pub range: Range<usize>,
    /// Similarity between the chunk's embedding and the query embedding
    /// (higher is better; results are sorted descending by this value).
    pub score: f32,
}
35
/// A search result whose file contents have been loaded from disk.
///
/// Derives `Debug` for consistency with [`SearchResult`] (which already
/// derives it) and so results can appear in logs/assertions; all fields
/// are std types, so the derive is free.
#[derive(Debug)]
pub struct LoadedSearchResult {
    /// Worktree-relative path of the matched file.
    pub path: Arc<Path>,
    /// Byte range of the matched chunk within `file_content`.
    pub range: Range<usize>,
    /// Full display path — NOTE(review): constructed by the caller that
    /// loads the result; exact basis (absolute vs. worktree-rooted) is not
    /// visible in this file.
    pub full_path: PathBuf,
    /// The complete text of the matched file.
    pub file_content: String,
    /// Inclusive row range covered by the matched chunk — NOTE(review):
    /// 0- vs 1-based numbering is not visible here; confirm against caller.
    pub row_range: RangeInclusive<u32>,
}
43
/// An intermediate search hit that identifies its worktree by id rather
/// than a live handle; converted into a [`SearchResult`] at the end of
/// `ProjectIndex::search`, once the worktree model can be looked up.
pub struct WorktreeSearchResult {
    /// Id of the worktree containing the matched file.
    pub worktree_id: WorktreeId,
    /// Worktree-relative path of the matched file.
    pub path: Arc<Path>,
    /// Byte range of the matched chunk within the file.
    pub range: Range<usize>,
    /// Similarity score against the query embedding (higher is better).
    pub score: f32,
}
50
/// Aggregate indexing status across all of a project's worktrees.
/// Emitted as a gpui event whenever the status changes.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
    /// No loading or indexing work is in progress.
    Idle,
    /// At least one worktree index is still being loaded from disk.
    Loading,
    /// All worktree indexes are loaded; `remaining_count` entries are
    /// still waiting to be indexed (count is nonzero by construction).
    Scanning { remaining_count: NonZeroUsize },
}
57
/// Maintains an embedding-based (semantic) index for every local, visible
/// worktree in a project, and answers similarity searches against it.
pub struct ProjectIndex {
    /// heed (LMDB) environment shared by all per-worktree indexes.
    db_connection: heed::Env,
    /// Weak handle to the indexed project; upgraded on demand so the index
    /// does not keep the project alive.
    project: WeakModel<Project>,
    /// One index handle per worktree, keyed by the worktree's entity id.
    /// A handle is `Loading` until its load future resolves.
    worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
    language_registry: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    /// Last status emitted, used to suppress duplicate events.
    last_status: Status,
    /// Signalled by worktree indexes when their indexing state changes;
    /// drives the `_maintain_status` task.
    status_tx: channel::Sender<()>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    /// Background task that recomputes the status on each signal;
    /// stored so it is not dropped (and cancelled) immediately.
    _maintain_status: Task<()>,
    /// Keeps the project-event subscription alive for this index's lifetime.
    _subscription: Subscription,
}
70
impl ProjectIndex {
    /// Creates a project index backed by `db_connection`, subscribes to
    /// project events (to track worktree additions/removals), and kicks
    /// off indexing of all currently-visible local worktrees.
    pub fn new(
        project: Model<Project>,
        db_connection: heed::Env,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let language_registry = project.read(cx).languages().clone();
        let fs = project.read(cx).fs().clone();
        // Worktree indexes ping `status_tx` when their indexing state
        // changes; the `_maintain_status` task below reacts to each ping.
        let (status_tx, mut status_rx) = channel::unbounded();
        let mut this = ProjectIndex {
            db_connection,
            project: project.downgrade(),
            worktree_indices: HashMap::default(),
            language_registry,
            fs,
            status_tx,
            last_status: Status::Idle,
            embedding_provider,
            _subscription: cx.subscribe(&project, Self::handle_project_event),
            _maintain_status: cx.spawn(|this, mut cx| async move {
                while status_rx.next().await.is_some() {
                    // `update` fails once the ProjectIndex is dropped;
                    // stop the maintenance loop at that point.
                    if this
                        .update(&mut cx, |this, cx| this.update_status(cx))
                        .is_err()
                    {
                        break;
                    }
                }
            }),
        };
        // Seed the index with the project's current worktrees.
        this.update_worktree_indices(cx);
        this
    }

    /// Returns the last computed aggregate indexing status.
    pub fn status(&self) -> Status {
        self.last_status
    }

    /// Returns a weak handle to the indexed project.
    pub fn project(&self) -> WeakModel<Project> {
        self.project.clone()
    }

    /// Returns the filesystem implementation used by this index.
    pub fn fs(&self) -> Arc<dyn Fs> {
        self.fs.clone()
    }

    /// Reacts to project events; only worktree addition/removal matters,
    /// since either changes the set of indexes we must maintain.
    fn handle_project_event(
        &mut self,
        _: Model<Project>,
        event: &project::Event,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => {
                self.update_worktree_indices(cx);
            }
            _ => {}
        }
    }

    /// Reconciles `worktree_indices` with the project's current set of
    /// visible local worktrees: drops indexes for removed worktrees and
    /// starts loading an index for each new one.
    fn update_worktree_indices(&mut self, cx: &mut ModelContext<Self>) {
        let Some(project) = self.project.upgrade() else {
            return;
        };

        // Only local worktrees are indexed (remote ones have no local files).
        let worktrees = project
            .read(cx)
            .visible_worktrees(cx)
            .filter_map(|worktree| {
                if worktree.read(cx).is_local() {
                    Some((worktree.entity_id(), worktree))
                } else {
                    None
                }
            })
            .collect::<HashMap<_, _>>();

        // Drop handles for worktrees that no longer exist.
        self.worktree_indices
            .retain(|worktree_id, _| worktrees.contains_key(worktree_id));
        for (worktree_id, worktree) in worktrees {
            self.worktree_indices.entry(worktree_id).or_insert_with(|| {
                let worktree_index = WorktreeIndex::load(
                    worktree.clone(),
                    self.db_connection.clone(),
                    self.language_registry.clone(),
                    self.fs.clone(),
                    self.status_tx.clone(),
                    self.embedding_provider.clone(),
                    cx,
                );

                // Once the load resolves, swap the `Loading` handle for a
                // `Loaded` one (or remove it entirely on failure).
                let load_worktree = cx.spawn(|this, mut cx| async move {
                    let result = match worktree_index.await {
                        Ok(worktree_index) => {
                            this.update(&mut cx, |this, _| {
                                this.worktree_indices.insert(
                                    worktree_id,
                                    WorktreeIndexHandle::Loaded {
                                        index: worktree_index.clone(),
                                    },
                                );
                            })?;
                            Ok(worktree_index)
                        }
                        Err(error) => {
                            this.update(&mut cx, |this, _cx| {
                                this.worktree_indices.remove(&worktree_id)
                            })?;
                            // Arc so the error is clonable through the
                            // shared future handed out below.
                            Err(Arc::new(error))
                        }
                    };

                    this.update(&mut cx, |this, cx| this.update_status(cx))?;

                    result
                });

                // `.shared()` lets multiple callers (e.g. concurrent
                // searches) await the same in-flight load.
                WorktreeIndexHandle::Loading {
                    index: load_worktree.shared(),
                }
            });
        }

        self.update_status(cx);
    }

    /// Recomputes the aggregate status: `Loading` dominates if any index
    /// is still loading, then `Scanning` if any entries remain to index,
    /// else `Idle`. Emits the status as an event only when it changed.
    fn update_status(&mut self, cx: &mut ModelContext<Self>) {
        let mut indexing_count = 0;
        let mut any_loading = false;

        for index in self.worktree_indices.values_mut() {
            match index {
                WorktreeIndexHandle::Loading { .. } => {
                    any_loading = true;
                    // One loading worktree is enough to decide the status.
                    break;
                }
                WorktreeIndexHandle::Loaded { index, .. } => {
                    indexing_count += index.read(cx).entry_ids_being_indexed().len();
                }
            }
        }

        let status = if any_loading {
            Status::Loading
        } else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
            Status::Scanning { remaining_count }
        } else {
            Status::Idle
        };

        if status != self.last_status {
            self.last_status = status;
            cx.emit(status);
        }
    }

    /// Performs a semantic search for `query` across all worktree indexes,
    /// returning at most `limit` results ordered by descending score.
    ///
    /// One scan task per worktree streams stored chunks into a bounded
    /// channel; one worker per CPU scores chunks against the query
    /// embedding and keeps its own top-`limit` list, which are merged at
    /// the end.
    pub fn search(
        &self,
        query: String,
        limit: usize,
        cx: &AppContext,
    ) -> Task<Result<Vec<SearchResult>>> {
        let (chunks_tx, chunks_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let chunks_tx = chunks_tx.clone();
            worktree_scan_tasks.push(cx.spawn(|cx| async move {
                // Wait for a still-loading index before scanning it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(&cx, |index, cx| {
                        let worktree_id = index.worktree().read(cx).id();
                        let db_connection = index.db_connection().clone();
                        let db = *index.embedding_index().db();
                        cx.background_executor().spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create read transaction")?;
                            let db_entries = db.iter(&txn).context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (_key, db_embedded_file) = db_entry?;
                                for chunk in db_embedded_file.chunks {
                                    chunks_tx
                                        .send((worktree_id, db_embedded_file.path.clone(), chunk))
                                        .await?;
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Drop our copy of the sender so the channel closes once every
        // scan task finishes, which lets the worker loops below terminate.
        drop(chunks_tx);

        let project = self.project.clone();
        let embedding_provider = self.embedding_provider.clone();
        cx.spawn(|cx| async move {
            #[cfg(debug_assertions)]
            let embedding_query_start = std::time::Instant::now();
            log::info!("Searching for {query}");

            let query_embeddings = embedding_provider
                .embed(&[TextToEmbed::new(&query)])
                .await?;
            let query_embedding = query_embeddings
                .into_iter()
                .next()
                .ok_or_else(|| anyhow!("no embedding for query"))?;

            // One result list per worker avoids cross-thread contention.
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<WorktreeSearchResult>::new());
            }

            #[cfg(debug_assertions)]
            let search_start = std::time::Instant::now();

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
                                let score = chunk.embedding.similarity(&query_embedding);
                                // Comparator is reversed (score vs. probe)
                                // so the list stays sorted descending.
                                let ix = match results.binary_search_by(|probe| {
                                    score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
                                }) {
                                    Ok(ix) | Err(ix) => ix,
                                };
                                results.insert(
                                    ix,
                                    WorktreeSearchResult {
                                        worktree_id,
                                        path: path.clone(),
                                        range: chunk.chunk.range.clone(),
                                        score,
                                    },
                                );
                                // Keep only the per-worker top `limit`.
                                results.truncate(limit);
                            }
                        });
                    }
                })
                .await;

            // Surface (but don't propagate) scan failures; partial results
            // are still useful.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            project.read_with(&cx, |project, cx| {
                let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
                for worker_results in results_by_worker {
                    search_results.extend(worker_results.into_iter().filter_map(|result| {
                        // Drop results whose worktree vanished mid-search.
                        Some(SearchResult {
                            worktree: project.worktree_for_id(result.worktree_id, cx)?,
                            path: result.path,
                            range: result.range,
                            score: result.score,
                        })
                    }));
                }
                // Merge the per-worker lists into a single descending order.
                search_results.sort_unstable_by(|a, b| {
                    b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
                });
                search_results.truncate(limit);

                #[cfg(debug_assertions)]
                {
                    let search_elapsed = search_start.elapsed();
                    log::debug!(
                        "searched {} entries in {:?}",
                        search_results.len(),
                        search_elapsed
                    );
                    let embedding_query_elapsed = embedding_query_start.elapsed();
                    log::debug!("embedding query took {:?}", embedding_query_elapsed);
                }

                search_results
            })
        })
    }

    /// Test helper: total number of indexed paths across loaded worktrees.
    #[cfg(test)]
    pub fn path_count(&self, cx: &AppContext) -> Result<u64> {
        let mut result = 0;
        for worktree_index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
                result += index.read(cx).path_count()?;
            }
        }
        Ok(result)
    }

    /// Returns the loaded index for `worktree_id`, if any. Linear scan:
    /// the map is keyed by entity id, not worktree id.
    pub(crate) fn worktree_index(
        &self,
        worktree_id: WorktreeId,
        cx: &AppContext,
    ) -> Option<Model<WorktreeIndex>> {
        for index in self.worktree_indices.values() {
            if let WorktreeIndexHandle::Loaded { index, .. } = index {
                if index.read(cx).worktree().read(cx).id() == worktree_id {
                    return Some(index.clone());
                }
            }
        }
        None
    }

    /// Returns all loaded worktree indexes, sorted by worktree id for a
    /// deterministic order (the backing map iterates in arbitrary order).
    pub(crate) fn worktree_indices(&self, cx: &AppContext) -> Vec<Model<WorktreeIndex>> {
        let mut result = self
            .worktree_indices
            .values()
            .filter_map(|index| {
                if let WorktreeIndexHandle::Loaded { index, .. } = index {
                    Some(index.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
        result
    }

    /// Collects every stored file summary across all worktrees, using the
    /// same scan-tasks + per-CPU-worker pattern as `search`. Files without
    /// a stored summary are skipped (no just-in-time summarization).
    pub fn all_summaries(&self, cx: &AppContext) -> Task<Result<Vec<FileSummary>>> {
        let (summaries_tx, summaries_rx) = channel::bounded(1024);
        let mut worktree_scan_tasks = Vec::new();
        for worktree_index in self.worktree_indices.values() {
            let worktree_index = worktree_index.clone();
            let summaries_tx: channel::Sender<(String, String)> = summaries_tx.clone();
            worktree_scan_tasks.push(cx.spawn(|cx| async move {
                // Wait for a still-loading index before reading it.
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };

                index
                    .read_with(&cx, |index, cx| {
                        let db_connection = index.db_connection().clone();
                        let summary_index = index.summary_index();
                        let file_digest_db = summary_index.file_digest_db();
                        let summary_db = summary_index.summary_db();

                        cx.background_executor().spawn(async move {
                            let txn = db_connection
                                .read_txn()
                                .context("failed to create db read transaction")?;
                            let db_entries = file_digest_db
                                .iter(&txn)
                                .context("failed to iterate database")?;
                            for db_entry in db_entries {
                                let (file_path, db_file) = db_entry?;

                                // Summaries are keyed by content digest, so
                                // they survive file moves/renames.
                                match summary_db.get(&txn, &db_file.digest) {
                                    Ok(opt_summary) => {
                                        // Currently, we only use summaries we already have. If the file hasn't been
                                        // summarized yet, then we skip it and don't include it in the inferred context.
                                        // If we want to do just-in-time summarization, this would be the place to do it!
                                        if let Some(summary) = opt_summary {
                                            summaries_tx
                                                .send((file_path.to_string(), summary.to_string()))
                                                .await?;
                                        } else {
                                            log::warn!("No summary found for {:?}", &db_file);
                                        }
                                    }
                                    Err(err) => {
                                        // Read errors are logged, not fatal:
                                        // remaining summaries still flow.
                                        log::error!(
                                            "Error reading from summary database: {:?}",
                                            err
                                        );
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                    })?
                    .await
            }));
        }
        // Close the channel once all scan tasks drop their senders, so the
        // collector loops below can terminate.
        drop(summaries_tx);

        let project = self.project.clone();
        cx.spawn(|cx| async move {
            let mut results_by_worker = Vec::new();
            for _ in 0..cx.background_executor().num_cpus() {
                results_by_worker.push(Vec::<FileSummary>::new());
            }

            cx.background_executor()
                .scoped(|cx| {
                    for results in results_by_worker.iter_mut() {
                        cx.spawn(async {
                            while let Ok((filename, summary)) = summaries_rx.recv().await {
                                results.push(FileSummary { filename, summary });
                            }
                        });
                    }
                })
                .await;

            // Surface scan failures without failing the whole collection.
            for scan_task in futures::future::join_all(worktree_scan_tasks).await {
                scan_task.log_err();
            }

            // `read_with` is used only to tie the result to the project's
            // lifetime; the project itself isn't inspected here.
            project.read_with(&cx, |_project, _cx| {
                results_by_worker.into_iter().flatten().collect()
            })
        })
    }

    /// Empty out the backlogs of all the worktrees in the project
    pub fn flush_summary_backlogs(&self, cx: &AppContext) -> impl Future<Output = ()> {
        let flush_start = std::time::Instant::now();

        futures::future::join_all(self.worktree_indices.values().map(|worktree_index| {
            let worktree_index = worktree_index.clone();

            cx.spawn(|cx| async move {
                let index = match worktree_index {
                    WorktreeIndexHandle::Loading { index } => {
                        index.clone().await.map_err(|error| anyhow!(error))?
                    }
                    WorktreeIndexHandle::Loaded { index } => index.clone(),
                };
                let worktree_abs_path =
                    cx.update(|cx| index.read(cx).worktree().read(cx).abs_path())?;

                index
                    .read_with(&cx, |index, cx| {
                        cx.background_executor()
                            .spawn(index.summary_index().flush_backlog(worktree_abs_path, cx))
                    })?
                    .await
            })
        }))
        .map(move |results| {
            // Log any errors, but don't block the user. These summaries are supposed to
            // improve quality by providing extra context, but they aren't hard requirements!
            for result in results {
                if let Err(err) = result {
                    log::error!("Error flushing summary backlog: {:?}", err);
                }
            }

            log::info!("Summary backlog flushed in {:?}", flush_start.elapsed());
        })
    }

    /// Total number of files still awaiting summarization, summed over all
    /// loaded worktree indexes.
    pub fn remaining_summaries(&self, cx: &mut ModelContext<Self>) -> usize {
        self.worktree_indices(cx)
            .iter()
            .map(|index| index.read(cx).summary_index().backlog_len())
            .sum()
    }
}
537
/// `ProjectIndex` emits `Status` values as gpui events so observers (e.g.
/// UI indicators) can track indexing progress.
impl EventEmitter<Status> for ProjectIndex {}