1mod db;
2mod embedding;
3mod embedding_queue;
4mod parsing;
5pub mod semantic_index_settings;
6
7#[cfg(test)]
8mod semantic_index_tests;
9
10use crate::semantic_index_settings::SemanticIndexSettings;
11use anyhow::{anyhow, Result};
12use collections::{BTreeMap, HashMap, HashSet};
13use db::VectorDatabase;
14use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings};
15use embedding_queue::{EmbeddingQueue, FileToEmbed};
16use futures::{future, FutureExt, StreamExt};
17use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
18use language::{Anchor, Bias, Buffer, Language, LanguageRegistry};
19use parking_lot::Mutex;
20use parsing::{CodeContextRetriever, SpanDigest, PARSEABLE_ENTIRE_FILE_TYPES};
21use postage::watch;
22use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, Worktree, WorktreeId};
23use smol::channel;
24use std::{
25 cmp::Ordering,
26 future::Future,
27 ops::Range,
28 path::{Path, PathBuf},
29 sync::{Arc, Weak},
30 time::{Duration, Instant, SystemTime},
31};
32use util::{
33 channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
34 http::HttpClient,
35 paths::EMBEDDINGS_DIR,
36 ResultExt,
37};
38use workspace::WorkspaceCreated;
39
// Schema/version marker for the embeddings index; presumably bumped to force a
// reindex when the storage format changes — TODO confirm against db module.
const SEMANTIC_INDEX_VERSION: usize = 10;
// Delay between a worktree change notification and the re-index it schedules,
// so bursts of filesystem events are coalesced into one pass.
const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60);
// How long a parsing task waits without receiving a new file before flushing
// the shared embedding queue so partial batches still make progress.
const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
43
/// Initializes the semantic index: registers its settings, subscribes to
/// workspace creation so previously indexed local projects are re-indexed on
/// open, and asynchronously constructs the global `SemanticIndex` model.
pub fn init(
    fs: Arc<dyn Fs>,
    http_client: Arc<dyn HttpClient>,
    language_registry: Arc<LanguageRegistry>,
    cx: &mut AppContext,
) {
    settings::register::<SemanticIndexSettings>(cx);

    // The database lives in a per-release-channel subdirectory so different
    // channels don't share one another's embeddings databases.
    let db_file_path = EMBEDDINGS_DIR
        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
        .join("embeddings_db");

    // This needs to be removed at some point before stable.
    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
        return;
    }

    cx.subscribe_global::<WorkspaceCreated, _>({
        move |event, cx| {
            let Some(semantic_index) = SemanticIndex::global(cx) else {
                return;
            };
            let workspace = &event.0;
            if let Some(workspace) = workspace.upgrade(cx) {
                let project = workspace.read(cx).project().clone();
                if project.read(cx).is_local() {
                    // Only re-index automatically when this project was
                    // indexed in a previous session.
                    cx.spawn(|mut cx| async move {
                        let previously_indexed = semantic_index
                            .update(&mut cx, |index, cx| {
                                index.project_previously_indexed(&project, cx)
                            })
                            .await?;
                        if previously_indexed {
                            semantic_index
                                .update(&mut cx, |index, cx| index.index_project(project, cx))
                                .await?;
                        }
                        anyhow::Ok(())
                    })
                    .detach_and_log_err(cx);
                }
            }
        }
    })
    .detach();

    // Build the index asynchronously and store it as a global so
    // `SemanticIndex::global` can find it later.
    cx.spawn(move |mut cx| async move {
        let semantic_index = SemanticIndex::new(
            fs,
            db_file_path,
            Arc::new(OpenAIEmbeddings {
                client: http_client,
                executor: cx.background(),
            }),
            language_registry,
            cx.clone(),
        )
        .await?;

        cx.update(|cx| {
            cx.set_global(semantic_index.clone());
        });

        anyhow::Ok(())
    })
    .detach();
}
111
/// High-level indexing state for a project, as reported by
/// `SemanticIndex::status`.
#[derive(Copy, Clone, Debug)]
pub enum SemanticIndexStatus {
    /// The project is not tracked by the semantic index.
    NotIndexed,
    /// All worktrees are registered and no index pass is in flight.
    Indexed,
    /// An index pass is running with this many files left to process.
    Indexing { remaining_files: usize },
}
118
/// Maintains a vector database of code-span embeddings for open projects,
/// enabling semantic (natural-language) search over their files.
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    // Handle to the on-disk vector database of files and span embeddings.
    db: VectorDatabase,
    // Produces embeddings for spans and search phrases (e.g. OpenAI).
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    // Feeds pending files (plus any cached embeddings for their spans) to the
    // background parsing tasks.
    parsing_files_tx: channel::Sender<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>,
    // Persists embedded files to the database; kept alive by this handle.
    _embedding_task: Task<()>,
    // One file-parsing task per CPU; kept alive by these handles.
    _parsing_files_tasks: Vec<Task<()>>,
    // Per-project indexing state, keyed by weak handle so dropped projects
    // can be detected and cleaned up.
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
129
/// Indexing state tracked for a single project.
struct ProjectState {
    // Registration/changed-path state for each of the project's worktrees.
    worktrees: HashMap<WorktreeId, WorktreeState>,
    // Observers read the number of files still awaiting embedding from here.
    pending_file_count_rx: watch::Receiver<usize>,
    // Shared sender for the pending-file counter; `JobHandle`s increment it
    // on creation and decrement it when the last clone is dropped.
    pending_file_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    // Number of `index_project` passes currently in flight.
    pending_index: usize,
    _subscription: gpui::Subscription,
    _observe_pending_file_count: Task<()>,
}
138
/// A worktree is either still registering with the database or fully
/// registered; both states accumulate changed paths for later indexing.
enum WorktreeState {
    Registering(RegisteringWorktreeState),
    Registered(RegisteredWorktreeState),
}
143
144impl WorktreeState {
145 fn is_registered(&self) -> bool {
146 matches!(self, Self::Registered(_))
147 }
148
149 fn paths_changed(
150 &mut self,
151 changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
152 worktree: &Worktree,
153 ) {
154 let changed_paths = match self {
155 Self::Registering(state) => &mut state.changed_paths,
156 Self::Registered(state) => &mut state.changed_paths,
157 };
158
159 for (path, entry_id, change) in changes.iter() {
160 let Some(entry) = worktree.entry_for_id(*entry_id) else {
161 continue;
162 };
163 if entry.is_ignored || entry.is_symlink || entry.is_external || entry.is_dir() {
164 continue;
165 }
166 changed_paths.insert(
167 path.clone(),
168 ChangedPathInfo {
169 mtime: entry.mtime,
170 is_deleted: *change == PathChange::Removed,
171 },
172 );
173 }
174 }
175}
176
/// State for a worktree whose initial database registration is in flight.
struct RegisteringWorktreeState {
    // Changes reported while registration is pending; merged into the
    // registered state once registration completes.
    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
    // Receives `Some(())` when registration finishes (success or failure).
    done_rx: watch::Receiver<Option<()>>,
    // Keeps the registration task alive.
    _registration: Task<()>,
}
182
183impl RegisteringWorktreeState {
184 fn done(&self) -> impl Future<Output = ()> {
185 let mut done_rx = self.done_rx.clone();
186 async move {
187 while let Some(result) = done_rx.next().await {
188 if result.is_some() {
189 break;
190 }
191 }
192 }
193 }
194}
195
/// State for a worktree that has a row in the vector database.
struct RegisteredWorktreeState {
    // Primary key of this worktree in the vector database.
    db_id: i64,
    // Paths that changed since the last index pass, awaiting processing.
    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
}
200
/// Metadata about a changed file path awaiting (re)indexing.
struct ChangedPathInfo {
    // Modification time recorded when the change was observed.
    mtime: SystemTime,
    // True when the path was removed and should be deleted from the database.
    is_deleted: bool,
}
205
/// Tracks one pending indexing job; all clones refer to the same job.
#[derive(Clone)]
pub struct JobHandle {
    /// The outer Arc is here to count the clones of a JobHandle instance;
    /// when the last handle to a given job is dropped, we decrement a counter (just once).
    /// The inner Weak avoids keeping the project's counter alive after the
    /// project itself is gone.
    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
}
212
213impl JobHandle {
214 fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
215 *tx.lock().borrow_mut() += 1;
216 Self {
217 tx: Arc::new(Arc::downgrade(&tx)),
218 }
219 }
220}
221
impl ProjectState {
    /// Creates project-tracking state with a zeroed pending-file counter and
    /// a task that notifies observers whenever that counter changes.
    fn new(subscription: gpui::Subscription, cx: &mut ModelContext<SemanticIndex>) -> Self {
        let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0);
        let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx));
        Self {
            worktrees: Default::default(),
            pending_file_count_rx: pending_file_count_rx.clone(),
            pending_file_count_tx,
            pending_index: 0,
            _subscription: subscription,
            // Re-notify the SemanticIndex model on every counter change so
            // anything observing its status stays up to date.
            _observe_pending_file_count: cx.spawn_weak({
                let mut pending_file_count_rx = pending_file_count_rx.clone();
                |this, mut cx| async move {
                    while let Some(_) = pending_file_count_rx.next().await {
                        if let Some(this) = this.upgrade(&cx) {
                            this.update(&mut cx, |_, cx| cx.notify());
                        }
                    }
                }
            }),
        }
    }

    /// Finds the `WorktreeId` whose registered state maps to database id `id`.
    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
        self.worktrees
            .iter()
            .find_map(|(worktree_id, worktree_state)| match worktree_state {
                WorktreeState::Registered(state) if state.db_id == id => Some(*worktree_id),
                _ => None,
            })
    }
}
254
/// A file queued for parsing and embedding.
#[derive(Clone)]
pub struct PendingFile {
    // Database id of the worktree containing this file.
    worktree_db_id: i64,
    relative_path: Arc<Path>,
    absolute_path: PathBuf,
    // `None` until the language is resolved, just before parsing.
    language: Option<Arc<Language>>,
    modified_time: SystemTime,
    // Keeps the project's pending-file counter accurate while this file is
    // in flight.
    job_handle: JobHandle,
}
264
/// A single semantic-search hit: a buffer and the matching range within it.
pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<Anchor>,
}
269
270impl SemanticIndex {
271 pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
272 if cx.has_global::<ModelHandle<Self>>() {
273 Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
274 } else {
275 None
276 }
277 }
278
279 pub fn enabled(cx: &AppContext) -> bool {
280 settings::get::<SemanticIndexSettings>(cx).enabled
281 && *RELEASE_CHANNEL != ReleaseChannel::Stable
282 }
283
284 pub fn status(&self, project: &ModelHandle<Project>) -> SemanticIndexStatus {
285 if let Some(project_state) = self.projects.get(&project.downgrade()) {
286 if project_state
287 .worktrees
288 .values()
289 .all(|worktree| worktree.is_registered())
290 && project_state.pending_index == 0
291 {
292 SemanticIndexStatus::Indexed
293 } else {
294 SemanticIndexStatus::Indexing {
295 remaining_files: project_state.pending_file_count_rx.borrow().clone(),
296 }
297 }
298 } else {
299 SemanticIndexStatus::NotIndexed
300 }
301 }
302
    /// Creates the `SemanticIndex` model: opens (or creates) the vector
    /// database at `database_path`, spawns a background task that persists
    /// embedded files, and spawns one file-parsing task per CPU.
    async fn new(
        fs: Arc<dyn Fs>,
        database_path: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_path = Arc::from(database_path);
        let db = VectorDatabase::new(fs.clone(), database_path, cx.background()).await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            let embedding_queue =
                EmbeddingQueue::new(embedding_provider.clone(), cx.background().clone());
            // Persist files to the database as the embedding queue finishes
            // them; this task ends when the queue's channel closes.
            let _embedding_task = cx.background().spawn({
                let embedded_files = embedding_queue.finished_files();
                let db = db.clone();
                async move {
                    while let Ok(file) = embedded_files.recv().await {
                        db.insert_file(file.worktree_id, file.path, file.mtime, file.spans)
                            .await
                            .log_err();
                    }
                }
            });

            // Parse files into embeddable spans.
            let (parsing_files_tx, parsing_files_rx) =
                channel::unbounded::<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>();
            let embedding_queue = Arc::new(Mutex::new(embedding_queue));
            let mut _parsing_files_tasks = Vec::new();
            // One parsing task per CPU; they all share one receiver, so
            // pending files are load-balanced across tasks.
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let mut parsing_files_rx = parsing_files_rx.clone();
                let embedding_provider = embedding_provider.clone();
                let embedding_queue = embedding_queue.clone();
                let background = cx.background().clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
                    loop {
                        // Either parse the next pending file, or — if nothing
                        // arrives within the flush timeout — flush the shared
                        // embedding queue so partial batches make progress.
                        let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse();
                        let mut next_file_to_parse = parsing_files_rx.next().fuse();
                        futures::select_biased! {
                            next_file_to_parse = next_file_to_parse => {
                                if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse {
                                    Self::parse_file(
                                        &fs,
                                        pending_file,
                                        &mut retriever,
                                        &embedding_queue,
                                        &embeddings_for_digest,
                                    )
                                    .await
                                } else {
                                    break;
                                }
                            },
                            _ = timer => {
                                embedding_queue.lock().flush();
                            }
                        }
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                db,
                embedding_provider,
                language_registry,
                parsing_files_tx,
                _embedding_task,
                _parsing_files_tasks,
                projects: Default::default(),
            }
        }))
    }
390
391 async fn parse_file(
392 fs: &Arc<dyn Fs>,
393 pending_file: PendingFile,
394 retriever: &mut CodeContextRetriever,
395 embedding_queue: &Arc<Mutex<EmbeddingQueue>>,
396 embeddings_for_digest: &HashMap<SpanDigest, Embedding>,
397 ) {
398 let Some(language) = pending_file.language else {
399 return;
400 };
401
402 if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
403 if let Some(mut spans) = retriever
404 .parse_file_with_template(&pending_file.relative_path, &content, language)
405 .log_err()
406 {
407 log::trace!(
408 "parsed path {:?}: {} spans",
409 pending_file.relative_path,
410 spans.len()
411 );
412
413 for span in &mut spans {
414 if let Some(embedding) = embeddings_for_digest.get(&span.digest) {
415 span.embedding = Some(embedding.to_owned());
416 }
417 }
418
419 embedding_queue.lock().push(FileToEmbed {
420 worktree_id: pending_file.worktree_db_id,
421 path: pending_file.relative_path,
422 mtime: pending_file.modified_time,
423 job_handle: pending_file.job_handle,
424 spans: spans,
425 });
426 }
427 }
428 }
429
430 pub fn project_previously_indexed(
431 &mut self,
432 project: &ModelHandle<Project>,
433 cx: &mut ModelContext<Self>,
434 ) -> Task<Result<bool>> {
435 let worktrees_indexed_previously = project
436 .read(cx)
437 .worktrees(cx)
438 .map(|worktree| {
439 self.db
440 .worktree_previously_indexed(&worktree.read(cx).abs_path())
441 })
442 .collect::<Vec<_>>();
443 cx.spawn(|_, _cx| async move {
444 let worktree_indexed_previously =
445 futures::future::join_all(worktrees_indexed_previously).await;
446
447 Ok(worktree_indexed_previously
448 .iter()
449 .filter(|worktree| worktree.is_ok())
450 .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
451 })
452 }
453
454 fn project_entries_changed(
455 &mut self,
456 project: ModelHandle<Project>,
457 worktree_id: WorktreeId,
458 changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
459 cx: &mut ModelContext<Self>,
460 ) {
461 let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
462 return;
463 };
464 let project = project.downgrade();
465 let Some(project_state) = self.projects.get_mut(&project) else {
466 return;
467 };
468
469 let worktree = worktree.read(cx);
470 let worktree_state =
471 if let Some(worktree_state) = project_state.worktrees.get_mut(&worktree_id) {
472 worktree_state
473 } else {
474 return;
475 };
476 worktree_state.paths_changed(changes, worktree);
477 if let WorktreeState::Registered(_) = worktree_state {
478 cx.spawn_weak(|this, mut cx| async move {
479 cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
480 if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
481 this.update(&mut cx, |this, cx| {
482 this.index_project(project, cx).detach_and_log_err(cx)
483 });
484 }
485 })
486 .detach();
487 }
488 }
489
    /// Begins tracking `worktree` for `project`: waits for the initial scan,
    /// finds or creates the worktree's database row, diffs on-disk files
    /// against stored mtimes, then kicks off an index pass.
    ///
    /// While registration is in flight the worktree is held in
    /// `WorktreeState::Registering`, which accumulates any file changes that
    /// arrive in the meantime.
    fn register_worktree(
        &mut self,
        project: ModelHandle<Project>,
        worktree: ModelHandle<Worktree>,
        cx: &mut ModelContext<Self>,
    ) {
        let project = project.downgrade();
        let project_state = if let Some(project_state) = self.projects.get_mut(&project) {
            project_state
        } else {
            return;
        };
        // Only local worktrees are indexed.
        let worktree = if let Some(worktree) = worktree.read(cx).as_local() {
            worktree
        } else {
            return;
        };
        let worktree_abs_path = worktree.abs_path().clone();
        let scan_complete = worktree.scan_complete();
        let worktree_id = worktree.id();
        let db = self.db.clone();
        let language_registry = self.language_registry.clone();
        let (mut done_tx, done_rx) = watch::channel();
        let registration = cx.spawn(|this, mut cx| {
            async move {
                let register = async {
                    // Wait for the initial scan so the snapshot below sees
                    // every file in the worktree.
                    scan_complete.await;
                    let db_id = db.find_or_create_worktree(worktree_abs_path).await?;
                    let mut file_mtimes = db.get_file_mtimes(db_id).await?;
                    let worktree = if let Some(project) = project.upgrade(&cx) {
                        project
                            .read_with(&cx, |project, cx| project.worktree_for_id(worktree_id, cx))
                            .ok_or_else(|| anyhow!("worktree not found"))?
                    } else {
                        return anyhow::Ok(());
                    };
                    let worktree = worktree.read_with(&cx, |worktree, _| worktree.snapshot());
                    // Diff the snapshot against stored mtimes on a background
                    // thread to find files that need (re)indexing.
                    let mut changed_paths = cx
                        .background()
                        .spawn(async move {
                            let mut changed_paths = BTreeMap::new();
                            for file in worktree.files(false, 0) {
                                let absolute_path = worktree.absolutize(&file.path);

                                if file.is_external || file.is_ignored || file.is_symlink {
                                    continue;
                                }

                                if let Ok(language) = language_registry
                                    .language_for_file(&absolute_path, None)
                                    .await
                                {
                                    // Test if file is valid parseable file
                                    if !PARSEABLE_ENTIRE_FILE_TYPES
                                        .contains(&language.name().as_ref())
                                        && &language.name().as_ref() != &"Markdown"
                                        && language
                                            .grammar()
                                            .and_then(|grammar| grammar.embedding_config.as_ref())
                                            .is_none()
                                    {
                                        continue;
                                    }

                                    // A file is up to date only when its
                                    // stored mtime matches the one on disk.
                                    let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                    let already_stored = stored_mtime
                                        .map_or(false, |existing_mtime| {
                                            existing_mtime == file.mtime
                                        });

                                    if !already_stored {
                                        changed_paths.insert(
                                            file.path.clone(),
                                            ChangedPathInfo {
                                                mtime: file.mtime,
                                                is_deleted: false,
                                            },
                                        );
                                    }
                                }
                            }

                            // Clean up entries from database that are no longer in the worktree.
                            for (path, mtime) in file_mtimes {
                                changed_paths.insert(
                                    path.into(),
                                    ChangedPathInfo {
                                        mtime,
                                        is_deleted: true,
                                    },
                                );
                            }

                            anyhow::Ok(changed_paths)
                        })
                        .await?;
                    this.update(&mut cx, |this, cx| {
                        let project_state = this
                            .projects
                            .get_mut(&project)
                            .ok_or_else(|| anyhow!("project not registered"))?;
                        let project = project
                            .upgrade(cx)
                            .ok_or_else(|| anyhow!("project was dropped"))?;

                        // Merge in changes reported while registration was
                        // still in progress.
                        if let Some(WorktreeState::Registering(state)) =
                            project_state.worktrees.remove(&worktree_id)
                        {
                            changed_paths.extend(state.changed_paths);
                        }
                        project_state.worktrees.insert(
                            worktree_id,
                            WorktreeState::Registered(RegisteredWorktreeState {
                                db_id,
                                changed_paths,
                            }),
                        );
                        this.index_project(project, cx).detach_and_log_err(cx);

                        anyhow::Ok(())
                    })?;

                    anyhow::Ok(())
                };

                if register.await.log_err().is_none() {
                    // Stop tracking this worktree if the registration failed.
                    this.update(&mut cx, |this, _| {
                        this.projects.get_mut(&project).map(|project_state| {
                            project_state.worktrees.remove(&worktree_id);
                        });
                    })
                }

                // Signal `RegisteringWorktreeState::done` waiters.
                *done_tx.borrow_mut() = Some(());
            }
        });
        project_state.worktrees.insert(
            worktree_id,
            WorktreeState::Registering(RegisteringWorktreeState {
                changed_paths: Default::default(),
                done_rx,
                _registration: registration,
            }),
        );
    }
636
637 fn project_worktrees_changed(
638 &mut self,
639 project: ModelHandle<Project>,
640 cx: &mut ModelContext<Self>,
641 ) {
642 let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade())
643 {
644 project_state
645 } else {
646 return;
647 };
648
649 let mut worktrees = project
650 .read(cx)
651 .worktrees(cx)
652 .filter(|worktree| worktree.read(cx).is_local())
653 .collect::<Vec<_>>();
654 let worktree_ids = worktrees
655 .iter()
656 .map(|worktree| worktree.read(cx).id())
657 .collect::<HashSet<_>>();
658
659 // Remove worktrees that are no longer present
660 project_state
661 .worktrees
662 .retain(|worktree_id, _| worktree_ids.contains(worktree_id));
663
664 // Register new worktrees
665 worktrees.retain(|worktree| {
666 let worktree_id = worktree.read(cx).id();
667 !project_state.worktrees.contains_key(&worktree_id)
668 });
669 for worktree in worktrees {
670 self.register_worktree(project.clone(), worktree, cx);
671 }
672 }
673
674 pub fn pending_file_count(
675 &self,
676 project: &ModelHandle<Project>,
677 ) -> Option<watch::Receiver<usize>> {
678 Some(
679 self.projects
680 .get(&project.downgrade())?
681 .pending_file_count_rx
682 .clone(),
683 )
684 }
685
686 pub fn search_project(
687 &mut self,
688 project: ModelHandle<Project>,
689 phrase: String,
690 limit: usize,
691 includes: Vec<PathMatcher>,
692 excludes: Vec<PathMatcher>,
693 cx: &mut ModelContext<Self>,
694 ) -> Task<Result<Vec<SearchResult>>> {
695 let index = self.index_project(project.clone(), cx);
696 let embedding_provider = self.embedding_provider.clone();
697 let db_path = self.db.path().clone();
698 let fs = self.fs.clone();
699 cx.spawn(|this, mut cx| async move {
700 index.await?;
701
702 let t0 = Instant::now();
703 let database =
704 VectorDatabase::new(fs.clone(), db_path.clone(), cx.background()).await?;
705
706 if phrase.len() == 0 {
707 return Ok(Vec::new());
708 }
709
710 let phrase_embedding = embedding_provider
711 .embed_batch(vec![phrase])
712 .await?
713 .into_iter()
714 .next()
715 .unwrap();
716
717 log::trace!(
718 "Embedding search phrase took: {:?} milliseconds",
719 t0.elapsed().as_millis()
720 );
721
722 let worktree_db_ids = this.read_with(&cx, |this, _| {
723 let project_state = this
724 .projects
725 .get(&project.downgrade())
726 .ok_or_else(|| anyhow!("project was not indexed"))?;
727 let worktree_db_ids = project_state
728 .worktrees
729 .values()
730 .filter_map(|worktree| {
731 if let WorktreeState::Registered(worktree) = worktree {
732 Some(worktree.db_id)
733 } else {
734 None
735 }
736 })
737 .collect::<Vec<i64>>();
738 anyhow::Ok(worktree_db_ids)
739 })?;
740 let file_ids = database
741 .retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)
742 .await?;
743
744 let batch_n = cx.background().num_cpus();
745 let ids_len = file_ids.clone().len();
746 let batch_size = if ids_len <= batch_n {
747 ids_len
748 } else {
749 ids_len / batch_n
750 };
751
752 let mut batch_results = Vec::new();
753 for batch in file_ids.chunks(batch_size) {
754 let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
755 let limit = limit.clone();
756 let fs = fs.clone();
757 let db_path = db_path.clone();
758 let phrase_embedding = phrase_embedding.clone();
759 if let Some(db) = VectorDatabase::new(fs, db_path.clone(), cx.background())
760 .await
761 .log_err()
762 {
763 batch_results.push(async move {
764 db.top_k_search(&phrase_embedding, limit, batch.as_slice())
765 .await
766 });
767 }
768 }
769 let batch_results = futures::future::join_all(batch_results).await;
770
771 let mut results = Vec::new();
772 for batch_result in batch_results {
773 if batch_result.is_ok() {
774 for (id, similarity) in batch_result.unwrap() {
775 let ix = match results.binary_search_by(|(_, s)| {
776 similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
777 }) {
778 Ok(ix) => ix,
779 Err(ix) => ix,
780 };
781 results.insert(ix, (id, similarity));
782 results.truncate(limit);
783 }
784 }
785 }
786
787 let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
788 let spans = database.spans_for_ids(ids.as_slice()).await?;
789
790 let mut tasks = Vec::new();
791 let mut ranges = Vec::new();
792 let weak_project = project.downgrade();
793 project.update(&mut cx, |project, cx| {
794 for (worktree_db_id, file_path, byte_range) in spans {
795 let project_state =
796 if let Some(state) = this.read(cx).projects.get(&weak_project) {
797 state
798 } else {
799 return Err(anyhow!("project not added"));
800 };
801 if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
802 tasks.push(project.open_buffer((worktree_id, file_path), cx));
803 ranges.push(byte_range);
804 }
805 }
806
807 Ok(())
808 })?;
809
810 let buffers = futures::future::join_all(tasks).await;
811
812 log::trace!(
813 "Semantic Searching took: {:?} milliseconds in total",
814 t0.elapsed().as_millis()
815 );
816
817 Ok(buffers
818 .into_iter()
819 .zip(ranges)
820 .filter_map(|(buffer, range)| {
821 let buffer = buffer.log_err()?;
822 let range = buffer.read_with(&cx, |buffer, _| {
823 let start = buffer.clip_offset(range.start, Bias::Left);
824 let end = buffer.clip_offset(range.end, Bias::Right);
825 buffer.anchor_before(start)..buffer.anchor_after(end)
826 });
827 Some(SearchResult { buffer, range })
828 })
829 .collect::<Vec<_>>())
830 })
831 }
832
    /// Indexes (or re-indexes) `project`: registers it on first use, waits
    /// for all worktrees to finish registering, then parses and embeds every
    /// changed file and deletes removed files from the database. The returned
    /// task resolves once the index pass completes.
    pub fn index_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        // Lazily register the project and subscribe to its worktree events.
        if !self.projects.contains_key(&project.downgrade()) {
            log::trace!("Registering Project for Semantic Index");

            let subscription = cx.subscribe(&project, |this, project, event, cx| match event {
                project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => {
                    this.project_worktrees_changed(project.clone(), cx);
                }
                project::Event::WorktreeUpdatedEntries(worktree_id, changes) => {
                    this.project_entries_changed(project, *worktree_id, changes.clone(), cx);
                }
                _ => {}
            });
            let project_state = ProjectState::new(subscription, cx);
            self.projects.insert(project.downgrade(), project_state);
            self.project_worktrees_changed(project.clone(), cx);
        }
        let project_state = self.projects.get_mut(&project.downgrade()).unwrap();
        project_state.pending_index += 1;
        cx.notify();

        let mut pending_file_count_rx = project_state.pending_file_count_rx.clone();
        let db = self.db.clone();
        let language_registry = self.language_registry.clone();
        let parsing_files_tx = self.parsing_files_tx.clone();
        let worktree_registration = self.wait_for_worktree_registration(&project, cx);

        cx.spawn(|this, mut cx| async move {
            worktree_registration.await?;

            // Drain each registered worktree's accumulated change set into
            // the two work lists below.
            let mut pending_files = Vec::new();
            let mut files_to_delete = Vec::new();
            this.update(&mut cx, |this, cx| {
                let project_state = this
                    .projects
                    .get_mut(&project.downgrade())
                    .ok_or_else(|| anyhow!("project was dropped"))?;
                let pending_file_count_tx = &project_state.pending_file_count_tx;

                project_state
                    .worktrees
                    .retain(|worktree_id, worktree_state| {
                        // Drop state for worktrees that no longer exist.
                        let worktree = if let Some(worktree) =
                            project.read(cx).worktree_for_id(*worktree_id, cx)
                        {
                            worktree
                        } else {
                            return false;
                        };
                        let worktree_state =
                            if let WorktreeState::Registered(worktree_state) = worktree_state {
                                worktree_state
                            } else {
                                return true;
                            };

                        // Every changed path is consumed here (the inner
                        // retain always returns false), moving each one into
                        // either the delete list or the pending-file list.
                        worktree_state.changed_paths.retain(|path, info| {
                            if info.is_deleted {
                                files_to_delete.push((worktree_state.db_id, path.clone()));
                            } else {
                                let absolute_path = worktree.read(cx).absolutize(path);
                                let job_handle = JobHandle::new(pending_file_count_tx);
                                pending_files.push(PendingFile {
                                    absolute_path,
                                    relative_path: path.clone(),
                                    language: None,
                                    job_handle,
                                    modified_time: info.mtime,
                                    worktree_db_id: worktree_state.db_id,
                                });
                            }

                            false
                        });
                        true
                    });

                anyhow::Ok(())
            })?;

            cx.background()
                .spawn(async move {
                    for (worktree_db_id, path) in files_to_delete {
                        db.delete_file(worktree_db_id, path).await.log_err();
                    }

                    // Look up embeddings already stored for these paths so
                    // unchanged spans don't need to be re-embedded.
                    let embeddings_for_digest = {
                        let mut files = HashMap::default();
                        for pending_file in &pending_files {
                            files
                                .entry(pending_file.worktree_db_id)
                                .or_insert(Vec::new())
                                .push(pending_file.relative_path.clone());
                        }
                        Arc::new(
                            db.embeddings_for_files(files)
                                .await
                                .log_err()
                                .unwrap_or_default(),
                        )
                    };

                    for mut pending_file in pending_files {
                        if let Ok(language) = language_registry
                            .language_for_file(&pending_file.relative_path, None)
                            .await
                        {
                            // Skip files that can't be parsed whole or via an
                            // embedding-capable grammar.
                            if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                && &language.name().as_ref() != &"Markdown"
                                && language
                                    .grammar()
                                    .and_then(|grammar| grammar.embedding_config.as_ref())
                                    .is_none()
                            {
                                continue;
                            }
                            pending_file.language = Some(language);
                        }
                        parsing_files_tx
                            .try_send((embeddings_for_digest.clone(), pending_file))
                            .ok();
                    }

                    // Wait until we're done indexing.
                    while let Some(count) = pending_file_count_rx.next().await {
                        if count == 0 {
                            break;
                        }
                    }
                })
                .await;

            this.update(&mut cx, |this, cx| {
                let project_state = this
                    .projects
                    .get_mut(&project.downgrade())
                    .ok_or_else(|| anyhow!("project was dropped"))?;
                project_state.pending_index -= 1;
                cx.notify();
                anyhow::Ok(())
            })?;

            Ok(())
        })
    }
982
    /// Returns a task that resolves once every worktree in `project` has
    /// finished registering, or errors if the index itself is dropped.
    fn wait_for_worktree_registration(
        &self,
        project: &ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let project = project.downgrade();
        cx.spawn_weak(|this, cx| async move {
            // Loop because new worktrees may begin registering while we await
            // the current batch.
            loop {
                let mut pending_worktrees = Vec::new();
                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("semantic index dropped"))?
                    .read_with(&cx, |this, _| {
                        if let Some(project) = this.projects.get(&project) {
                            for worktree in project.worktrees.values() {
                                if let WorktreeState::Registering(worktree) = worktree {
                                    pending_worktrees.push(worktree.done());
                                }
                            }
                        }
                    });

                if pending_worktrees.is_empty() {
                    break;
                } else {
                    future::join_all(pending_worktrees).await;
                }
            }
            Ok(())
        })
    }
1013}
1014
// `SemanticIndex` is a gpui model entity; it emits no events.
impl Entity for SemanticIndex {
    type Event = ();
}
1018
impl Drop for JobHandle {
    fn drop(&mut self) {
        // `Arc::get_mut` succeeds only when this is the last remaining clone
        // of the handle (regardless of its origin — whether it was cloned or
        // not), so the pending-file counter is decremented exactly once per
        // job.
        if let Some(inner) = Arc::get_mut(&mut self.tx) {
            // The Weak upgrade fails if the owning project's counter is gone.
            if let Some(tx) = inner.upgrade() {
                let mut tx = tx.lock();
                *tx.borrow_mut() -= 1;
            }
        }
    }
}
1030
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies that the pending-job counter is decremented exactly once,
    /// only when the final clone of a `JobHandle` is dropped.
    #[test]
    fn test_job_handle() {
        let (job_count_tx, job_count_rx) = watch::channel_with(0);
        let shared_tx = Arc::new(Mutex::new(job_count_tx));

        // Creating a handle bumps the counter to one.
        let first_handle = JobHandle::new(&shared_tx);
        assert_eq!(1, *job_count_rx.borrow());

        // Cloning must not bump the counter again.
        let second_handle = first_handle.clone();
        assert_eq!(1, *job_count_rx.borrow());

        // Dropping one of two handles leaves the counter untouched...
        drop(first_handle);
        assert_eq!(1, *job_count_rx.borrow());

        // ...and dropping the last one decrements it.
        drop(second_handle);
        assert_eq!(0, *job_count_rx.borrow());
    }
}