1mod db;
2mod embedding;
3mod embedding_queue;
4mod parsing;
5pub mod semantic_index_settings;
6
7#[cfg(test)]
8mod semantic_index_tests;
9
10use crate::semantic_index_settings::SemanticIndexSettings;
11use anyhow::{anyhow, Result};
12use collections::{BTreeMap, HashMap, HashSet};
13use db::VectorDatabase;
14use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings};
15use embedding_queue::{EmbeddingQueue, FileToEmbed};
16use futures::{future, FutureExt, StreamExt};
17use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
18use language::{Anchor, Bias, Buffer, Language, LanguageRegistry};
19use parking_lot::Mutex;
20use parsing::{CodeContextRetriever, SpanDigest, PARSEABLE_ENTIRE_FILE_TYPES};
21use postage::watch;
22use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, Worktree, WorktreeId};
23use smol::channel;
24use std::{
25 cmp::Ordering,
26 future::Future,
27 ops::Range,
28 path::{Path, PathBuf},
29 sync::{Arc, Weak},
30 time::{Duration, Instant, SystemTime},
31};
32use util::{
33 channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
34 http::HttpClient,
35 paths::EMBEDDINGS_DIR,
36 ResultExt,
37};
38use workspace::WorkspaceCreated;
39
// Version stamp for the semantic index; not referenced in this file —
// presumably checked by the db module to invalidate stale data. TODO confirm.
const SEMANTIC_INDEX_VERSION: usize = 10;
// Delay between observing worktree changes and re-indexing in the background.
const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60);
// Longest a parsing task waits before flushing queued spans to the embedder.
const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
43
/// Initializes the semantic index: registers its settings and (outside the
/// stable release channel) creates the global `SemanticIndex`, re-indexing
/// previously indexed local projects whenever a workspace is created.
pub fn init(
    fs: Arc<dyn Fs>,
    http_client: Arc<dyn HttpClient>,
    language_registry: Arc<LanguageRegistry>,
    cx: &mut AppContext,
) {
    settings::register::<SemanticIndexSettings>(cx);

    // Keep a separate database per release channel so channels don't share
    // (or corrupt) each other's embeddings.
    let db_file_path = EMBEDDINGS_DIR
        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
        .join("embeddings_db");

    // This needs to be removed at some point before stable.
    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
        return;
    }

    // When a workspace opens a local project that was indexed before,
    // kick off indexing again so the stored embeddings stay fresh.
    cx.subscribe_global::<WorkspaceCreated, _>({
        move |event, cx| {
            let Some(semantic_index) = SemanticIndex::global(cx) else {
                return;
            };
            let workspace = &event.0;
            if let Some(workspace) = workspace.upgrade(cx) {
                let project = workspace.read(cx).project().clone();
                if project.read(cx).is_local() {
                    cx.spawn(|mut cx| async move {
                        let previously_indexed = semantic_index
                            .update(&mut cx, |index, cx| {
                                index.project_previously_indexed(&project, cx)
                            })
                            .await?;
                        if previously_indexed {
                            semantic_index
                                .update(&mut cx, |index, cx| index.index_project(project, cx))
                                .await?;
                        }
                        anyhow::Ok(())
                    })
                    .detach_and_log_err(cx);
                }
            }
        }
    })
    .detach();

    // Database setup does I/O, so construct the index asynchronously and
    // publish it as a global once it's ready.
    cx.spawn(move |mut cx| async move {
        let semantic_index = SemanticIndex::new(
            fs,
            db_file_path,
            Arc::new(OpenAIEmbeddings::new(http_client, cx.background())),
            language_registry,
            cx.clone(),
        )
        .await?;

        cx.update(|cx| {
            cx.set_global(semantic_index.clone());
        });

        anyhow::Ok(())
    })
    .detach();
}
108
/// Externally visible indexing state of a project, as reported by
/// `SemanticIndex::status`.
#[derive(Copy, Clone, Debug)]
pub enum SemanticIndexStatus {
    /// The project has not been registered with the semantic index.
    NotIndexed,
    /// All worktrees are registered and no indexing passes are in flight.
    Indexed,
    /// Indexing is in progress.
    Indexing {
        /// Number of files still waiting to be embedded.
        remaining_files: usize,
        /// When the embedding provider's rate limit resets, if rate-limited.
        rate_limit_expiry: Option<Instant>,
    },
}
118
/// Maintains a vector database of code-span embeddings for local projects,
/// keeping it current as files change and serving semantic searches.
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    db: VectorDatabase,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    // Sends files (plus any embeddings already cached for their spans) to the
    // background parsing tasks.
    parsing_files_tx: channel::Sender<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>,
    // Drains finished files from the embedding queue into the database.
    _embedding_task: Task<()>,
    // One parsing task per CPU core; kept alive for the index's lifetime.
    _parsing_files_tasks: Vec<Task<()>>,
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
129
/// Per-project bookkeeping for the semantic index.
struct ProjectState {
    // Registration state for each of the project's worktrees.
    worktrees: HashMap<WorktreeId, WorktreeState>,
    // Watch over the number of files still pending embedding.
    pending_file_count_rx: watch::Receiver<usize>,
    pending_file_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    // Number of `index_project` calls currently in flight.
    pending_index: usize,
    _subscription: gpui::Subscription,
    // Task that notifies observers whenever the pending file count changes.
    _observe_pending_file_count: Task<()>,
}
138
/// Lifecycle of a worktree in the index: `Registering` while the initial scan
/// and database reconciliation run, then `Registered`.
enum WorktreeState {
    Registering(RegisteringWorktreeState),
    Registered(RegisteredWorktreeState),
}
143
144impl WorktreeState {
145 fn is_registered(&self) -> bool {
146 matches!(self, Self::Registered(_))
147 }
148
149 fn paths_changed(
150 &mut self,
151 changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
152 worktree: &Worktree,
153 ) {
154 let changed_paths = match self {
155 Self::Registering(state) => &mut state.changed_paths,
156 Self::Registered(state) => &mut state.changed_paths,
157 };
158
159 for (path, entry_id, change) in changes.iter() {
160 let Some(entry) = worktree.entry_for_id(*entry_id) else {
161 continue;
162 };
163 if entry.is_ignored || entry.is_symlink || entry.is_external || entry.is_dir() {
164 continue;
165 }
166 changed_paths.insert(
167 path.clone(),
168 ChangedPathInfo {
169 mtime: entry.mtime,
170 is_deleted: *change == PathChange::Removed,
171 },
172 );
173 }
174 }
175}
176
/// State for a worktree whose initial scan/registration is still running.
struct RegisteringWorktreeState {
    // Changes reported while registration runs; merged into the registered
    // state when registration completes.
    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
    // Receives `Some(())` once registration finishes (successfully or not).
    done_rx: watch::Receiver<Option<()>>,
    _registration: Task<()>,
}
182
183impl RegisteringWorktreeState {
184 fn done(&self) -> impl Future<Output = ()> {
185 let mut done_rx = self.done_rx.clone();
186 async move {
187 while let Some(result) = done_rx.next().await {
188 if result.is_some() {
189 break;
190 }
191 }
192 }
193 }
194}
195
/// State for a worktree that has completed registration with the database.
struct RegisteredWorktreeState {
    // The worktree's id in the vector database.
    db_id: i64,
    // Paths changed since the last indexing pass, keyed by relative path.
    changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>,
}
200
/// A single observed change to a tracked file.
struct ChangedPathInfo {
    // Modification time of the file when the change was observed.
    mtime: SystemTime,
    // Whether the file was removed (and should be deleted from the database).
    is_deleted: bool,
}
205
/// Tracks one file's pending embedding job. Clones share the job; the
/// pending-file counter is decremented exactly once, when the last clone is
/// dropped (see `Drop for JobHandle`).
#[derive(Clone)]
pub struct JobHandle {
    /// The outer Arc is here to count the clones of a JobHandle instance;
    /// when the last handle to a given job is dropped, we decrement a counter (just once).
    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
}
212
213impl JobHandle {
214 fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
215 *tx.lock().borrow_mut() += 1;
216 Self {
217 tx: Arc::new(Arc::downgrade(&tx)),
218 }
219 }
220}
221
impl ProjectState {
    /// Creates empty project state and spawns a task that re-notifies the
    /// `SemanticIndex` model whenever the pending-file count changes.
    fn new(subscription: gpui::Subscription, cx: &mut ModelContext<SemanticIndex>) -> Self {
        let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0);
        let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx));
        Self {
            worktrees: Default::default(),
            pending_file_count_rx: pending_file_count_rx.clone(),
            pending_file_count_tx,
            pending_index: 0,
            _subscription: subscription,
            _observe_pending_file_count: cx.spawn_weak({
                let mut pending_file_count_rx = pending_file_count_rx.clone();
                |this, mut cx| async move {
                    // Loop until the channel closes or the model is dropped.
                    while let Some(_) = pending_file_count_rx.next().await {
                        if let Some(this) = this.upgrade(&cx) {
                            this.update(&mut cx, |_, cx| cx.notify());
                        }
                    }
                }
            }),
        }
    }

    /// Looks up the `WorktreeId` whose registered database id equals `id`.
    /// Returns `None` for unknown ids and worktrees still registering.
    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
        self.worktrees
            .iter()
            .find_map(|(worktree_id, worktree_state)| match worktree_state {
                WorktreeState::Registered(state) if state.db_id == id => Some(*worktree_id),
                _ => None,
            })
    }
}
254
/// A file scheduled for parsing and embedding.
#[derive(Clone)]
pub struct PendingFile {
    // Database id of the worktree containing this file.
    worktree_db_id: i64,
    relative_path: Arc<Path>,
    absolute_path: PathBuf,
    // Detected language; `None` until resolved by the indexing pass.
    language: Option<Arc<Language>>,
    modified_time: SystemTime,
    // Keeps the project's pending-file count accurate while the job lives.
    job_handle: JobHandle,
}
264
/// A single semantic-search hit: a buffer plus the matching range within it.
pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<Anchor>,
}
269
270impl SemanticIndex {
271 pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
272 if cx.has_global::<ModelHandle<Self>>() {
273 Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
274 } else {
275 None
276 }
277 }
278
279 pub fn enabled(cx: &AppContext) -> bool {
280 settings::get::<SemanticIndexSettings>(cx).enabled
281 && *RELEASE_CHANNEL != ReleaseChannel::Stable
282 }
283
284 pub fn status(&self, project: &ModelHandle<Project>) -> SemanticIndexStatus {
285 if let Some(project_state) = self.projects.get(&project.downgrade()) {
286 if project_state
287 .worktrees
288 .values()
289 .all(|worktree| worktree.is_registered())
290 && project_state.pending_index == 0
291 {
292 SemanticIndexStatus::Indexed
293 } else {
294 SemanticIndexStatus::Indexing {
295 remaining_files: project_state.pending_file_count_rx.borrow().clone(),
296 rate_limit_expiry: self.embedding_provider.rate_limit_expiration(),
297 }
298 }
299 } else {
300 SemanticIndexStatus::NotIndexed
301 }
302 }
303
    /// Opens (or creates) the vector database at `database_path` and builds
    /// the `SemanticIndex` model, spawning one background task to persist
    /// embedded files and one parsing task per CPU core.
    async fn new(
        fs: Arc<dyn Fs>,
        database_path: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_path = Arc::from(database_path);
        let db = VectorDatabase::new(fs.clone(), database_path, cx.background()).await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            let embedding_queue =
                EmbeddingQueue::new(embedding_provider.clone(), cx.background().clone());
            // Persist files to the database as their embeddings complete.
            let _embedding_task = cx.background().spawn({
                let embedded_files = embedding_queue.finished_files();
                let db = db.clone();
                async move {
                    while let Ok(file) = embedded_files.recv().await {
                        db.insert_file(file.worktree_id, file.path, file.mtime, file.spans)
                            .await
                            .log_err();
                    }
                }
            });

            // Parse files into embeddable spans.
            let (parsing_files_tx, parsing_files_rx) =
                channel::unbounded::<(Arc<HashMap<SpanDigest, Embedding>>, PendingFile)>();
            let embedding_queue = Arc::new(Mutex::new(embedding_queue));
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let mut parsing_files_rx = parsing_files_rx.clone();
                let embedding_provider = embedding_provider.clone();
                let embedding_queue = embedding_queue.clone();
                let background = cx.background().clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
                    loop {
                        // Either parse the next incoming file, or — if none
                        // arrives before the timeout — flush the embedding
                        // queue so partially-filled batches still get sent.
                        let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse();
                        let mut next_file_to_parse = parsing_files_rx.next().fuse();
                        futures::select_biased! {
                            next_file_to_parse = next_file_to_parse => {
                                if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse {
                                    Self::parse_file(
                                        &fs,
                                        pending_file,
                                        &mut retriever,
                                        &embedding_queue,
                                        &embeddings_for_digest,
                                    )
                                    .await
                                } else {
                                    break;
                                }
                            },
                            _ = timer => {
                                embedding_queue.lock().flush();
                            }
                        }
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                db,
                embedding_provider,
                language_registry,
                parsing_files_tx,
                _embedding_task,
                _parsing_files_tasks,
                projects: Default::default(),
            }
        }))
    }
391
392 async fn parse_file(
393 fs: &Arc<dyn Fs>,
394 pending_file: PendingFile,
395 retriever: &mut CodeContextRetriever,
396 embedding_queue: &Arc<Mutex<EmbeddingQueue>>,
397 embeddings_for_digest: &HashMap<SpanDigest, Embedding>,
398 ) {
399 let Some(language) = pending_file.language else {
400 return;
401 };
402
403 if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
404 if let Some(mut spans) = retriever
405 .parse_file_with_template(&pending_file.relative_path, &content, language)
406 .log_err()
407 {
408 log::trace!(
409 "parsed path {:?}: {} spans",
410 pending_file.relative_path,
411 spans.len()
412 );
413
414 for span in &mut spans {
415 if let Some(embedding) = embeddings_for_digest.get(&span.digest) {
416 span.embedding = Some(embedding.to_owned());
417 }
418 }
419
420 embedding_queue.lock().push(FileToEmbed {
421 worktree_id: pending_file.worktree_db_id,
422 path: pending_file.relative_path,
423 mtime: pending_file.modified_time,
424 job_handle: pending_file.job_handle,
425 spans: spans,
426 });
427 }
428 }
429 }
430
431 pub fn project_previously_indexed(
432 &mut self,
433 project: &ModelHandle<Project>,
434 cx: &mut ModelContext<Self>,
435 ) -> Task<Result<bool>> {
436 let worktrees_indexed_previously = project
437 .read(cx)
438 .worktrees(cx)
439 .map(|worktree| {
440 self.db
441 .worktree_previously_indexed(&worktree.read(cx).abs_path())
442 })
443 .collect::<Vec<_>>();
444 cx.spawn(|_, _cx| async move {
445 let worktree_indexed_previously =
446 futures::future::join_all(worktrees_indexed_previously).await;
447
448 Ok(worktree_indexed_previously
449 .iter()
450 .filter(|worktree| worktree.is_ok())
451 .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
452 })
453 }
454
455 fn project_entries_changed(
456 &mut self,
457 project: ModelHandle<Project>,
458 worktree_id: WorktreeId,
459 changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
460 cx: &mut ModelContext<Self>,
461 ) {
462 let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
463 return;
464 };
465 let project = project.downgrade();
466 let Some(project_state) = self.projects.get_mut(&project) else {
467 return;
468 };
469
470 let worktree = worktree.read(cx);
471 let worktree_state =
472 if let Some(worktree_state) = project_state.worktrees.get_mut(&worktree_id) {
473 worktree_state
474 } else {
475 return;
476 };
477 worktree_state.paths_changed(changes, worktree);
478 if let WorktreeState::Registered(_) = worktree_state {
479 cx.spawn_weak(|this, mut cx| async move {
480 cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
481 if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
482 this.update(&mut cx, |this, cx| {
483 this.index_project(project, cx).detach_and_log_err(cx)
484 });
485 }
486 })
487 .detach();
488 }
489 }
490
    /// Begins registering a local worktree: waits for its scan to complete,
    /// reconciles its files against the stored mtimes in the database, records
    /// the changed set, and transitions the worktree's state from
    /// `Registering` to `Registered` before kicking off an indexing pass.
    fn register_worktree(
        &mut self,
        project: ModelHandle<Project>,
        worktree: ModelHandle<Worktree>,
        cx: &mut ModelContext<Self>,
    ) {
        let project = project.downgrade();
        let project_state = if let Some(project_state) = self.projects.get_mut(&project) {
            project_state
        } else {
            return;
        };
        // Only local worktrees can be indexed.
        let worktree = if let Some(worktree) = worktree.read(cx).as_local() {
            worktree
        } else {
            return;
        };
        let worktree_abs_path = worktree.abs_path().clone();
        let scan_complete = worktree.scan_complete();
        let worktree_id = worktree.id();
        let db = self.db.clone();
        let language_registry = self.language_registry.clone();
        // `done_tx` signals waiters (see `RegisteringWorktreeState::done`)
        // when registration finishes, whether it succeeded or failed.
        let (mut done_tx, done_rx) = watch::channel();
        let registration = cx.spawn(|this, mut cx| {
            async move {
                let register = async {
                    scan_complete.await;
                    let db_id = db.find_or_create_worktree(worktree_abs_path).await?;
                    let mut file_mtimes = db.get_file_mtimes(db_id).await?;
                    let worktree = if let Some(project) = project.upgrade(&cx) {
                        project
                            .read_with(&cx, |project, cx| project.worktree_for_id(worktree_id, cx))
                            .ok_or_else(|| anyhow!("worktree not found"))?
                    } else {
                        return anyhow::Ok(());
                    };
                    let worktree = worktree.read_with(&cx, |worktree, _| worktree.snapshot());
                    // Diff the worktree snapshot against the database on a
                    // background thread to find files needing (re)indexing.
                    let mut changed_paths = cx
                        .background()
                        .spawn(async move {
                            let mut changed_paths = BTreeMap::new();
                            for file in worktree.files(false, 0) {
                                let absolute_path = worktree.absolutize(&file.path);

                                if file.is_external || file.is_ignored || file.is_symlink {
                                    continue;
                                }

                                if let Ok(language) = language_registry
                                    .language_for_file(&absolute_path, None)
                                    .await
                                {
                                    // Test if file is valid parseable file
                                    if !PARSEABLE_ENTIRE_FILE_TYPES
                                        .contains(&language.name().as_ref())
                                        && &language.name().as_ref() != &"Markdown"
                                        && language
                                            .grammar()
                                            .and_then(|grammar| grammar.embedding_config.as_ref())
                                            .is_none()
                                    {
                                        continue;
                                    }

                                    // A file whose stored mtime matches its
                                    // current mtime is already up to date.
                                    let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                    let already_stored = stored_mtime
                                        .map_or(false, |existing_mtime| {
                                            existing_mtime == file.mtime
                                        });

                                    if !already_stored {
                                        changed_paths.insert(
                                            file.path.clone(),
                                            ChangedPathInfo {
                                                mtime: file.mtime,
                                                is_deleted: false,
                                            },
                                        );
                                    }
                                }
                            }

                            // Clean up entries from database that are no longer in the worktree.
                            for (path, mtime) in file_mtimes {
                                changed_paths.insert(
                                    path.into(),
                                    ChangedPathInfo {
                                        mtime,
                                        is_deleted: true,
                                    },
                                );
                            }

                            anyhow::Ok(changed_paths)
                        })
                        .await?;
                    this.update(&mut cx, |this, cx| {
                        let project_state = this
                            .projects
                            .get_mut(&project)
                            .ok_or_else(|| anyhow!("project not registered"))?;
                        let project = project
                            .upgrade(cx)
                            .ok_or_else(|| anyhow!("project was dropped"))?;

                        // Fold in any changes reported while we were scanning.
                        if let Some(WorktreeState::Registering(state)) =
                            project_state.worktrees.remove(&worktree_id)
                        {
                            changed_paths.extend(state.changed_paths);
                        }
                        project_state.worktrees.insert(
                            worktree_id,
                            WorktreeState::Registered(RegisteredWorktreeState {
                                db_id,
                                changed_paths,
                            }),
                        );
                        this.index_project(project, cx).detach_and_log_err(cx);

                        anyhow::Ok(())
                    })?;

                    anyhow::Ok(())
                };

                if register.await.log_err().is_none() {
                    // Stop tracking this worktree if the registration failed.
                    this.update(&mut cx, |this, _| {
                        this.projects.get_mut(&project).map(|project_state| {
                            project_state.worktrees.remove(&worktree_id);
                        });
                    })
                }

                *done_tx.borrow_mut() = Some(());
            }
        });
        project_state.worktrees.insert(
            worktree_id,
            WorktreeState::Registering(RegisteringWorktreeState {
                changed_paths: Default::default(),
                done_rx,
                _registration: registration,
            }),
        );
    }
637
638 fn project_worktrees_changed(
639 &mut self,
640 project: ModelHandle<Project>,
641 cx: &mut ModelContext<Self>,
642 ) {
643 let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade())
644 {
645 project_state
646 } else {
647 return;
648 };
649
650 let mut worktrees = project
651 .read(cx)
652 .worktrees(cx)
653 .filter(|worktree| worktree.read(cx).is_local())
654 .collect::<Vec<_>>();
655 let worktree_ids = worktrees
656 .iter()
657 .map(|worktree| worktree.read(cx).id())
658 .collect::<HashSet<_>>();
659
660 // Remove worktrees that are no longer present
661 project_state
662 .worktrees
663 .retain(|worktree_id, _| worktree_ids.contains(worktree_id));
664
665 // Register new worktrees
666 worktrees.retain(|worktree| {
667 let worktree_id = worktree.read(cx).id();
668 !project_state.worktrees.contains_key(&worktree_id)
669 });
670 for worktree in worktrees {
671 self.register_worktree(project.clone(), worktree, cx);
672 }
673 }
674
675 pub fn pending_file_count(
676 &self,
677 project: &ModelHandle<Project>,
678 ) -> Option<watch::Receiver<usize>> {
679 Some(
680 self.projects
681 .get(&project.downgrade())?
682 .pending_file_count_rx
683 .clone(),
684 )
685 }
686
687 pub fn search_project(
688 &mut self,
689 project: ModelHandle<Project>,
690 phrase: String,
691 limit: usize,
692 includes: Vec<PathMatcher>,
693 excludes: Vec<PathMatcher>,
694 cx: &mut ModelContext<Self>,
695 ) -> Task<Result<Vec<SearchResult>>> {
696 let index = self.index_project(project.clone(), cx);
697 let embedding_provider = self.embedding_provider.clone();
698 let db_path = self.db.path().clone();
699 let fs = self.fs.clone();
700 cx.spawn(|this, mut cx| async move {
701 index.await?;
702
703 let t0 = Instant::now();
704 let database =
705 VectorDatabase::new(fs.clone(), db_path.clone(), cx.background()).await?;
706
707 if phrase.len() == 0 {
708 return Ok(Vec::new());
709 }
710
711 let phrase_embedding = embedding_provider
712 .embed_batch(vec![phrase])
713 .await?
714 .into_iter()
715 .next()
716 .unwrap();
717
718 log::trace!(
719 "Embedding search phrase took: {:?} milliseconds",
720 t0.elapsed().as_millis()
721 );
722
723 let worktree_db_ids = this.read_with(&cx, |this, _| {
724 let project_state = this
725 .projects
726 .get(&project.downgrade())
727 .ok_or_else(|| anyhow!("project was not indexed"))?;
728 let worktree_db_ids = project_state
729 .worktrees
730 .values()
731 .filter_map(|worktree| {
732 if let WorktreeState::Registered(worktree) = worktree {
733 Some(worktree.db_id)
734 } else {
735 None
736 }
737 })
738 .collect::<Vec<i64>>();
739 anyhow::Ok(worktree_db_ids)
740 })?;
741 let file_ids = database
742 .retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)
743 .await?;
744
745 let batch_n = cx.background().num_cpus();
746 let ids_len = file_ids.clone().len();
747 let batch_size = if ids_len <= batch_n {
748 ids_len
749 } else {
750 ids_len / batch_n
751 };
752
753 let mut batch_results = Vec::new();
754 for batch in file_ids.chunks(batch_size) {
755 let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
756 let limit = limit.clone();
757 let fs = fs.clone();
758 let db_path = db_path.clone();
759 let phrase_embedding = phrase_embedding.clone();
760 if let Some(db) = VectorDatabase::new(fs, db_path.clone(), cx.background())
761 .await
762 .log_err()
763 {
764 batch_results.push(async move {
765 db.top_k_search(&phrase_embedding, limit, batch.as_slice())
766 .await
767 });
768 }
769 }
770 let batch_results = futures::future::join_all(batch_results).await;
771
772 let mut results = Vec::new();
773 for batch_result in batch_results {
774 if batch_result.is_ok() {
775 for (id, similarity) in batch_result.unwrap() {
776 let ix = match results.binary_search_by(|(_, s)| {
777 similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
778 }) {
779 Ok(ix) => ix,
780 Err(ix) => ix,
781 };
782 results.insert(ix, (id, similarity));
783 results.truncate(limit);
784 }
785 }
786 }
787
788 let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
789 let spans = database.spans_for_ids(ids.as_slice()).await?;
790
791 let mut tasks = Vec::new();
792 let mut ranges = Vec::new();
793 let weak_project = project.downgrade();
794 project.update(&mut cx, |project, cx| {
795 for (worktree_db_id, file_path, byte_range) in spans {
796 let project_state =
797 if let Some(state) = this.read(cx).projects.get(&weak_project) {
798 state
799 } else {
800 return Err(anyhow!("project not added"));
801 };
802 if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
803 tasks.push(project.open_buffer((worktree_id, file_path), cx));
804 ranges.push(byte_range);
805 }
806 }
807
808 Ok(())
809 })?;
810
811 let buffers = futures::future::join_all(tasks).await;
812
813 log::trace!(
814 "Semantic Searching took: {:?} milliseconds in total",
815 t0.elapsed().as_millis()
816 );
817
818 Ok(buffers
819 .into_iter()
820 .zip(ranges)
821 .filter_map(|(buffer, range)| {
822 let buffer = buffer.log_err()?;
823 let range = buffer.read_with(&cx, |buffer, _| {
824 let start = buffer.clip_offset(range.start, Bias::Left);
825 let end = buffer.clip_offset(range.end, Bias::Right);
826 buffer.anchor_before(start)..buffer.anchor_after(end)
827 });
828 Some(SearchResult { buffer, range })
829 })
830 .collect::<Vec<_>>())
831 })
832 }
833
    /// Indexes (or re-indexes) `project`: registers it with the index on
    /// first use, waits for all worktrees to finish registering, deletes
    /// removed files from the database, sends changed files to the parsing
    /// tasks, and resolves once the pending-file count drains to zero.
    pub fn index_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        if !self.projects.contains_key(&project.downgrade()) {
            log::trace!("Registering Project for Semantic Index");

            // Keep the index in sync with worktree additions/removals and
            // file changes for as long as the project is tracked.
            let subscription = cx.subscribe(&project, |this, project, event, cx| match event {
                project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => {
                    this.project_worktrees_changed(project.clone(), cx);
                }
                project::Event::WorktreeUpdatedEntries(worktree_id, changes) => {
                    this.project_entries_changed(project, *worktree_id, changes.clone(), cx);
                }
                _ => {}
            });
            let project_state = ProjectState::new(subscription, cx);
            self.projects.insert(project.downgrade(), project_state);
            self.project_worktrees_changed(project.clone(), cx);
        }
        let project_state = self.projects.get_mut(&project.downgrade()).unwrap();
        project_state.pending_index += 1;
        cx.notify();

        let mut pending_file_count_rx = project_state.pending_file_count_rx.clone();
        let db = self.db.clone();
        let language_registry = self.language_registry.clone();
        let parsing_files_tx = self.parsing_files_tx.clone();
        let worktree_registration = self.wait_for_worktree_registration(&project, cx);

        cx.spawn(|this, mut cx| async move {
            worktree_registration.await?;

            // Drain each registered worktree's `changed_paths` into a list of
            // files to (re)index and a list of files to delete.
            let mut pending_files = Vec::new();
            let mut files_to_delete = Vec::new();
            this.update(&mut cx, |this, cx| {
                let project_state = this
                    .projects
                    .get_mut(&project.downgrade())
                    .ok_or_else(|| anyhow!("project was dropped"))?;
                let pending_file_count_tx = &project_state.pending_file_count_tx;

                project_state
                    .worktrees
                    .retain(|worktree_id, worktree_state| {
                        // Drop state for worktrees no longer on the project.
                        let worktree = if let Some(worktree) =
                            project.read(cx).worktree_for_id(*worktree_id, cx)
                        {
                            worktree
                        } else {
                            return false;
                        };
                        let worktree_state =
                            if let WorktreeState::Registered(worktree_state) = worktree_state {
                                worktree_state
                            } else {
                                return true;
                            };

                        // The closure always returns `false`, so all changed
                        // paths are consumed by this pass.
                        worktree_state.changed_paths.retain(|path, info| {
                            if info.is_deleted {
                                files_to_delete.push((worktree_state.db_id, path.clone()));
                            } else {
                                let absolute_path = worktree.read(cx).absolutize(path);
                                // Increments the pending-file count; it is
                                // decremented when the job handle is dropped.
                                let job_handle = JobHandle::new(pending_file_count_tx);
                                pending_files.push(PendingFile {
                                    absolute_path,
                                    relative_path: path.clone(),
                                    language: None,
                                    job_handle,
                                    modified_time: info.mtime,
                                    worktree_db_id: worktree_state.db_id,
                                });
                            }

                            false
                        });
                        true
                    });

                anyhow::Ok(())
            })?;

            cx.background()
                .spawn(async move {
                    for (worktree_db_id, path) in files_to_delete {
                        db.delete_file(worktree_db_id, path).await.log_err();
                    }

                    // Fetch any embeddings already stored for these files so
                    // unchanged spans can skip the embedding provider.
                    let embeddings_for_digest = {
                        let mut files = HashMap::default();
                        for pending_file in &pending_files {
                            files
                                .entry(pending_file.worktree_db_id)
                                .or_insert(Vec::new())
                                .push(pending_file.relative_path.clone());
                        }
                        Arc::new(
                            db.embeddings_for_files(files)
                                .await
                                .log_err()
                                .unwrap_or_default(),
                        )
                    };

                    for mut pending_file in pending_files {
                        if let Ok(language) = language_registry
                            .language_for_file(&pending_file.relative_path, None)
                            .await
                        {
                            // Skip files that can't be parsed into spans and
                            // aren't whole-file-indexable or Markdown.
                            if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                && &language.name().as_ref() != &"Markdown"
                                && language
                                    .grammar()
                                    .and_then(|grammar| grammar.embedding_config.as_ref())
                                    .is_none()
                            {
                                continue;
                            }
                            pending_file.language = Some(language);
                        }
                        parsing_files_tx
                            .try_send((embeddings_for_digest.clone(), pending_file))
                            .ok();
                    }

                    // Wait until we're done indexing.
                    while let Some(count) = pending_file_count_rx.next().await {
                        if count == 0 {
                            break;
                        }
                    }
                })
                .await;

            this.update(&mut cx, |this, cx| {
                let project_state = this
                    .projects
                    .get_mut(&project.downgrade())
                    .ok_or_else(|| anyhow!("project was dropped"))?;
                project_state.pending_index -= 1;
                cx.notify();
                anyhow::Ok(())
            })?;

            Ok(())
        })
    }
983
984 fn wait_for_worktree_registration(
985 &self,
986 project: &ModelHandle<Project>,
987 cx: &mut ModelContext<Self>,
988 ) -> Task<Result<()>> {
989 let project = project.downgrade();
990 cx.spawn_weak(|this, cx| async move {
991 loop {
992 let mut pending_worktrees = Vec::new();
993 this.upgrade(&cx)
994 .ok_or_else(|| anyhow!("semantic index dropped"))?
995 .read_with(&cx, |this, _| {
996 if let Some(project) = this.projects.get(&project) {
997 for worktree in project.worktrees.values() {
998 if let WorktreeState::Registering(worktree) = worktree {
999 pending_worktrees.push(worktree.done());
1000 }
1001 }
1002 }
1003 });
1004
1005 if pending_worktrees.is_empty() {
1006 break;
1007 } else {
1008 future::join_all(pending_worktrees).await;
1009 }
1010 }
1011 Ok(())
1012 })
1013 }
1014}
1015
impl Entity for SemanticIndex {
    // The index emits no events; observers are poked via `cx.notify` instead.
    type Event = ();
}
1019
impl Drop for JobHandle {
    fn drop(&mut self) {
        // `Arc::get_mut` succeeds only when this is the sole remaining
        // reference to the inner `Weak`, i.e. when the last clone of this
        // `JobHandle` is being dropped.
        if let Some(inner) = Arc::get_mut(&mut self.tx) {
            // This is the last instance of the JobHandle (regardless of it's origin - whether it was cloned or not)
            if let Some(tx) = inner.upgrade() {
                let mut tx = tx.lock();
                // Decrement the pending-file counter exactly once per job.
                *tx.borrow_mut() -= 1;
            }
        }
    }
}
1031
#[cfg(test)]
mod tests {

    use super::*;
    /// Verifies that the pending-job counter is decremented exactly once:
    /// only when the last clone of a `JobHandle` is dropped.
    #[test]
    fn test_job_handle() {
        let (job_count_tx, job_count_rx) = watch::channel_with(0);
        let tx = Arc::new(Mutex::new(job_count_tx));
        let job_handle = JobHandle::new(&tx);

        // Creating the handle increments the counter once.
        assert_eq!(1, *job_count_rx.borrow());
        let new_job_handle = job_handle.clone();
        // Cloning must NOT increment the counter again.
        assert_eq!(1, *job_count_rx.borrow());
        drop(job_handle);
        // Dropping one of two clones leaves the counter untouched.
        assert_eq!(1, *job_count_rx.borrow());
        drop(new_job_handle);
        // Dropping the last clone decrements the counter.
        assert_eq!(0, *job_count_rx.borrow());
    }
}