1mod db;
2mod embedding;
3mod parsing;
4pub mod semantic_index_settings;
5
6#[cfg(test)]
7mod semantic_index_tests;
8
9use crate::semantic_index_settings::SemanticIndexSettings;
10use anyhow::{anyhow, Result};
11use db::VectorDatabase;
12use embedding::{EmbeddingProvider, OpenAIEmbeddings};
13use futures::{channel::oneshot, Future};
14use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
15use language::{Anchor, Buffer, Language, LanguageRegistry};
16use parking_lot::Mutex;
17use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
18use postage::watch;
19use project::{
20 search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, ProjectPath, Worktree, WorktreeId,
21};
22use smol::channel;
23use std::{
24 cmp::Ordering,
25 collections::{BTreeMap, HashMap},
26 mem,
27 ops::Range,
28 path::{Path, PathBuf},
29 sync::{Arc, Weak},
30 time::{Duration, Instant, SystemTime},
31};
32use util::{
33 channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
34 http::HttpClient,
35 paths::EMBEDDINGS_DIR,
36 ResultExt,
37};
38use workspace::WorkspaceCreated;
39
// Schema/version stamp for the embeddings index.
// NOTE(review): presumably checked by the db layer to invalidate stale indexes — confirm in `db` module.
const SEMANTIC_INDEX_VERSION: usize = 7;
// Maximum number of documents grouped into a single embedding-provider request.
const EMBEDDINGS_BATCH_SIZE: usize = 80;
// Debounce delay between observing a file change and reindexing it in the background.
const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(600);
43
44pub fn init(
45 fs: Arc<dyn Fs>,
46 http_client: Arc<dyn HttpClient>,
47 language_registry: Arc<LanguageRegistry>,
48 cx: &mut AppContext,
49) {
50 settings::register::<SemanticIndexSettings>(cx);
51
52 let db_file_path = EMBEDDINGS_DIR
53 .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
54 .join("embeddings_db");
55
56 // This needs to be removed at some point before stable.
57 if *RELEASE_CHANNEL == ReleaseChannel::Stable {
58 return;
59 }
60
61 cx.subscribe_global::<WorkspaceCreated, _>({
62 move |event, cx| {
63 let Some(semantic_index) = SemanticIndex::global(cx) else {
64 return;
65 };
66 let workspace = &event.0;
67 if let Some(workspace) = workspace.upgrade(cx) {
68 let project = workspace.read(cx).project().clone();
69 if project.read(cx).is_local() {
70 semantic_index.update(cx, |index, cx| {
71 index.initialize_project(project, cx).detach_and_log_err(cx)
72 });
73 }
74 }
75 }
76 })
77 .detach();
78
79 cx.spawn(move |mut cx| async move {
80 let semantic_index = SemanticIndex::new(
81 fs,
82 db_file_path,
83 // Arc::new(embedding::DummyEmbeddings {}),
84 Arc::new(OpenAIEmbeddings {
85 client: http_client,
86 executor: cx.background(),
87 }),
88 language_registry,
89 cx.clone(),
90 )
91 .await?;
92
93 cx.update(|cx| {
94 cx.set_global(semantic_index.clone());
95 });
96
97 anyhow::Ok(())
98 })
99 .detach();
100}
101
/// Coordinates background indexing of project files into a vector database of
/// embeddings, and serves semantic searches against that database.
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    database_url: Arc<PathBuf>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    // Queue of operations applied serially by the database worker task.
    db_update_tx: channel::Sender<DbOperation>,
    // Queue of files awaiting parsing into embeddable documents.
    parsing_files_tx: channel::Sender<PendingFile>,
    // Held (never read) so the background workers live as long as the index.
    _db_update_task: Task<()>,
    _embed_batch_tasks: Vec<Task<()>>,
    _batch_files_task: Task<()>,
    _parsing_files_tasks: Vec<Task<()>>,
    // Per-project indexing state, keyed by weak handle so a dropped project
    // does not keep its state alive.
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}
115
/// Indexing state tracked for one registered project.
struct ProjectState {
    // Maps in-memory worktree ids to their database row ids.
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    // Keeps the project-event subscription alive for this state's lifetime.
    _subscription: gpui::Subscription,
    // Watch pair counting indexing jobs that have not yet completed.
    outstanding_job_count_rx: watch::Receiver<usize>,
    outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    // Paths observed to have changed since the last reindex, pending processing.
    changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
}
123
/// Bookkeeping for a single changed path awaiting reindexing.
struct ChangedPathInfo {
    // When the change was observed; used to debounce background reindexing.
    changed_at: Instant,
    // Modification time reported for the file at the moment of the change.
    mtime: SystemTime,
    // True when the path was removed and its database rows should be deleted.
    is_deleted: bool,
}
129
/// RAII guard representing one outstanding indexing job; see the `Drop` impl
/// for how the shared counter is decremented.
#[derive(Clone)]
struct JobHandle {
    /// The outer Arc is here to count the clones of a JobHandle instance;
    /// when the last handle to a given job is dropped, we decrement a counter (just once).
    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
}
136
137impl JobHandle {
138 fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
139 *tx.lock().borrow_mut() += 1;
140 Self {
141 tx: Arc::new(Arc::downgrade(&tx)),
142 }
143 }
144}
145
146impl ProjectState {
147 fn new(
148 subscription: gpui::Subscription,
149 worktree_db_ids: Vec<(WorktreeId, i64)>,
150 changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
151 ) -> Self {
152 let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0);
153 let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx));
154 Self {
155 worktree_db_ids,
156 outstanding_job_count_rx,
157 outstanding_job_count_tx,
158 changed_paths,
159 _subscription: subscription,
160 }
161 }
162
163 pub fn get_outstanding_count(&self) -> usize {
164 self.outstanding_job_count_rx.borrow().clone()
165 }
166
167 fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
168 self.worktree_db_ids
169 .iter()
170 .find_map(|(worktree_id, db_id)| {
171 if *worktree_id == id {
172 Some(*db_id)
173 } else {
174 None
175 }
176 })
177 }
178
179 fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
180 self.worktree_db_ids
181 .iter()
182 .find_map(|(worktree_id, db_id)| {
183 if *db_id == id {
184 Some(*worktree_id)
185 } else {
186 None
187 }
188 })
189 }
190}
191
/// A file queued for parsing and embedding.
#[derive(Clone)]
pub struct PendingFile {
    // Database id of the worktree containing the file.
    worktree_db_id: i64,
    relative_path: PathBuf,
    absolute_path: PathBuf,
    // Detected language; `None` until resolved. Files that stay `None` are
    // skipped by the parser (their job handle is simply dropped).
    language: Option<Arc<Language>>,
    modified_time: SystemTime,
    // Guard counting this file toward the project's outstanding jobs.
    job_handle: JobHandle,
}
201
/// A single semantic-search hit: an open buffer and the matching range in it.
pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<Anchor>,
}
206
/// Operations processed serially by the database worker task.
enum DbOperation {
    /// Insert (or replace) a file's documents and metadata.
    InsertFile {
        worktree_id: i64,
        documents: Vec<Document>,
        path: PathBuf,
        mtime: SystemTime,
        // Dropped after the insert, decrementing the outstanding-job counter.
        job_handle: JobHandle,
    },
    /// Remove a file's rows from the database.
    Delete {
        worktree_id: i64,
        path: PathBuf,
    },
    /// Resolve a worktree path to its database id, creating a row if needed.
    FindOrCreateWorktree {
        path: PathBuf,
        sender: oneshot::Sender<Result<i64>>,
    },
    /// Fetch the stored modification times of every file in a worktree.
    FileMTimes {
        worktree_id: i64,
        sender: oneshot::Sender<Result<HashMap<PathBuf, SystemTime>>>,
    },
    /// Check whether a worktree rooted at this path was ever indexed.
    WorktreePreviouslyIndexed {
        path: Arc<Path>,
        sender: oneshot::Sender<Result<bool>>,
    },
}
232
/// Jobs handled by the batching task that groups documents for embedding.
enum EmbeddingJob {
    /// Add one file's documents to the in-progress batch.
    Enqueue {
        worktree_id: i64,
        path: PathBuf,
        mtime: SystemTime,
        documents: Vec<Document>,
        job_handle: JobHandle,
    },
    /// Flush the in-progress batch even if it is not yet full.
    Flush,
}
243
244impl SemanticIndex {
245 pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
246 if cx.has_global::<ModelHandle<Self>>() {
247 Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
248 } else {
249 None
250 }
251 }
252
253 pub fn enabled(cx: &AppContext) -> bool {
254 settings::get::<SemanticIndexSettings>(cx).enabled
255 && *RELEASE_CHANNEL != ReleaseChannel::Stable
256 }
257
    /// Opens the vector database at `database_url` and spins up the background
    /// pipeline: parser tasks turn files into documents, a batcher groups
    /// documents, embedding workers call the provider, and a single writer
    /// task applies the results to the database.
    async fn new(
        fs: Arc<dyn Fs>,
        database_url: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_url = Arc::new(database_url);

        // Open (or create) the database off the main thread.
        let db = cx
            .background()
            .spawn(VectorDatabase::new(fs.clone(), database_url.clone()))
            .await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            // Perform database operations
            // Single task serializes all database reads/writes.
            let (db_update_tx, db_update_rx) = channel::unbounded();
            let _db_update_task = cx.background().spawn({
                async move {
                    while let Ok(job) = db_update_rx.recv().await {
                        Self::run_db_operation(&db, job)
                    }
                }
            });

            // Group documents into batches and send them to the embedding provider.
            // One embedding worker per CPU pulls prepared batches.
            let (embed_batch_tx, embed_batch_rx) =
                channel::unbounded::<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>();
            let mut _embed_batch_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let embed_batch_rx = embed_batch_rx.clone();
                _embed_batch_tasks.push(cx.background().spawn({
                    let db_update_tx = db_update_tx.clone();
                    let embedding_provider = embedding_provider.clone();
                    async move {
                        while let Ok(embeddings_queue) = embed_batch_rx.recv().await {
                            Self::compute_embeddings_for_batch(
                                embeddings_queue,
                                &embedding_provider,
                                &db_update_tx,
                            )
                            .await;
                        }
                    }
                }));
            }

            // Group documents into batches and send them to the embedding provider.
            let (batch_files_tx, batch_files_rx) = channel::unbounded::<EmbeddingJob>();
            let _batch_files_task = cx.background().spawn(async move {
                let mut queue_len = 0;
                let mut embeddings_queue = vec![];
                while let Ok(job) = batch_files_rx.recv().await {
                    Self::enqueue_documents_to_embed(
                        job,
                        &mut queue_len,
                        &mut embeddings_queue,
                        &embed_batch_tx,
                    );
                }
            });

            // Parse files into embeddable documents.
            // One parser task per CPU; each owns its own retriever.
            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let parsing_files_rx = parsing_files_rx.clone();
                let batch_files_tx = batch_files_tx.clone();
                let db_update_tx = db_update_tx.clone();
                let embedding_provider = embedding_provider.clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new(embedding_provider.clone());
                    while let Ok(pending_file) = parsing_files_rx.recv().await {
                        Self::parse_file(
                            &fs,
                            pending_file,
                            &mut retriever,
                            &batch_files_tx,
                            &parsing_files_rx,
                            &db_update_tx,
                        )
                        .await;
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                database_url,
                embedding_provider,
                language_registry,
                db_update_tx,
                parsing_files_tx,
                _db_update_task,
                _embed_batch_tasks,
                _batch_files_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }
371
372 fn run_db_operation(db: &VectorDatabase, job: DbOperation) {
373 match job {
374 DbOperation::InsertFile {
375 worktree_id,
376 documents,
377 path,
378 mtime,
379 job_handle,
380 } => {
381 db.insert_file(worktree_id, path, mtime, documents)
382 .log_err();
383 drop(job_handle)
384 }
385 DbOperation::Delete { worktree_id, path } => {
386 db.delete_file(worktree_id, path).log_err();
387 }
388 DbOperation::FindOrCreateWorktree { path, sender } => {
389 let id = db.find_or_create_worktree(&path);
390 sender.send(id).ok();
391 }
392 DbOperation::FileMTimes {
393 worktree_id: worktree_db_id,
394 sender,
395 } => {
396 let file_mtimes = db.get_file_mtimes(worktree_db_id);
397 sender.send(file_mtimes).ok();
398 }
399 DbOperation::WorktreePreviouslyIndexed { path, sender } => {
400 let worktree_indexed = db.worktree_previously_indexed(path.as_ref());
401 sender.send(worktree_indexed).ok();
402 }
403 }
404 }
405
    /// Embeds every document in `embeddings_queue` with one provider call and
    /// forwards the resulting per-file inserts to the database worker.
    async fn compute_embeddings_for_batch(
        mut embeddings_queue: Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
        embedding_provider: &Arc<dyn EmbeddingProvider>,
        db_update_tx: &channel::Sender<DbOperation>,
    ) {
        // Flatten all files' document contents into one request payload.
        let mut batch_documents = vec![];
        for (_, documents, _, _, _) in embeddings_queue.iter() {
            batch_documents.extend(documents.iter().map(|document| document.content.as_str()));
        }

        if let Ok(embeddings) = embedding_provider.embed_batch(batch_documents).await {
            log::trace!(
                "created {} embeddings for {} files",
                embeddings.len(),
                embeddings_queue.len(),
            );

            // Walk the queue in lockstep with the flat embedding list:
            // `i` indexes the file, `j` the document within that file.
            let mut i = 0;
            let mut j = 0;

            for embedding in embeddings.iter() {
                // Advance past files whose documents are all assigned
                // (including files that contributed zero documents).
                while embeddings_queue[i].1.len() == j {
                    i += 1;
                    j = 0;
                }

                embeddings_queue[i].1[j].embedding = embedding.to_owned();
                j += 1;
            }

            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
                db_update_tx
                    .send(DbOperation::InsertFile {
                        worktree_id,
                        documents,
                        path,
                        mtime,
                        job_handle,
                    })
                    .await
                    .unwrap();
            }
        } else {
            // Insert the file in spite of failure so that future attempts to index it do not take place (unless the file is changed).
            for (worktree_id, _, path, mtime, job_handle) in embeddings_queue.into_iter() {
                db_update_tx
                    .send(DbOperation::InsertFile {
                        worktree_id,
                        documents: vec![],
                        path,
                        mtime,
                        job_handle,
                    })
                    .await
                    .unwrap();
            }
        }
    }
464
465 fn enqueue_documents_to_embed(
466 job: EmbeddingJob,
467 queue_len: &mut usize,
468 embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
469 embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
470 ) {
471 // Handle edge case where individual file has more documents than max batch size
472 let should_flush = match job {
473 EmbeddingJob::Enqueue {
474 documents,
475 worktree_id,
476 path,
477 mtime,
478 job_handle,
479 } => {
480 // If documents is greater than embeddings batch size, recursively batch existing rows.
481 if &documents.len() > &EMBEDDINGS_BATCH_SIZE {
482 let first_job = EmbeddingJob::Enqueue {
483 documents: documents[..EMBEDDINGS_BATCH_SIZE].to_vec(),
484 worktree_id,
485 path: path.clone(),
486 mtime,
487 job_handle: job_handle.clone(),
488 };
489
490 Self::enqueue_documents_to_embed(
491 first_job,
492 queue_len,
493 embeddings_queue,
494 embed_batch_tx,
495 );
496
497 let second_job = EmbeddingJob::Enqueue {
498 documents: documents[EMBEDDINGS_BATCH_SIZE..].to_vec(),
499 worktree_id,
500 path: path.clone(),
501 mtime,
502 job_handle: job_handle.clone(),
503 };
504
505 Self::enqueue_documents_to_embed(
506 second_job,
507 queue_len,
508 embeddings_queue,
509 embed_batch_tx,
510 );
511 return;
512 } else {
513 *queue_len += &documents.len();
514 embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
515 *queue_len >= EMBEDDINGS_BATCH_SIZE
516 }
517 }
518 EmbeddingJob::Flush => true,
519 };
520
521 if should_flush {
522 embed_batch_tx
523 .try_send(mem::take(embeddings_queue))
524 .unwrap();
525 *queue_len = 0;
526 }
527 }
528
529 async fn parse_file(
530 fs: &Arc<dyn Fs>,
531 pending_file: PendingFile,
532 retriever: &mut CodeContextRetriever,
533 batch_files_tx: &channel::Sender<EmbeddingJob>,
534 parsing_files_rx: &channel::Receiver<PendingFile>,
535 db_update_tx: &channel::Sender<DbOperation>,
536 ) {
537 let Some(language) = pending_file.language else {
538 return;
539 };
540
541 if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
542 if let Some(documents) = retriever
543 .parse_file_with_template(&pending_file.relative_path, &content, language)
544 .log_err()
545 {
546 log::trace!(
547 "parsed path {:?}: {} documents",
548 pending_file.relative_path,
549 documents.len()
550 );
551
552 if documents.len() == 0 {
553 db_update_tx
554 .send(DbOperation::InsertFile {
555 worktree_id: pending_file.worktree_db_id,
556 documents,
557 path: pending_file.relative_path,
558 mtime: pending_file.modified_time,
559 job_handle: pending_file.job_handle,
560 })
561 .await
562 .unwrap();
563 } else {
564 batch_files_tx
565 .try_send(EmbeddingJob::Enqueue {
566 worktree_id: pending_file.worktree_db_id,
567 path: pending_file.relative_path,
568 mtime: pending_file.modified_time,
569 job_handle: pending_file.job_handle,
570 documents,
571 })
572 .unwrap();
573 }
574 }
575 }
576
577 if parsing_files_rx.len() == 0 {
578 batch_files_tx.try_send(EmbeddingJob::Flush).unwrap();
579 }
580 }
581
582 fn find_or_create_worktree(&self, path: PathBuf) -> impl Future<Output = Result<i64>> {
583 let (tx, rx) = oneshot::channel();
584 self.db_update_tx
585 .try_send(DbOperation::FindOrCreateWorktree { path, sender: tx })
586 .unwrap();
587 async move { rx.await? }
588 }
589
590 fn get_file_mtimes(
591 &self,
592 worktree_id: i64,
593 ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
594 let (tx, rx) = oneshot::channel();
595 self.db_update_tx
596 .try_send(DbOperation::FileMTimes {
597 worktree_id,
598 sender: tx,
599 })
600 .unwrap();
601 async move { rx.await? }
602 }
603
604 fn worktree_previously_indexed(&self, path: Arc<Path>) -> impl Future<Output = Result<bool>> {
605 let (tx, rx) = oneshot::channel();
606 self.db_update_tx
607 .try_send(DbOperation::WorktreePreviouslyIndexed { path, sender: tx })
608 .unwrap();
609 async move { rx.await? }
610 }
611
612 pub fn project_previously_indexed(
613 &mut self,
614 project: ModelHandle<Project>,
615 cx: &mut ModelContext<Self>,
616 ) -> Task<Result<bool>> {
617 let worktrees_indexed_previously = project
618 .read(cx)
619 .worktrees(cx)
620 .map(|worktree| self.worktree_previously_indexed(worktree.read(cx).abs_path()))
621 .collect::<Vec<_>>();
622 cx.spawn(|_, _cx| async move {
623 let worktree_indexed_previously =
624 futures::future::join_all(worktrees_indexed_previously).await;
625
626 Ok(worktree_indexed_previously
627 .iter()
628 .filter(|worktree| worktree.is_ok())
629 .all(|v| v.as_ref().log_err().is_some_and(|v| v.to_owned())))
630 })
631 }
632
    /// Handles a worktree's entry-change notifications: records each eligible
    /// changed path for later reindexing, then schedules a debounced
    /// background reindex pass.
    fn project_entries_changed(
        &mut self,
        project: ModelHandle<Project>,
        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
        cx: &mut ModelContext<'_, SemanticIndex>,
        worktree_id: &WorktreeId,
    ) {
        let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
            return;
        };
        let project = project.downgrade();
        let Some(project_state) = self.projects.get_mut(&project) else {
            return;
        };

        let worktree = worktree.read(cx);
        let change_time = Instant::now();
        for (path, entry_id, change) in changes.iter() {
            let Some(entry) = worktree.entry_for_id(*entry_id) else {
                continue;
            };
            // Never index ignored, symlinked, or external entries.
            if entry.is_ignored || entry.is_symlink || entry.is_external {
                continue;
            }
            let project_path = ProjectPath {
                worktree_id: *worktree_id,
                path: path.clone(),
            };
            project_state.changed_paths.insert(
                project_path,
                ChangedPathInfo {
                    changed_at: change_time,
                    mtime: entry.mtime,
                    is_deleted: *change == PathChange::Removed,
                },
            );
        }

        // Debounce: after the delay, reindex only changes recorded at or
        // before `change_time`; later changes schedule their own pass.
        cx.spawn_weak(|this, mut cx| async move {
            cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
            if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
                Self::reindex_changed_paths(this, project, Some(change_time), &mut cx).await;
            }
        })
        .detach();
    }
679
    /// Registers `project` with the index: waits for all worktree scans,
    /// resolves a database id per worktree, diffs each worktree's files
    /// against the stored mtimes to determine what needs (re)indexing, and
    /// stores the resulting `ProjectState`.
    pub fn initialize_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        log::trace!("Initializing Project for Semantic Index");
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
                async move {
                    scan_complete.await;
                }
            })
            .collect::<Vec<_>>();

        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
            })
            .collect::<Vec<_>>();

        // Re-queue paths for indexing whenever worktree entries change.
        let _subscription = cx.subscribe(&project, |this, project, event, cx| {
            if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
                this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id);
            };
        });

        let language_registry = self.language_registry.clone();

        cx.spawn(|this, mut cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
            let worktrees = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(cx)
                    .map(|worktree| worktree.read(cx).snapshot())
                    .collect::<Vec<_>>()
            });

            let mut worktree_file_mtimes = HashMap::new();
            let mut db_ids_by_worktree_id = HashMap::new();

            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                let db_id = db_id?;
                db_ids_by_worktree_id.insert(worktree.id(), db_id);
                worktree_file_mtimes.insert(
                    worktree.id(),
                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
                        .await?,
                );
            }

            let worktree_db_ids = db_ids_by_worktree_id
                .iter()
                .map(|(a, b)| (*a, *b))
                .collect();

            // Diff on-disk files against stored mtimes on the background
            // executor; any mismatch (or new file) is queued for indexing.
            let changed_paths = cx
                .background()
                .spawn(async move {
                    let mut changed_paths = BTreeMap::new();
                    let now = Instant::now();
                    for worktree in worktrees.into_iter() {
                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                        for file in worktree.files(false, 0) {
                            let absolute_path = worktree.absolutize(&file.path);

                            if file.is_external || file.is_ignored || file.is_symlink {
                                continue;
                            }

                            if let Ok(language) = language_registry
                                .language_for_file(&absolute_path, None)
                                .await
                            {
                                // Test if file is valid parseable file
                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                    && &language.name().as_ref() != &"Markdown"
                                    && language
                                        .grammar()
                                        .and_then(|grammar| grammar.embedding_config.as_ref())
                                        .is_none()
                                {
                                    continue;
                                }

                                let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                let already_stored = stored_mtime
                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);

                                if !already_stored {
                                    changed_paths.insert(
                                        ProjectPath {
                                            worktree_id: worktree.id(),
                                            path: file.path.clone(),
                                        },
                                        ChangedPathInfo {
                                            changed_at: now,
                                            mtime: file.mtime,
                                            is_deleted: false,
                                        },
                                    );
                                }
                            }
                        }

                        // Clean up entries from database that are no longer in the worktree.
                        for (path, mtime) in file_mtimes {
                            changed_paths.insert(
                                ProjectPath {
                                    worktree_id: worktree.id(),
                                    path: path.into(),
                                },
                                ChangedPathInfo {
                                    changed_at: now,
                                    mtime,
                                    is_deleted: true,
                                },
                            );
                        }
                    }

                    anyhow::Ok(changed_paths)
                })
                .await?;

            this.update(&mut cx, |this, _| {
                this.projects.insert(
                    project.downgrade(),
                    ProjectState::new(_subscription, worktree_db_ids, changed_paths),
                );
            });
            Result::<(), _>::Ok(())
        })
    }
820
821 pub fn index_project(
822 &mut self,
823 project: ModelHandle<Project>,
824 cx: &mut ModelContext<Self>,
825 ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
826 cx.spawn(|this, mut cx| async move {
827 Self::reindex_changed_paths(this.clone(), project.clone(), None, &mut cx).await;
828
829 this.update(&mut cx, |this, _cx| {
830 let Some(state) = this.projects.get(&project.downgrade()) else {
831 return Err(anyhow!("Project not yet initialized"));
832 };
833 let job_count_rx = state.outstanding_job_count_rx.clone();
834 let count = state.get_outstanding_count();
835 Ok((count, job_count_rx))
836 })
837 })
838 }
839
840 pub fn outstanding_job_count_rx(
841 &self,
842 project: &ModelHandle<Project>,
843 ) -> Option<watch::Receiver<usize>> {
844 Some(
845 self.projects
846 .get(&project.downgrade())?
847 .outstanding_job_count_rx
848 .clone(),
849 )
850 }
851
852 pub fn search_project(
853 &mut self,
854 project: ModelHandle<Project>,
855 phrase: String,
856 limit: usize,
857 includes: Vec<PathMatcher>,
858 excludes: Vec<PathMatcher>,
859 cx: &mut ModelContext<Self>,
860 ) -> Task<Result<Vec<SearchResult>>> {
861 let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
862 state
863 } else {
864 return Task::ready(Err(anyhow!("project not added")));
865 };
866
867 let worktree_db_ids = project
868 .read(cx)
869 .worktrees(cx)
870 .filter_map(|worktree| {
871 let worktree_id = worktree.read(cx).id();
872 project_state.db_id_for_worktree_id(worktree_id)
873 })
874 .collect::<Vec<_>>();
875
876 let embedding_provider = self.embedding_provider.clone();
877 let database_url = self.database_url.clone();
878 let fs = self.fs.clone();
879 cx.spawn(|this, mut cx| async move {
880 let t0 = Instant::now();
881 let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;
882
883 let phrase_embedding = embedding_provider
884 .embed_batch(vec![&phrase])
885 .await?
886 .into_iter()
887 .next()
888 .unwrap();
889
890 log::trace!(
891 "Embedding search phrase took: {:?} milliseconds",
892 t0.elapsed().as_millis()
893 );
894
895 let file_ids =
896 database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?;
897
898 let batch_n = cx.background().num_cpus();
899 let ids_len = file_ids.clone().len();
900 let batch_size = if ids_len <= batch_n {
901 ids_len
902 } else {
903 ids_len / batch_n
904 };
905
906 let mut result_tasks = Vec::new();
907 for batch in file_ids.chunks(batch_size) {
908 let batch = batch.into_iter().map(|v| *v).collect::<Vec<i64>>();
909 let limit = limit.clone();
910 let fs = fs.clone();
911 let database_url = database_url.clone();
912 let phrase_embedding = phrase_embedding.clone();
913 let task = cx.background().spawn(async move {
914 let database = VectorDatabase::new(fs, database_url).await.log_err();
915 if database.is_none() {
916 return Err(anyhow!("failed to acquire database connection"));
917 } else {
918 database
919 .unwrap()
920 .top_k_search(&phrase_embedding, limit, batch.as_slice())
921 }
922 });
923 result_tasks.push(task);
924 }
925
926 let batch_results = futures::future::join_all(result_tasks).await;
927
928 let mut results = Vec::new();
929 for batch_result in batch_results {
930 if batch_result.is_ok() {
931 for (id, similarity) in batch_result.unwrap() {
932 let ix = match results.binary_search_by(|(_, s)| {
933 similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
934 }) {
935 Ok(ix) => ix,
936 Err(ix) => ix,
937 };
938 results.insert(ix, (id, similarity));
939 results.truncate(limit);
940 }
941 }
942 }
943
944 let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
945 let documents = database.get_documents_by_ids(ids.as_slice())?;
946
947 let mut tasks = Vec::new();
948 let mut ranges = Vec::new();
949 let weak_project = project.downgrade();
950 project.update(&mut cx, |project, cx| {
951 for (worktree_db_id, file_path, byte_range) in documents {
952 let project_state =
953 if let Some(state) = this.read(cx).projects.get(&weak_project) {
954 state
955 } else {
956 return Err(anyhow!("project not added"));
957 };
958 if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
959 tasks.push(project.open_buffer((worktree_id, file_path), cx));
960 ranges.push(byte_range);
961 }
962 }
963
964 Ok(())
965 })?;
966
967 let buffers = futures::future::join_all(tasks).await;
968
969 log::trace!(
970 "Semantic Searching took: {:?} milliseconds in total",
971 t0.elapsed().as_millis()
972 );
973
974 Ok(buffers
975 .into_iter()
976 .zip(ranges)
977 .filter_map(|(buffer, range)| {
978 let buffer = buffer.log_err()?;
979 let range = buffer.read_with(&cx, |buffer, _| {
980 buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
981 });
982 Some(SearchResult { buffer, range })
983 })
984 .collect::<Vec<_>>())
985 })
986 }
987
    /// Drains the project's recorded `changed_paths` (optionally only those
    /// changed at or before `last_changed_before`), issuing database deletes
    /// for removed files and queueing the rest for parsing and embedding.
    async fn reindex_changed_paths(
        this: ModelHandle<SemanticIndex>,
        project: ModelHandle<Project>,
        last_changed_before: Option<Instant>,
        cx: &mut AsyncAppContext,
    ) {
        let mut pending_files = Vec::new();
        let (language_registry, parsing_files_tx) = this.update(cx, |this, cx| {
            if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
                let outstanding_job_count_tx = &project_state.outstanding_job_count_tx;
                let db_ids = &project_state.worktree_db_ids;
                // Cache the last-resolved worktree across consecutive paths
                // (the BTreeMap groups paths by worktree).
                let mut worktree: Option<ModelHandle<Worktree>> = None;

                project_state.changed_paths.retain(|path, info| {
                    // Keep changes newer than the cutoff for a later debounce
                    // window instead of processing them now.
                    if let Some(last_changed_before) = last_changed_before {
                        if info.changed_at > last_changed_before {
                            return true;
                        }
                    }

                    if worktree
                        .as_ref()
                        .map_or(true, |tree| tree.read(cx).id() != path.worktree_id)
                    {
                        worktree = project.read(cx).worktree_for_id(path.worktree_id, cx);
                    }
                    let Some(worktree) = &worktree else {
                        return false;
                    };

                    let Some(worktree_db_id) = db_ids
                        .iter()
                        .find_map(|entry| (entry.0 == path.worktree_id).then_some(entry.1))
                    else {
                        return false;
                    };

                    if info.is_deleted {
                        this.db_update_tx
                            .try_send(DbOperation::Delete {
                                worktree_id: worktree_db_id,
                                path: path.path.to_path_buf(),
                            })
                            .ok();
                    } else {
                        let absolute_path = worktree.read(cx).absolutize(&path.path);
                        let job_handle = JobHandle::new(&outstanding_job_count_tx);
                        pending_files.push(PendingFile {
                            absolute_path,
                            relative_path: path.path.to_path_buf(),
                            language: None,
                            job_handle,
                            modified_time: info.mtime,
                            worktree_db_id,
                        });
                    }

                    // Processed: drop this entry from the changed set.
                    false
                });
            }

            (
                this.language_registry.clone(),
                this.parsing_files_tx.clone(),
            )
        });

        // Resolve languages outside the update closure (lookup is async).
        // Unsupported languages are skipped entirely; unresolved ones are
        // still forwarded and dropped by `parse_file`.
        for mut pending_file in pending_files {
            if let Ok(language) = language_registry
                .language_for_file(&pending_file.relative_path, None)
                .await
            {
                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                    && &language.name().as_ref() != &"Markdown"
                    && language
                        .grammar()
                        .and_then(|grammar| grammar.embedding_config.as_ref())
                        .is_none()
                {
                    continue;
                }
                pending_file.language = Some(language);
            }
            parsing_files_tx.try_send(pending_file).ok();
        }
    }
1074}
1075
impl Entity for SemanticIndex {
    // The index emits no events; progress is observed via the watch channels.
    type Event = ();
}
1079
impl Drop for JobHandle {
    fn drop(&mut self) {
        // `Arc::get_mut` succeeds only when this is the last strong reference
        // to the outer Arc — i.e. the final clone of this job's handle — so
        // the counter is decremented exactly once per job.
        if let Some(inner) = Arc::get_mut(&mut self.tx) {
            // This is the last instance of the JobHandle (regardless of it's origin - whether it was cloned or not)
            // The inner weak upgrade fails if the project state (and thus the
            // counter) was already dropped, which is fine.
            if let Some(tx) = inner.upgrade() {
                let mut tx = tx.lock();
                *tx.borrow_mut() -= 1;
            }
        }
    }
}
1091
#[cfg(test)]
mod tests {
    use super::*;

    /// The outstanding-job counter is incremented once per job and only
    /// decremented when the final clone of the handle is dropped.
    #[test]
    fn test_job_handle() {
        let (count_tx, count_rx) = watch::channel_with(0);
        let shared_tx = Arc::new(Mutex::new(count_tx));
        let first_handle = JobHandle::new(&shared_tx);
        assert_eq!(1, *count_rx.borrow());

        // Cloning must not bump the counter a second time.
        let second_handle = first_handle.clone();
        assert_eq!(1, *count_rx.borrow());

        // Dropping one clone leaves the job outstanding...
        drop(first_handle);
        assert_eq!(1, *count_rx.borrow());

        // ...and only the last clone decrements it.
        drop(second_handle);
        assert_eq!(0, *count_rx.borrow());
    }
}