mod db;
mod embedding;
mod parsing;
pub mod semantic_index_settings;

#[cfg(test)]
mod semantic_index_tests;

use crate::semantic_index_settings::SemanticIndexSettings;
use anyhow::{anyhow, Result};
use db::VectorDatabase;
use embedding::{EmbeddingProvider, OpenAIEmbeddings};
use futures::{channel::oneshot, Future};
use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
use language::{Anchor, Buffer, Language, LanguageRegistry};
use parking_lot::Mutex;
use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
use postage::watch;
use project::{search::PathMatcher, Fs, Project, WorktreeId};
use smol::channel;
use std::{
    cmp::Ordering,
    collections::HashMap,
    mem,
    ops::Range,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::{Instant, SystemTime},
};
use util::{
    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
    http::HttpClient,
    paths::EMBEDDINGS_DIR,
    ResultExt,
};

const SEMANTIC_INDEX_VERSION: usize = 6;
const EMBEDDINGS_BATCH_SIZE: usize = 80;

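/// Registers the semantic index settings, opens (or creates) the embeddings
/// database for the current release channel, and stores a global
/// `ModelHandle<SemanticIndex>` on success.
///
/// A minimal call-site sketch, assuming this runs during app initialization
/// where `fs`, `http_client`, and `languages` are already constructed (those
/// names are illustrative, not part of this module):
///
/// ```ignore
/// semantic_index::init(fs.clone(), http_client.clone(), languages.clone(), cx);
/// ```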
pub fn init(
    fs: Arc<dyn Fs>,
    http_client: Arc<dyn HttpClient>,
    language_registry: Arc<LanguageRegistry>,
    cx: &mut AppContext,
) {
    settings::register::<SemanticIndexSettings>(cx);

    let db_file_path = EMBEDDINGS_DIR
        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
        .join("embeddings_db");

    // This needs to be removed at some point before stable.
    if *RELEASE_CHANNEL == ReleaseChannel::Stable {
        return;
    }

    cx.spawn(move |mut cx| async move {
        let semantic_index = SemanticIndex::new(
            fs,
            db_file_path,
            Arc::new(OpenAIEmbeddings {
                client: http_client,
                executor: cx.background(),
            }),
            language_registry,
            cx.clone(),
        )
        .await?;

        cx.update(|cx| {
            cx.set_global(semantic_index.clone());
        });

        anyhow::Ok(())
    })
    .detach();
}

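/// Coordinates the indexing pipeline: files are parsed into embeddable
/// `Document`s by the `parsing_files` workers, grouped into batches by the
/// `batch_files` task, embedded by the `embed_batch` workers, and finally
/// written to the `VectorDatabase` by the `db_update` task. The channels below
/// connect those stages, and the `Task` fields keep the background workers
/// alive for the lifetime of the index.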
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    database_url: Arc<PathBuf>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    db_update_tx: channel::Sender<DbOperation>,
    parsing_files_tx: channel::Sender<PendingFile>,
    _db_update_task: Task<()>,
    _embed_batch_tasks: Vec<Task<()>>,
    _batch_files_task: Task<()>,
    _parsing_files_tasks: Vec<Task<()>>,
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}

struct ProjectState {
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    outstanding_job_count_rx: watch::Receiver<usize>,
    _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
}

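/// Tracks one outstanding indexing job. The handle holds a `Weak` reference to
/// the project's job-count sender; when the job finishes (wherever the handle
/// is dropped), `Drop` decrements the count. Holding a `Weak` rather than an
/// `Arc` lets the counter disappear when the project itself goes away.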
struct JobHandle {
    tx: Weak<Mutex<watch::Sender<usize>>>,
}

impl ProjectState {
    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
        self.worktree_db_ids
            .iter()
            .find_map(|(worktree_id, db_id)| {
                if *worktree_id == id {
                    Some(*db_id)
                } else {
                    None
                }
            })
    }

    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
        self.worktree_db_ids
            .iter()
            .find_map(|(worktree_id, db_id)| {
                if *db_id == id {
                    Some(*worktree_id)
                } else {
                    None
                }
            })
    }
}

pub struct PendingFile {
    worktree_db_id: i64,
    relative_path: PathBuf,
    absolute_path: PathBuf,
    language: Arc<Language>,
    modified_time: SystemTime,
    job_handle: JobHandle,
}

pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<Anchor>,
}

enum DbOperation {
    InsertFile {
        worktree_id: i64,
        documents: Vec<Document>,
        path: PathBuf,
        mtime: SystemTime,
        job_handle: JobHandle,
    },
    Delete {
        worktree_id: i64,
        path: PathBuf,
    },
    FindOrCreateWorktree {
        path: PathBuf,
        sender: oneshot::Sender<Result<i64>>,
    },
    FileMTimes {
        worktree_id: i64,
        sender: oneshot::Sender<Result<HashMap<PathBuf, SystemTime>>>,
    },
    WorktreePreviouslyIndexed {
        path: Arc<Path>,
        sender: oneshot::Sender<Result<bool>>,
    },
}

enum EmbeddingJob {
    Enqueue {
        worktree_id: i64,
        path: PathBuf,
        mtime: SystemTime,
        documents: Vec<Document>,
        job_handle: JobHandle,
    },
    Flush,
}

impl SemanticIndex {
    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
        if cx.has_global::<ModelHandle<Self>>() {
            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
        } else {
            None
        }
    }

    pub fn enabled(cx: &AppContext) -> bool {
        settings::get::<SemanticIndexSettings>(cx).enabled
            && *RELEASE_CHANNEL != ReleaseChannel::Stable
    }

    async fn new(
        fs: Arc<dyn Fs>,
        database_url: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_url = Arc::new(database_url);

        let db = cx
            .background()
            .spawn(VectorDatabase::new(fs.clone(), database_url.clone()))
            .await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            // Run database operations sent over the channel on a single background task.
            let (db_update_tx, db_update_rx) = channel::unbounded();
            let _db_update_task = cx.background().spawn({
                async move {
                    while let Ok(job) = db_update_rx.recv().await {
                        Self::run_db_operation(&db, job)
                    }
                }
            });

            // Compute embeddings for batches of documents, one worker per CPU.
            let (embed_batch_tx, embed_batch_rx) =
                channel::unbounded::<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>();
            let mut _embed_batch_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let embed_batch_rx = embed_batch_rx.clone();
                _embed_batch_tasks.push(cx.background().spawn({
                    let db_update_tx = db_update_tx.clone();
                    let embedding_provider = embedding_provider.clone();
                    async move {
                        while let Ok(embeddings_queue) = embed_batch_rx.recv().await {
                            Self::compute_embeddings_for_batch(
                                embeddings_queue,
                                &embedding_provider,
                                &db_update_tx,
                            )
                            .await;
                        }
                    }
                }));
            }

            // Group documents into batches and send them to the embedding provider.
            let (batch_files_tx, batch_files_rx) = channel::unbounded::<EmbeddingJob>();
            let _batch_files_task = cx.background().spawn(async move {
                let mut queue_len = 0;
                let mut embeddings_queue = vec![];
                while let Ok(job) = batch_files_rx.recv().await {
                    Self::enqueue_documents_to_embed(
                        job,
                        &mut queue_len,
                        &mut embeddings_queue,
                        &embed_batch_tx,
                    );
                }
            });

            // Parse files into embeddable documents.
            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let parsing_files_rx = parsing_files_rx.clone();
                let batch_files_tx = batch_files_tx.clone();
                let db_update_tx = db_update_tx.clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new();
                    while let Ok(pending_file) = parsing_files_rx.recv().await {
                        Self::parse_file(
                            &fs,
                            pending_file,
                            &mut retriever,
                            &batch_files_tx,
                            &parsing_files_rx,
                            &db_update_tx,
                        )
                        .await;
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                database_url,
                embedding_provider,
                language_registry,
                db_update_tx,
                parsing_files_tx,
                _db_update_task,
                _embed_batch_tasks,
                _batch_files_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }

    fn run_db_operation(db: &VectorDatabase, job: DbOperation) {
        match job {
            DbOperation::InsertFile {
                worktree_id,
                documents,
                path,
                mtime,
                job_handle,
            } => {
                db.insert_file(worktree_id, path, mtime, documents)
                    .log_err();
                drop(job_handle)
            }
            DbOperation::Delete { worktree_id, path } => {
                db.delete_file(worktree_id, path).log_err();
            }
            DbOperation::FindOrCreateWorktree { path, sender } => {
                let id = db.find_or_create_worktree(&path);
                sender.send(id).ok();
            }
            DbOperation::FileMTimes {
                worktree_id: worktree_db_id,
                sender,
            } => {
                let file_mtimes = db.get_file_mtimes(worktree_db_id);
                sender.send(file_mtimes).ok();
            }
            DbOperation::WorktreePreviouslyIndexed { path, sender } => {
                let worktree_indexed = db.worktree_previously_indexed(path.as_ref());
                sender.send(worktree_indexed).ok();
            }
        }
    }

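    /// Embeds every document in the queued batch with a single provider call,
    /// then writes each file's documents back to the database. The provider
    /// returns one flat list of embeddings, so the nested per-file structure
    /// is reconstructed by walking a file index and a document index in step
    /// with that flat list.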
    async fn compute_embeddings_for_batch(
        mut embeddings_queue: Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
        embedding_provider: &Arc<dyn EmbeddingProvider>,
        db_update_tx: &channel::Sender<DbOperation>,
    ) {
        // Flatten every document in the queue into one batch for the provider.
        let mut batch_documents = vec![];
        for (_, documents, _, _, _) in embeddings_queue.iter() {
            batch_documents.extend(documents.iter().map(|document| document.content.as_str()));
        }

        if let Ok(embeddings) = embedding_provider.embed_batch(batch_documents).await {
            log::trace!(
                "created {} embeddings for {} files",
                embeddings.len(),
                embeddings_queue.len(),
            );

            // Walk the flat embedding list back into the per-file document
            // lists: `i` indexes the file, `j` the document within that file.
            let mut i = 0;
            let mut j = 0;

            for embedding in embeddings.iter() {
                while embeddings_queue[i].1.len() == j {
                    i += 1;
                    j = 0;
                }

                embeddings_queue[i].1[j].embedding = embedding.to_owned();
                j += 1;
            }

            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
                db_update_tx
                    .send(DbOperation::InsertFile {
                        worktree_id,
                        documents,
                        path,
                        mtime,
                        job_handle,
                    })
                    .await
                    .unwrap();
            }
        }
    }

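    /// Accumulates enqueued documents until the queue reaches
    /// `EMBEDDINGS_BATCH_SIZE` documents (or an explicit `Flush` arrives),
    /// then hands the whole batch to the embedding workers.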
    fn enqueue_documents_to_embed(
        job: EmbeddingJob,
        queue_len: &mut usize,
        embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
        embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
    ) {
        let should_flush = match job {
            EmbeddingJob::Enqueue {
                documents,
                worktree_id,
                path,
                mtime,
                job_handle,
            } => {
                *queue_len += documents.len();
                embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
                *queue_len >= EMBEDDINGS_BATCH_SIZE
            }
            EmbeddingJob::Flush => true,
        };

        if should_flush {
            embed_batch_tx
                .try_send(mem::take(embeddings_queue))
                .unwrap();
            *queue_len = 0;
        }
    }

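    /// Loads a pending file and parses it into embeddable documents. Files
    /// that yield no documents are written straight to the database (recording
    /// their mtime so they aren't re-parsed); everything else is queued for
    /// embedding. When the parsing queue drains, a `Flush` is sent so a
    /// partially filled batch doesn't sit idle.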
    async fn parse_file(
        fs: &Arc<dyn Fs>,
        pending_file: PendingFile,
        retriever: &mut CodeContextRetriever,
        batch_files_tx: &channel::Sender<EmbeddingJob>,
        parsing_files_rx: &channel::Receiver<PendingFile>,
        db_update_tx: &channel::Sender<DbOperation>,
    ) {
        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
            if let Some(documents) = retriever
                .parse_file_with_template(
                    &pending_file.relative_path,
                    &content,
                    pending_file.language,
                )
                .log_err()
            {
                log::trace!(
                    "parsed path {:?}: {} documents",
                    pending_file.relative_path,
                    documents.len()
                );

                if documents.is_empty() {
                    db_update_tx
                        .send(DbOperation::InsertFile {
                            worktree_id: pending_file.worktree_db_id,
                            documents,
                            path: pending_file.relative_path,
                            mtime: pending_file.modified_time,
                            job_handle: pending_file.job_handle,
                        })
                        .await
                        .unwrap();
                } else {
                    batch_files_tx
                        .try_send(EmbeddingJob::Enqueue {
                            worktree_id: pending_file.worktree_db_id,
                            path: pending_file.relative_path,
                            mtime: pending_file.modified_time,
                            job_handle: pending_file.job_handle,
                            documents,
                        })
                        .unwrap();
                }
            }
        }

        if parsing_files_rx.is_empty() {
            batch_files_tx.try_send(EmbeddingJob::Flush).unwrap();
        }
    }

    fn find_or_create_worktree(&self, path: PathBuf) -> impl Future<Output = Result<i64>> {
        let (tx, rx) = oneshot::channel();
        self.db_update_tx
            .try_send(DbOperation::FindOrCreateWorktree { path, sender: tx })
            .unwrap();
        async move { rx.await? }
    }

    fn get_file_mtimes(
        &self,
        worktree_id: i64,
    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
        let (tx, rx) = oneshot::channel();
        self.db_update_tx
            .try_send(DbOperation::FileMTimes {
                worktree_id,
                sender: tx,
            })
            .unwrap();
        async move { rx.await? }
    }

    fn worktree_previously_indexed(&self, path: Arc<Path>) -> impl Future<Output = Result<bool>> {
        let (tx, rx) = oneshot::channel();
        self.db_update_tx
            .try_send(DbOperation::WorktreePreviouslyIndexed { path, sender: tx })
            .unwrap();
        async move { rx.await? }
    }

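    /// Returns whether every worktree in the project has been indexed before.
    /// Worktrees whose database lookup fails are skipped rather than treated
    /// as unindexed.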
    pub fn project_previously_indexed(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<bool>> {
        let worktrees_indexed_previously = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| self.worktree_previously_indexed(worktree.read(cx).abs_path()))
            .collect::<Vec<_>>();
        cx.spawn(|_, _cx| async move {
            let worktree_indexed_previously =
                futures::future::join_all(worktrees_indexed_previously).await;

            // Ignore worktrees whose lookup failed; require all of the rest
            // to have been indexed.
            Ok(worktree_indexed_previously
                .into_iter()
                .flatten()
                .all(|was_indexed| was_indexed))
        })
    }

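    /// Walks every worktree in the project, queues changed files for parsing
    /// and embedding, deletes database rows for files that no longer exist,
    /// and returns the number of files queued along with a watch channel that
    /// counts outstanding jobs down to zero.
    ///
    /// A minimal caller sketch, assuming a `project: ModelHandle<Project>` is
    /// in scope inside an async gpui context (variable names are
    /// illustrative):
    ///
    /// ```ignore
    /// let (queued, outstanding) = semantic_index
    ///     .update(&mut cx, |index, cx| index.index_project(project.clone(), cx))
    ///     .await?;
    /// log::info!("queued {} files, {} jobs outstanding", queued, *outstanding.borrow());
    /// ```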
    pub fn index_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
        let t0 = Instant::now();
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
                async move {
                    scan_complete.await;
                }
            })
            .collect::<Vec<_>>();
        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
            })
            .collect::<Vec<_>>();

        let language_registry = self.language_registry.clone();
        let db_update_tx = self.db_update_tx.clone();
        let parsing_files_tx = self.parsing_files_tx.clone();

        cx.spawn(|this, mut cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;

            let worktrees = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(cx)
                    .map(|worktree| worktree.read(cx).snapshot())
                    .collect::<Vec<_>>()
            });

            let mut worktree_file_mtimes = HashMap::new();
            let mut db_ids_by_worktree_id = HashMap::new();
            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                let db_id = db_id?;
                db_ids_by_worktree_id.insert(worktree.id(), db_id);
                worktree_file_mtimes.insert(
                    worktree.id(),
                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
                        .await?,
                );
            }

            let (job_count_tx, job_count_rx) = watch::channel_with(0);
            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
            this.update(&mut cx, |this, _| {
                this.projects.insert(
                    project.downgrade(),
                    ProjectState {
                        worktree_db_ids: db_ids_by_worktree_id
                            .iter()
                            .map(|(a, b)| (*a, *b))
                            .collect(),
                        outstanding_job_count_rx: job_count_rx.clone(),
                        _outstanding_job_count_tx: job_count_tx.clone(),
                    },
                );
            });

            cx.background()
                .spawn(async move {
                    let mut count = 0;
                    for worktree in worktrees.into_iter() {
                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                        for file in worktree.files(false, 0) {
                            let absolute_path = worktree.absolutize(&file.path);

                            if let Ok(language) = language_registry
                                .language_for_file(&absolute_path, None)
                                .await
                            {
                                // Skip languages we can't embed: not
                                // whole-file parseable, not Markdown, and no
                                // embedding grammar config.
                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                    && language.name().as_ref() != "Markdown"
                                    && language
                                        .grammar()
                                        .and_then(|grammar| grammar.embedding_config.as_ref())
                                        .is_none()
                                {
                                    continue;
                                }

                                let path_buf = file.path.to_path_buf();
                                let stored_mtime = file_mtimes.remove(&path_buf);
                                let already_stored = stored_mtime
                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);

                                if !already_stored {
                                    count += 1;
                                    *job_count_tx.lock().borrow_mut() += 1;
                                    let job_handle = JobHandle {
                                        tx: Arc::downgrade(&job_count_tx),
                                    };
                                    parsing_files_tx
                                        .try_send(PendingFile {
                                            worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                            relative_path: path_buf,
                                            absolute_path,
                                            language,
                                            job_handle,
                                            modified_time: file.mtime,
                                        })
                                        .unwrap();
                                }
                            }
                        }
                        // Any mtimes left over belong to files that no longer
                        // exist on disk; delete their rows.
                        for file in file_mtimes.keys() {
                            db_update_tx
                                .try_send(DbOperation::Delete {
                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
                                    path: file.to_owned(),
                                })
                                .unwrap();
                        }
                    }

                    log::trace!(
                        "walking worktree took {:?} milliseconds",
                        t0.elapsed().as_millis()
                    );
                    anyhow::Ok((count, job_count_rx))
                })
                .await
        })
    }

    pub fn outstanding_job_count_rx(
        &self,
        project: &ModelHandle<Project>,
    ) -> Option<watch::Receiver<usize>> {
        Some(
            self.projects
                .get(&project.downgrade())?
                .outstanding_job_count_rx
                .clone(),
        )
    }

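    /// Embeds `phrase`, scores it against every indexed file in the project
    /// (honoring the include/exclude path matchers), and opens buffers for the
    /// top `limit` matches. The similarity search is fanned out across one
    /// database connection per CPU.
    ///
    /// A minimal caller sketch, assuming an already-indexed `project` inside
    /// an async gpui context (names are illustrative):
    ///
    /// ```ignore
    /// let results = semantic_index
    ///     .update(&mut cx, |index, cx| {
    ///         index.search_project(project.clone(), "parse a file".into(), 10, vec![], vec![], cx)
    ///     })
    ///     .await?;
    /// for SearchResult { buffer, range } in results {
    ///     // Open or highlight the matched range in each buffer.
    /// }
    /// ```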
    pub fn search_project(
        &mut self,
        project: ModelHandle<Project>,
        phrase: String,
        limit: usize,
        includes: Vec<PathMatcher>,
        excludes: Vec<PathMatcher>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<SearchResult>>> {
        let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
            state
        } else {
            return Task::ready(Err(anyhow!("project not added")));
        };

        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .filter_map(|worktree| {
                let worktree_id = worktree.read(cx).id();
                project_state.db_id_for_worktree_id(worktree_id)
            })
            .collect::<Vec<_>>();

        let embedding_provider = self.embedding_provider.clone();
        let database_url = self.database_url.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;

            let phrase_embedding = embedding_provider
                .embed_batch(vec![&phrase])
                .await?
                .into_iter()
                .next()
                .unwrap();

            let file_ids =
                database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?;

            // Split the candidate files into one batch per CPU; clamp the
            // batch size to at least one so `chunks` never receives zero.
            let batch_n = cx.background().num_cpus();
            let ids_len = file_ids.len();
            let batch_size = if ids_len <= batch_n {
                ids_len.max(1)
            } else {
                ids_len / batch_n
            };

            let mut result_tasks = Vec::new();
            for batch in file_ids.chunks(batch_size) {
                let batch = batch.to_vec();
                let fs = fs.clone();
                let database_url = database_url.clone();
                let phrase_embedding = phrase_embedding.clone();
                let task = cx.background().spawn(async move {
                    let database = VectorDatabase::new(fs, database_url)
                        .await
                        .log_err()
                        .ok_or_else(|| anyhow!("failed to acquire database connection"))?;
                    database.top_k_search(&phrase_embedding, limit, batch.as_slice())
                });
                result_tasks.push(task);
            }

            let batch_results = futures::future::join_all(result_tasks).await;

            let mut results = Vec::new();
            for batch_result in batch_results {
                if let Ok(batch_result) = batch_result {
                    for (id, similarity) in batch_result {
                        // Keep `results` sorted by descending similarity and
                        // capped at `limit`.
                        let ix = match results.binary_search_by(|(_, s)| {
                            similarity.partial_cmp(s).unwrap_or(Ordering::Equal)
                        }) {
                            Ok(ix) => ix,
                            Err(ix) => ix,
                        };
                        results.insert(ix, (id, similarity));
                        results.truncate(limit);
                    }
                }
            }

            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
            let documents = database.get_documents_by_ids(ids.as_slice())?;

            let mut tasks = Vec::new();
            let mut ranges = Vec::new();
            let weak_project = project.downgrade();
            project.update(&mut cx, |project, cx| {
                for (worktree_db_id, file_path, byte_range) in documents {
                    let project_state =
                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
                            state
                        } else {
                            return Err(anyhow!("project not added"));
                        };
                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
                        ranges.push(byte_range);
                    }
                }

                Ok(())
            })?;

            let buffers = futures::future::join_all(tasks).await;

            // Convert each byte range into anchors so the results stay valid
            // as the buffers change.
            Ok(buffers
                .into_iter()
                .zip(ranges)
                .filter_map(|(buffer, range)| {
                    let buffer = buffer.log_err()?;
                    let range = buffer.read_with(&cx, |buffer, _| {
                        buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
                    });
                    Some(SearchResult { buffer, range })
                })
                .collect::<Vec<_>>())
        })
    }
}

impl Entity for SemanticIndex {
    type Event = ();
}

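// Decrement the project's outstanding job count when a job finishes. If the
// project (and with it the counter) is already gone, there is nothing to
// update.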
impl Drop for JobHandle {
    fn drop(&mut self) {
        if let Some(tx) = self.tx.upgrade() {
            let mut tx = tx.lock();
            *tx.borrow_mut() -= 1;
        }
    }
}