1mod db;
2mod embedding;
3mod embedding_queue;
4mod parsing;
5pub mod semantic_index_settings;
6
7#[cfg(test)]
8mod semantic_index_tests;
9
10use crate::semantic_index_settings::SemanticIndexSettings;
11use anyhow::{anyhow, Result};
12use db::VectorDatabase;
13use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings};
14use embedding_queue::{EmbeddingQueue, FileToEmbed};
15use futures::{FutureExt, StreamExt};
16use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
17use language::{Anchor, Buffer, Language, LanguageRegistry};
18use parking_lot::Mutex;
19use parsing::{CodeContextRetriever, DocumentDigest, PARSEABLE_ENTIRE_FILE_TYPES};
20use postage::watch;
21use project::{
22 search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, ProjectPath, Worktree, WorktreeId,
23};
24use smol::channel;
25use std::{
26 cmp::Ordering,
27 collections::{BTreeMap, HashMap},
28 ops::Range,
29 path::{Path, PathBuf},
30 sync::{Arc, Weak},
31 time::{Duration, Instant, SystemTime},
32};
33use util::{
34 channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
35 http::HttpClient,
36 paths::EMBEDDINGS_DIR,
37 ResultExt,
38};
39use workspace::WorkspaceCreated;
40
// Bump to invalidate previously-indexed databases when the schema or parsing
// logic changes (the version is part of the on-disk state's identity).
const SEMANTIC_INDEX_VERSION: usize = 9;
// Debounce window: how long after a file change before background reindexing runs.
const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60);
// Maximum idle time before parsing workers flush partially-filled embedding batches.
const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
44
/// Registers semantic-index settings and asynchronously constructs the global
/// [`SemanticIndex`], wiring it up so local projects are initialized for
/// indexing whenever a workspace is created.
pub fn init(
    fs: Arc<dyn Fs>,
    http_client: Arc<dyn HttpClient>,
    language_registry: Arc<LanguageRegistry>,
    cx: &mut AppContext,
) {
    settings::register::<SemanticIndexSettings>(cx);

    // The database lives in a per-release-channel directory so dev/preview
    // indexes don't collide with each other.
    let db_file_path = EMBEDDINGS_DIR
        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
        .join("embeddings_db");

    // This needs to be removed at some point before stable.
    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
        return;
    }

    // When a workspace opens a local project, kick off that project's
    // semantic-index initialization. No-op until the global index exists.
    cx.subscribe_global::<WorkspaceCreated, _>({
        move |event, cx| {
            let Some(semantic_index) = SemanticIndex::global(cx) else {
                return;
            };
            let workspace = &event.0;
            if let Some(workspace) = workspace.upgrade(cx) {
                let project = workspace.read(cx).project().clone();
                if project.read(cx).is_local() {
                    semantic_index.update(cx, |index, cx| {
                        index.initialize_project(project, cx).detach_and_log_err(cx)
                    });
                }
            }
        }
    })
    .detach();

    // Build the index off the main thread (database setup does I/O), then
    // publish it as an app-wide global.
    cx.spawn(move |mut cx| async move {
        let semantic_index = SemanticIndex::new(
            fs,
            db_file_path,
            Arc::new(OpenAIEmbeddings {
                client: http_client,
                executor: cx.background(),
            }),
            language_registry,
            cx.clone(),
        )
        .await?;

        cx.update(|cx| {
            cx.set_global(semantic_index.clone());
        });

        anyhow::Ok(())
    })
    .detach();
}
101
/// Maintains a vector database of embedded code documents for open projects,
/// coordinating background parsing and embedding work.
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    db: VectorDatabase,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    // Feeds pending files (paired with already-known embeddings keyed by
    // document digest) to the background parsing workers.
    parsing_files_tx: channel::Sender<(Arc<HashMap<DocumentDigest, Embedding>>, PendingFile)>,
    // Held only so the background tasks live exactly as long as the index.
    _embedding_task: Task<()>,
    _parsing_files_tasks: Vec<Task<()>>,
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
112
/// Per-project bookkeeping: worktree-to-database id mappings, the set of
/// paths awaiting (re)indexing, and a watch channel counting outstanding
/// embedding jobs.
struct ProjectState {
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    // Keeps the entry-change subscription alive for the project's lifetime.
    _subscription: gpui::Subscription,
    outstanding_job_count_rx: watch::Receiver<usize>,
    outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
}
120
/// Metadata recorded when a project path changes, used to decide when (and
/// whether) to reindex or delete it.
struct ChangedPathInfo {
    // When the change was observed; compared against the debounce cutoff.
    changed_at: Instant,
    mtime: SystemTime,
    is_deleted: bool,
}
126
/// Handle representing one outstanding embedding job. Clones share the job;
/// the project's job counter is decremented once when the last clone drops.
#[derive(Clone)]
pub struct JobHandle {
    /// The outer Arc is here to count the clones of a JobHandle instance;
    /// when the last handle to a given job is dropped, we decrement a counter (just once).
    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
}
133
134impl JobHandle {
135 fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
136 *tx.lock().borrow_mut() += 1;
137 Self {
138 tx: Arc::new(Arc::downgrade(&tx)),
139 }
140 }
141}
142
143impl ProjectState {
144 fn new(
145 subscription: gpui::Subscription,
146 worktree_db_ids: Vec<(WorktreeId, i64)>,
147 changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
148 ) -> Self {
149 let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0);
150 let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx));
151 Self {
152 worktree_db_ids,
153 outstanding_job_count_rx,
154 outstanding_job_count_tx,
155 changed_paths,
156 _subscription: subscription,
157 }
158 }
159
160 pub fn get_outstanding_count(&self) -> usize {
161 self.outstanding_job_count_rx.borrow().clone()
162 }
163
164 fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
165 self.worktree_db_ids
166 .iter()
167 .find_map(|(worktree_id, db_id)| {
168 if *worktree_id == id {
169 Some(*db_id)
170 } else {
171 None
172 }
173 })
174 }
175
176 fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
177 self.worktree_db_ids
178 .iter()
179 .find_map(|(worktree_id, db_id)| {
180 if *db_id == id {
181 Some(*worktree_id)
182 } else {
183 None
184 }
185 })
186 }
187}
188
/// A file queued for parsing and embedding.
#[derive(Clone)]
pub struct PendingFile {
    worktree_db_id: i64,
    relative_path: PathBuf,
    absolute_path: PathBuf,
    // `None` until the language is resolved in `reindex_changed_paths`;
    // `parse_file` skips files whose language is still unknown.
    language: Option<Arc<Language>>,
    modified_time: SystemTime,
    job_handle: JobHandle,
}
198
/// A single semantic-search hit: the opened buffer and the anchored range
/// covering the matching document.
pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<Anchor>,
}
203
204impl SemanticIndex {
205 pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
206 if cx.has_global::<ModelHandle<Self>>() {
207 Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
208 } else {
209 None
210 }
211 }
212
    /// Whether semantic indexing is active: requires the setting to be on and
    /// a non-stable release channel.
    pub fn enabled(cx: &AppContext) -> bool {
        settings::get::<SemanticIndexSettings>(cx).enabled
            && *RELEASE_CHANNEL != ReleaseChannel::Stable
    }
217
    /// Opens (or creates) the vector database and builds the model, spawning:
    ///
    /// * one background task that persists finished embeddings to the
    ///   database, and
    /// * one parsing task per CPU that turns pending files into embeddable
    ///   documents, periodically flushing the embedding queue.
    async fn new(
        fs: Arc<dyn Fs>,
        database_path: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_path = Arc::from(database_path);
        let db = VectorDatabase::new(fs.clone(), database_path, cx.background()).await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            let embedding_queue =
                EmbeddingQueue::new(embedding_provider.clone(), cx.background().clone());
            // Drain the queue's output channel, writing each embedded file to
            // the database; errors are logged and skipped.
            let _embedding_task = cx.background().spawn({
                let embedded_files = embedding_queue.finished_files();
                let db = db.clone();
                async move {
                    while let Ok(file) = embedded_files.recv().await {
                        db.insert_file(file.worktree_id, file.path, file.mtime, file.documents)
                            .await
                            .log_err();
                    }
                }
            });

            // Parse files into embeddable documents.
            let (parsing_files_tx, parsing_files_rx) =
                channel::unbounded::<(Arc<HashMap<DocumentDigest, Embedding>>, PendingFile)>();
            let embedding_queue = Arc::new(Mutex::new(embedding_queue));
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let mut parsing_files_rx = parsing_files_rx.clone();
                let embedding_provider = embedding_provider.clone();
                let embedding_queue = embedding_queue.clone();
                let background = cx.background().clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
                    loop {
                        // Either parse the next pending file, or — if none
                        // arrives before the timeout — flush the queue so
                        // partial batches still get embedded promptly.
                        let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse();
                        let mut next_file_to_parse = parsing_files_rx.next().fuse();
                        futures::select_biased! {
                            next_file_to_parse = next_file_to_parse => {
                                if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse {
                                    Self::parse_file(
                                        &fs,
                                        pending_file,
                                        &mut retriever,
                                        &embedding_queue,
                                        &embeddings_for_digest,
                                    )
                                    .await
                                } else {
                                    // Channel closed: the index is being dropped.
                                    break;
                                }
                            },
                            _ = timer => {
                                embedding_queue.lock().flush();
                            }
                        }
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                db,
                embedding_provider,
                language_registry,
                parsing_files_tx,
                _embedding_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }
305
306 async fn parse_file(
307 fs: &Arc<dyn Fs>,
308 pending_file: PendingFile,
309 retriever: &mut CodeContextRetriever,
310 embedding_queue: &Arc<Mutex<EmbeddingQueue>>,
311 embeddings_for_digest: &HashMap<DocumentDigest, Embedding>,
312 ) {
313 let Some(language) = pending_file.language else {
314 return;
315 };
316
317 if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
318 if let Some(mut documents) = retriever
319 .parse_file_with_template(&pending_file.relative_path, &content, language)
320 .log_err()
321 {
322 log::trace!(
323 "parsed path {:?}: {} documents",
324 pending_file.relative_path,
325 documents.len()
326 );
327
328 for document in documents.iter_mut() {
329 if let Some(embedding) = embeddings_for_digest.get(&document.digest) {
330 document.embedding = Some(embedding.to_owned());
331 }
332 }
333
334 embedding_queue.lock().push(FileToEmbed {
335 worktree_id: pending_file.worktree_db_id,
336 path: pending_file.relative_path,
337 mtime: pending_file.modified_time,
338 job_handle: pending_file.job_handle,
339 documents,
340 });
341 }
342 }
343 }
344
345 pub fn project_previously_indexed(
346 &mut self,
347 project: ModelHandle<Project>,
348 cx: &mut ModelContext<Self>,
349 ) -> Task<Result<bool>> {
350 let worktrees_indexed_previously = project
351 .read(cx)
352 .worktrees(cx)
353 .map(|worktree| {
354 self.db
355 .worktree_previously_indexed(&worktree.read(cx).abs_path())
356 })
357 .collect::<Vec<_>>();
358 cx.spawn(|_, _cx| async move {
359 let worktree_indexed_previously =
360 futures::future::join_all(worktrees_indexed_previously).await;
361
362 Ok(worktree_indexed_previously
363 .iter()
364 .filter(|worktree| worktree.is_ok())
365 .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
366 })
367 }
368
    /// Records entry changes for one worktree in the project's `changed_paths`
    /// map, then schedules a debounced background reindex of those paths.
    fn project_entries_changed(
        &mut self,
        project: ModelHandle<Project>,
        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
        cx: &mut ModelContext<'_, SemanticIndex>,
        worktree_id: &WorktreeId,
    ) {
        let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
            return;
        };
        let project = project.downgrade();
        let Some(project_state) = self.projects.get_mut(&project) else {
            return;
        };

        // Start fetching the embeddings we already have for all currently
        // changed paths so unchanged documents can skip re-embedding.
        let embeddings_for_digest = {
            let mut worktree_id_file_paths = HashMap::new();
            for (path, _) in &project_state.changed_paths {
                if let Some(worktree_db_id) = project_state.db_id_for_worktree_id(path.worktree_id)
                {
                    worktree_id_file_paths
                        .entry(worktree_db_id)
                        .or_insert(Vec::new())
                        .push(path.path.clone());
                }
            }
            self.db.embeddings_for_files(worktree_id_file_paths)
        };

        // Record each indexable change with a shared timestamp. Ignored,
        // symlinked, and external entries are never indexed.
        let worktree = worktree.read(cx);
        let change_time = Instant::now();
        for (path, entry_id, change) in changes.iter() {
            let Some(entry) = worktree.entry_for_id(*entry_id) else {
                continue;
            };
            if entry.is_ignored || entry.is_symlink || entry.is_external {
                continue;
            }
            let project_path = ProjectPath {
                worktree_id: *worktree_id,
                path: path.clone(),
            };
            project_state.changed_paths.insert(
                project_path,
                ChangedPathInfo {
                    changed_at: change_time,
                    mtime: entry.mtime,
                    is_deleted: *change == PathChange::Removed,
                },
            );
        }

        // Debounce: wait BACKGROUND_INDEXING_DELAY so rapid successive edits
        // coalesce. Passing `change_time` restricts the pass to paths whose
        // last change is at or before this batch.
        cx.spawn_weak(|this, mut cx| async move {
            let embeddings_for_digest = embeddings_for_digest.await.log_err().unwrap_or_default();

            cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
            if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
                Self::reindex_changed_paths(
                    this,
                    project,
                    Some(change_time),
                    &mut cx,
                    Arc::new(embeddings_for_digest),
                )
                .await;
            }
        })
        .detach();
    }
438
    /// Prepares `project` for indexing: waits for worktree scans to finish,
    /// maps each worktree to a database id, diffs on-disk mtimes against the
    /// database to build the initial `changed_paths` set, and subscribes to
    /// further entry updates. Must complete before `index_project` is called.
    pub fn initialize_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        log::trace!("Initializing Project for Semantic Index");
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
                async move {
                    scan_complete.await;
                }
            })
            .collect::<Vec<_>>();

        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                self.db
                    .find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
            })
            .collect::<Vec<_>>();

        // Route future entry updates through the change-tracking path.
        let _subscription = cx.subscribe(&project, |this, project, event, cx| {
            if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
                this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id);
            };
        });

        let language_registry = self.language_registry.clone();

        cx.spawn(|this, mut cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
            let worktrees = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(cx)
                    .map(|worktree| worktree.read(cx).snapshot())
                    .collect::<Vec<_>>()
            });

            // Per worktree: the file mtimes the database recorded last time.
            let mut worktree_file_mtimes = HashMap::new();
            let mut db_ids_by_worktree_id = HashMap::new();

            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                let db_id = db_id?;
                db_ids_by_worktree_id.insert(worktree.id(), db_id);
                worktree_file_mtimes.insert(
                    worktree.id(),
                    this.read_with(&cx, |this, _| this.db.get_file_mtimes(db_id))
                        .await?,
                );
            }

            let worktree_db_ids = db_ids_by_worktree_id
                .iter()
                .map(|(a, b)| (*a, *b))
                .collect();

            // Diff current worktree contents against stored mtimes to find
            // files that are new or modified; whatever remains in `file_mtimes`
            // afterwards no longer exists on disk and is marked deleted.
            let changed_paths = cx
                .background()
                .spawn(async move {
                    let mut changed_paths = BTreeMap::new();
                    let now = Instant::now();
                    for worktree in worktrees.into_iter() {
                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                        for file in worktree.files(false, 0) {
                            let absolute_path = worktree.absolutize(&file.path);

                            if file.is_external || file.is_ignored || file.is_symlink {
                                continue;
                            }

                            if let Ok(language) = language_registry
                                .language_for_file(&absolute_path, None)
                                .await
                            {
                                // Test if file is valid parseable file
                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                    && &language.name().as_ref() != &"Markdown"
                                    && language
                                        .grammar()
                                        .and_then(|grammar| grammar.embedding_config.as_ref())
                                        .is_none()
                                {
                                    continue;
                                }

                                let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                let already_stored = stored_mtime
                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);

                                if !already_stored {
                                    changed_paths.insert(
                                        ProjectPath {
                                            worktree_id: worktree.id(),
                                            path: file.path.clone(),
                                        },
                                        ChangedPathInfo {
                                            changed_at: now,
                                            mtime: file.mtime,
                                            is_deleted: false,
                                        },
                                    );
                                }
                            }
                        }

                        // Clean up entries from database that are no longer in the worktree.
                        for (path, mtime) in file_mtimes {
                            changed_paths.insert(
                                ProjectPath {
                                    worktree_id: worktree.id(),
                                    path: path.into(),
                                },
                                ChangedPathInfo {
                                    changed_at: now,
                                    mtime,
                                    is_deleted: true,
                                },
                            );
                        }
                    }

                    anyhow::Ok(changed_paths)
                })
                .await?;

            this.update(&mut cx, |this, _| {
                this.projects.insert(
                    project.downgrade(),
                    ProjectState::new(_subscription, worktree_db_ids, changed_paths),
                );
            });
            Result::<(), _>::Ok(())
        })
    }
580
    /// Reindexes all currently-changed paths for `project` immediately (no
    /// debounce cutoff), returning the number of outstanding embedding jobs
    /// together with a receiver that reports the count as it drains.
    ///
    /// Fails if `initialize_project` has not completed for this project.
    pub fn index_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
        cx.spawn(|this, mut cx| async move {
            // Fetch embeddings already stored for the changed paths so
            // unchanged documents can be reused instead of re-embedded.
            let embeddings_for_digest = this.read_with(&cx, |this, _| {
                if let Some(state) = this.projects.get(&project.downgrade()) {
                    let mut worktree_id_file_paths = HashMap::default();
                    for (path, _) in &state.changed_paths {
                        if let Some(worktree_db_id) = state.db_id_for_worktree_id(path.worktree_id)
                        {
                            worktree_id_file_paths
                                .entry(worktree_db_id)
                                .or_insert(Vec::new())
                                .push(path.path.clone());
                        }
                    }

                    Ok(this.db.embeddings_for_files(worktree_id_file_paths))
                } else {
                    Err(anyhow!("Project not yet initialized"))
                }
            })?;

            let embeddings_for_digest = Arc::new(embeddings_for_digest.await?);

            // `None` cutoff: reindex every changed path regardless of when it
            // changed.
            Self::reindex_changed_paths(
                this.clone(),
                project.clone(),
                None,
                &mut cx,
                embeddings_for_digest,
            )
            .await;

            this.update(&mut cx, |this, _cx| {
                let Some(state) = this.projects.get(&project.downgrade()) else {
                    return Err(anyhow!("Project not yet initialized"));
                };
                let job_count_rx = state.outstanding_job_count_rx.clone();
                let count = state.get_outstanding_count();
                Ok((count, job_count_rx))
            })
        })
    }
627
628 pub fn outstanding_job_count_rx(
629 &self,
630 project: &ModelHandle<Project>,
631 ) -> Option<watch::Receiver<usize>> {
632 Some(
633 self.projects
634 .get(&project.downgrade())?
635 .outstanding_job_count_rx
636 .clone(),
637 )
638 }
639
640 pub fn search_project(
641 &mut self,
642 project: ModelHandle<Project>,
643 phrase: String,
644 limit: usize,
645 includes: Vec<PathMatcher>,
646 excludes: Vec<PathMatcher>,
647 cx: &mut ModelContext<Self>,
648 ) -> Task<Result<Vec<SearchResult>>> {
649 let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
650 state
651 } else {
652 return Task::ready(Err(anyhow!("project not added")));
653 };
654
655 let worktree_db_ids = project
656 .read(cx)
657 .worktrees(cx)
658 .filter_map(|worktree| {
659 let worktree_id = worktree.read(cx).id();
660 project_state.db_id_for_worktree_id(worktree_id)
661 })
662 .collect::<Vec<_>>();
663
664 let embedding_provider = self.embedding_provider.clone();
665 let db_path = self.db.path().clone();
666 let fs = self.fs.clone();
667 cx.spawn(|this, mut cx| async move {
668 let t0 = Instant::now();
669 let database =
670 VectorDatabase::new(fs.clone(), db_path.clone(), cx.background()).await?;
671
672 let phrase_embedding = embedding_provider
673 .embed_batch(vec![phrase])
674 .await?
675 .into_iter()
676 .next()
677 .unwrap();
678
679 log::trace!(
680 "Embedding search phrase took: {:?} milliseconds",
681 t0.elapsed().as_millis()
682 );
683
684 let file_ids = database
685 .retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)
686 .await?;
687
688 let batch_n = cx.background().num_cpus();
689 let ids_len = file_ids.clone().len();
690 let batch_size = if ids_len <= batch_n {
691 ids_len
692 } else {
693 ids_len / batch_n
694 };
695
696 let mut batch_results = Vec::new();
697 for batch in file_ids.chunks(batch_size) {
698 let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
699 let limit = limit.clone();
700 let fs = fs.clone();
701 let db_path = db_path.clone();
702 let phrase_embedding = phrase_embedding.clone();
703 if let Some(db) = VectorDatabase::new(fs, db_path.clone(), cx.background())
704 .await
705 .log_err()
706 {
707 batch_results.push(async move {
708 db.top_k_search(&phrase_embedding, limit, batch.as_slice())
709 .await
710 });
711 }
712 }
713 let batch_results = futures::future::join_all(batch_results).await;
714
715 let mut results = Vec::new();
716 for batch_result in batch_results {
717 if batch_result.is_ok() {
718 for (id, similarity) in batch_result.unwrap() {
719 let ix = match results.binary_search_by(|(_, s)| {
720 similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
721 }) {
722 Ok(ix) => ix,
723 Err(ix) => ix,
724 };
725 results.insert(ix, (id, similarity));
726 results.truncate(limit);
727 }
728 }
729 }
730
731 let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
732 let documents = database.get_documents_by_ids(ids.as_slice()).await?;
733
734 let mut tasks = Vec::new();
735 let mut ranges = Vec::new();
736 let weak_project = project.downgrade();
737 project.update(&mut cx, |project, cx| {
738 for (worktree_db_id, file_path, byte_range) in documents {
739 let project_state =
740 if let Some(state) = this.read(cx).projects.get(&weak_project) {
741 state
742 } else {
743 return Err(anyhow!("project not added"));
744 };
745 if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
746 tasks.push(project.open_buffer((worktree_id, file_path), cx));
747 ranges.push(byte_range);
748 }
749 }
750
751 Ok(())
752 })?;
753
754 let buffers = futures::future::join_all(tasks).await;
755
756 log::trace!(
757 "Semantic Searching took: {:?} milliseconds in total",
758 t0.elapsed().as_millis()
759 );
760
761 Ok(buffers
762 .into_iter()
763 .zip(ranges)
764 .filter_map(|(buffer, range)| {
765 let buffer = buffer.log_err()?;
766 let range = buffer.read_with(&cx, |buffer, _| {
767 buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
768 });
769 Some(SearchResult { buffer, range })
770 })
771 .collect::<Vec<_>>())
772 })
773 }
774
    /// Drains eligible entries from the project's `changed_paths` map, deleting
    /// removed files from the database and queueing the rest for
    /// parsing/embedding.
    ///
    /// When `last_changed_before` is provided, entries changed after that
    /// instant stay in the map for a later debounced pass.
    async fn reindex_changed_paths(
        this: ModelHandle<SemanticIndex>,
        project: ModelHandle<Project>,
        last_changed_before: Option<Instant>,
        cx: &mut AsyncAppContext,
        embeddings_for_digest: Arc<HashMap<DocumentDigest, Embedding>>,
    ) {
        let mut pending_files = Vec::new();
        let mut files_to_delete = Vec::new();
        let (db, language_registry, parsing_files_tx) = this.update(cx, |this, cx| {
            if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
                let outstanding_job_count_tx = &project_state.outstanding_job_count_tx;
                let db_ids = &project_state.worktree_db_ids;
                // Cache the last-used worktree handle; the map is ordered, so
                // consecutive paths often belong to the same worktree.
                let mut worktree: Option<ModelHandle<Worktree>> = None;

                // `retain` doubles as the work queue: returning `false` removes
                // the entry while recording it in `pending_files`/`files_to_delete`.
                project_state.changed_paths.retain(|path, info| {
                    if let Some(last_changed_before) = last_changed_before {
                        if info.changed_at > last_changed_before {
                            // Changed after this pass's cutoff; keep for later.
                            return true;
                        }
                    }

                    if worktree
                        .as_ref()
                        .map_or(true, |tree| tree.read(cx).id() != path.worktree_id)
                    {
                        worktree = project.read(cx).worktree_for_id(path.worktree_id, cx);
                    }
                    let Some(worktree) = &worktree else {
                        return false;
                    };

                    let Some(worktree_db_id) = db_ids
                        .iter()
                        .find_map(|entry| (entry.0 == path.worktree_id).then_some(entry.1))
                    else {
                        return false;
                    };

                    if info.is_deleted {
                        files_to_delete.push((worktree_db_id, path.path.to_path_buf()));
                    } else {
                        let absolute_path = worktree.read(cx).absolutize(&path.path);
                        // Each pending file bumps the outstanding-job counter
                        // until its JobHandle is finally dropped.
                        let job_handle = JobHandle::new(&outstanding_job_count_tx);
                        pending_files.push(PendingFile {
                            absolute_path,
                            relative_path: path.path.to_path_buf(),
                            language: None,
                            job_handle,
                            modified_time: info.mtime,
                            worktree_db_id,
                        });
                    }

                    false
                });
            }

            (
                this.db.clone(),
                this.language_registry.clone(),
                this.parsing_files_tx.clone(),
            )
        });

        for (worktree_db_id, path) in files_to_delete {
            db.delete_file(worktree_db_id, path).await.log_err();
        }

        for mut pending_file in pending_files {
            if let Ok(language) = language_registry
                .language_for_file(&pending_file.relative_path, None)
                .await
            {
                // Skip languages that can't be embedded: not a whole-file
                // parseable type, not Markdown, and no embedding grammar config.
                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                    && &language.name().as_ref() != &"Markdown"
                    && language
                        .grammar()
                        .and_then(|grammar| grammar.embedding_config.as_ref())
                        .is_none()
                {
                    continue;
                }
                pending_file.language = Some(language);
            }
            // Note: files whose language lookup failed are still sent with
            // `language: None`; `parse_file` skips those.
            parsing_files_tx
                .try_send((embeddings_for_digest.clone(), pending_file))
                .ok();
        }
    }
865}
866
impl Entity for SemanticIndex {
    // The index emits no events; it is observed via its watch channels instead.
    type Event = ();
}
870
impl Drop for JobHandle {
    fn drop(&mut self) {
        // `Arc::get_mut` succeeds only when this is the sole remaining clone,
        // so the counter is decremented exactly once per job.
        if let Some(inner) = Arc::get_mut(&mut self.tx) {
            // This is the last instance of the JobHandle (regardless of its origin - whether it was cloned or not)
            if let Some(tx) = inner.upgrade() {
                let mut tx = tx.lock();
                *tx.borrow_mut() -= 1;
            }
        }
    }
}
882
#[cfg(test)]
mod tests {

    use super::*;
    /// The outstanding-job counter must increment once per job and decrement
    /// only when the final clone of the `JobHandle` is dropped.
    #[test]
    fn test_job_handle() {
        let (job_count_tx, job_count_rx) = watch::channel_with(0);
        let tx = Arc::new(Mutex::new(job_count_tx));
        let job_handle = JobHandle::new(&tx);

        assert_eq!(1, *job_count_rx.borrow());
        let new_job_handle = job_handle.clone();
        // Cloning shares the job; the count stays at 1.
        assert_eq!(1, *job_count_rx.borrow());
        drop(job_handle);
        // One clone still alive, so the count is unchanged.
        assert_eq!(1, *job_count_rx.borrow());
        drop(new_job_handle);
        assert_eq!(0, *job_count_rx.borrow());
    }
}