mod db;
mod embedding;
mod parsing;
pub mod semantic_index_settings;

#[cfg(test)]
mod semantic_index_tests;

use crate::semantic_index_settings::SemanticIndexSettings;
use anyhow::{anyhow, Result};
use db::VectorDatabase;
use embedding::{EmbeddingProvider, OpenAIEmbeddings};
use futures::{channel::oneshot, Future};
use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
use language::{Anchor, Buffer, Language, LanguageRegistry};
use parking_lot::Mutex;
use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
use postage::watch;
use project::{search::PathMatcher, Fs, Project, WorktreeId};
use smol::channel;
use std::{
    cmp::Ordering,
    collections::HashMap,
    mem,
    ops::Range,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::{Instant, SystemTime},
};
use util::{
    channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
    http::HttpClient,
    paths::EMBEDDINGS_DIR,
    ResultExt,
};

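// SEMANTIC_INDEX_VERSION stamps the on-disk database so that stale indexes can
// be rebuilt when the storage format changes; EMBEDDINGS_BATCH_SIZE caps how
// many documents are sent to the embedding provider in a single request.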
const SEMANTIC_INDEX_VERSION: usize = 6;
const EMBEDDINGS_BATCH_SIZE: usize = 80;

pub fn init(
    fs: Arc<dyn Fs>,
    http_client: Arc<dyn HttpClient>,
    language_registry: Arc<LanguageRegistry>,
    cx: &mut AppContext,
) {
    settings::register::<SemanticIndexSettings>(cx);

    let db_file_path = EMBEDDINGS_DIR
        .join(Path::new(RELEASE_CHANNEL_NAME.as_str()))
        .join("embeddings_db");

    if *RELEASE_CHANNEL == ReleaseChannel::Stable
        || !settings::get::<SemanticIndexSettings>(cx).enabled
    {
        return;
    }

    cx.spawn(move |mut cx| async move {
        let semantic_index = SemanticIndex::new(
            fs,
            db_file_path,
            Arc::new(OpenAIEmbeddings {
                client: http_client,
                executor: cx.background(),
            }),
            language_registry,
            cx.clone(),
        )
        .await?;

        cx.update(|cx| {
            cx.set_global(semantic_index.clone());
        });

        anyhow::Ok(())
    })
    .detach();
}

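/// Coordinates semantic indexing for open projects: files are parsed into
/// embeddable documents, grouped into batches, embedded, and persisted to a
/// vector database, with each stage running on its own background task(s).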
pub struct SemanticIndex {
    fs: Arc<dyn Fs>,
    database_url: Arc<PathBuf>,
    embedding_provider: Arc<dyn EmbeddingProvider>,
    language_registry: Arc<LanguageRegistry>,
    db_update_tx: channel::Sender<DbOperation>,
    parsing_files_tx: channel::Sender<PendingFile>,
    _db_update_task: Task<()>,
    _embed_batch_tasks: Vec<Task<()>>,
    _batch_files_task: Task<()>,
    _parsing_files_tasks: Vec<Task<()>>,
    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
}

struct ProjectState {
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    outstanding_job_count_rx: watch::Receiver<usize>,
    _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
}

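/// Keeps a project's outstanding job count accurate: the count is decremented
/// when this handle is dropped (see the `Drop` impl at the bottom of this file).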
struct JobHandle {
    tx: Weak<Mutex<watch::Sender<usize>>>,
}

impl ProjectState {
    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
        self.worktree_db_ids
            .iter()
            .find_map(|(worktree_id, db_id)| {
                if *worktree_id == id {
                    Some(*db_id)
                } else {
                    None
                }
            })
    }

    fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> {
        self.worktree_db_ids
            .iter()
            .find_map(|(worktree_id, db_id)| {
                if *db_id == id {
                    Some(*worktree_id)
                } else {
                    None
                }
            })
    }
}

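/// A file that has been scheduled for parsing and embedding.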
pub struct PendingFile {
    worktree_db_id: i64,
    relative_path: PathBuf,
    absolute_path: PathBuf,
    language: Arc<Language>,
    modified_time: SystemTime,
    job_handle: JobHandle,
}

pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<Anchor>,
}

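/// Operations funneled through a single channel so that all access to the
/// vector database happens on one background task.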
enum DbOperation {
    InsertFile {
        worktree_id: i64,
        documents: Vec<Document>,
        path: PathBuf,
        mtime: SystemTime,
        job_handle: JobHandle,
    },
    Delete {
        worktree_id: i64,
        path: PathBuf,
    },
    FindOrCreateWorktree {
        path: PathBuf,
        sender: oneshot::Sender<Result<i64>>,
    },
    FileMTimes {
        worktree_id: i64,
        sender: oneshot::Sender<Result<HashMap<PathBuf, SystemTime>>>,
    },
    WorktreePreviouslyIndexed {
        path: Arc<Path>,
        sender: oneshot::Sender<Result<bool>>,
    },
}

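/// Instructions for the batching task: either add a file's documents to the
/// current batch, or flush whatever has accumulated so far.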
enum EmbeddingJob {
    Enqueue {
        worktree_id: i64,
        path: PathBuf,
        mtime: SystemTime,
        documents: Vec<Document>,
        job_handle: JobHandle,
    },
    Flush,
}

impl SemanticIndex {
    pub fn global(cx: &AppContext) -> Option<ModelHandle<SemanticIndex>> {
        if cx.has_global::<ModelHandle<Self>>() {
            Some(cx.global::<ModelHandle<SemanticIndex>>().clone())
        } else {
            None
        }
    }

    pub fn enabled(cx: &AppContext) -> bool {
        settings::get::<SemanticIndexSettings>(cx).enabled
            && *RELEASE_CHANNEL != ReleaseChannel::Stable
    }

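    /// Opens the vector database and spawns the indexing pipeline: one task
    /// that serializes database writes, a pool of tasks that compute
    /// embeddings, a task that groups documents into batches, and a pool of
    /// tasks that parse files into documents.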
    async fn new(
        fs: Arc<dyn Fs>,
        database_url: PathBuf,
        embedding_provider: Arc<dyn EmbeddingProvider>,
        language_registry: Arc<LanguageRegistry>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let t0 = Instant::now();
        let database_url = Arc::new(database_url);

        let db = cx
            .background()
            .spawn(VectorDatabase::new(fs.clone(), database_url.clone()))
            .await?;

        log::trace!(
            "db initialization took {:?} milliseconds",
            t0.elapsed().as_millis()
        );

        Ok(cx.add_model(|cx| {
            let t0 = Instant::now();
            // Perform database operations
            let (db_update_tx, db_update_rx) = channel::unbounded();
            let _db_update_task = cx.background().spawn({
                async move {
                    while let Ok(job) = db_update_rx.recv().await {
                        Self::run_db_operation(&db, job);
                    }
                }
            });

            // Compute embeddings for each batch of documents and hand the
            // results off to the database task.
            let (embed_batch_tx, embed_batch_rx) =
                channel::unbounded::<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>();
            let mut _embed_batch_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let embed_batch_rx = embed_batch_rx.clone();
                _embed_batch_tasks.push(cx.background().spawn({
                    let db_update_tx = db_update_tx.clone();
                    let embedding_provider = embedding_provider.clone();
                    async move {
                        while let Ok(embeddings_queue) = embed_batch_rx.recv().await {
                            Self::compute_embeddings_for_batch(
                                embeddings_queue,
                                &embedding_provider,
                                &db_update_tx,
                            )
                            .await;
                        }
                    }
                }));
            }

            // Group documents into batches and send them to the embedding provider.
            let (batch_files_tx, batch_files_rx) = channel::unbounded::<EmbeddingJob>();
            let _batch_files_task = cx.background().spawn(async move {
                let mut queue_len = 0;
                let mut embeddings_queue = vec![];
                while let Ok(job) = batch_files_rx.recv().await {
                    Self::enqueue_documents_to_embed(
                        job,
                        &mut queue_len,
                        &mut embeddings_queue,
                        &embed_batch_tx,
                    );
                }
            });

            // Parse files into embeddable documents.
            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
            let mut _parsing_files_tasks = Vec::new();
            for _ in 0..cx.background().num_cpus() {
                let fs = fs.clone();
                let parsing_files_rx = parsing_files_rx.clone();
                let batch_files_tx = batch_files_tx.clone();
                let db_update_tx = db_update_tx.clone();
                _parsing_files_tasks.push(cx.background().spawn(async move {
                    let mut retriever = CodeContextRetriever::new();
                    while let Ok(pending_file) = parsing_files_rx.recv().await {
                        Self::parse_file(
                            &fs,
                            pending_file,
                            &mut retriever,
                            &batch_files_tx,
                            &parsing_files_rx,
                            &db_update_tx,
                        )
                        .await;
                    }
                }));
            }

            log::trace!(
                "semantic index task initialization took {:?} milliseconds",
                t0.elapsed().as_millis()
            );
            Self {
                fs,
                database_url,
                embedding_provider,
                language_registry,
                db_update_tx,
                parsing_files_tx,
                _db_update_task,
                _embed_batch_tasks,
                _batch_files_task,
                _parsing_files_tasks,
                projects: HashMap::new(),
            }
        }))
    }

    fn run_db_operation(db: &VectorDatabase, job: DbOperation) {
        match job {
            DbOperation::InsertFile {
                worktree_id,
                documents,
                path,
                mtime,
                job_handle,
            } => {
                db.insert_file(worktree_id, path, mtime, documents)
                    .log_err();
                drop(job_handle);
            }
            DbOperation::Delete { worktree_id, path } => {
                db.delete_file(worktree_id, path).log_err();
            }
            DbOperation::FindOrCreateWorktree { path, sender } => {
                let id = db.find_or_create_worktree(&path);
                sender.send(id).ok();
            }
            DbOperation::FileMTimes {
                worktree_id: worktree_db_id,
                sender,
            } => {
                let file_mtimes = db.get_file_mtimes(worktree_db_id);
                sender.send(file_mtimes).ok();
            }
            DbOperation::WorktreePreviouslyIndexed { path, sender } => {
                let worktree_indexed = db.worktree_previously_indexed(path.as_ref());
                sender.send(worktree_indexed).ok();
            }
        }
    }

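    /// Embeds every document in the queued batch with a single provider call,
    /// then walks the flat list of embeddings back across the per-file document
    /// lists (the `i`/`j` cursors below) before persisting each file.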
    async fn compute_embeddings_for_batch(
        mut embeddings_queue: Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
        embedding_provider: &Arc<dyn EmbeddingProvider>,
        db_update_tx: &channel::Sender<DbOperation>,
    ) {
        let mut batch_documents = vec![];
        for (_, documents, _, _, _) in embeddings_queue.iter() {
            batch_documents.extend(documents.iter().map(|document| document.content.as_str()));
        }

        if let Ok(embeddings) = embedding_provider.embed_batch(batch_documents).await {
            log::trace!(
                "created {} embeddings for {} files",
                embeddings.len(),
                embeddings_queue.len(),
            );

            let mut i = 0;
            let mut j = 0;

            for embedding in embeddings.iter() {
                while embeddings_queue[i].1.len() == j {
                    i += 1;
                    j = 0;
                }

                embeddings_queue[i].1[j].embedding = embedding.to_owned();
                j += 1;
            }

            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
                db_update_tx
                    .send(DbOperation::InsertFile {
                        worktree_id,
                        documents,
                        path,
                        mtime,
                        job_handle,
                    })
                    .await
                    .unwrap();
            }
        }
    }

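    /// Accumulates incoming documents and flushes the queue to the embedding
    /// tasks once it reaches `EMBEDDINGS_BATCH_SIZE` documents, or immediately
    /// on an explicit `Flush`.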
    fn enqueue_documents_to_embed(
        job: EmbeddingJob,
        queue_len: &mut usize,
        embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
        embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
    ) {
        let should_flush = match job {
            EmbeddingJob::Enqueue {
                documents,
                worktree_id,
                path,
                mtime,
                job_handle,
            } => {
                *queue_len += documents.len();
                embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
                *queue_len >= EMBEDDINGS_BATCH_SIZE
            }
            EmbeddingJob::Flush => true,
        };

        if should_flush {
            embed_batch_tx
                .try_send(mem::take(embeddings_queue))
                .unwrap();
            *queue_len = 0;
        }
    }

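    /// Parses one pending file into documents. Files that yield no documents
    /// are still written to the database so their mtime is recorded; once the
    /// parse queue drains, a `Flush` is sent so partially filled batches still
    /// get embedded.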
    async fn parse_file(
        fs: &Arc<dyn Fs>,
        pending_file: PendingFile,
        retriever: &mut CodeContextRetriever,
        batch_files_tx: &channel::Sender<EmbeddingJob>,
        parsing_files_rx: &channel::Receiver<PendingFile>,
        db_update_tx: &channel::Sender<DbOperation>,
    ) {
        if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
            if let Some(documents) = retriever
                .parse_file_with_template(
                    &pending_file.relative_path,
                    &content,
                    pending_file.language,
                )
                .log_err()
            {
                log::trace!(
                    "parsed path {:?}: {} documents",
                    pending_file.relative_path,
                    documents.len()
                );

                if documents.is_empty() {
                    db_update_tx
                        .send(DbOperation::InsertFile {
                            worktree_id: pending_file.worktree_db_id,
                            documents,
                            path: pending_file.relative_path,
                            mtime: pending_file.modified_time,
                            job_handle: pending_file.job_handle,
                        })
                        .await
                        .unwrap();
                } else {
                    batch_files_tx
                        .try_send(EmbeddingJob::Enqueue {
                            worktree_id: pending_file.worktree_db_id,
                            path: pending_file.relative_path,
                            mtime: pending_file.modified_time,
                            job_handle: pending_file.job_handle,
                            documents,
                        })
                        .unwrap();
                }
            }
        }

        if parsing_files_rx.is_empty() {
            batch_files_tx.try_send(EmbeddingJob::Flush).unwrap();
        }
    }

    fn find_or_create_worktree(&self, path: PathBuf) -> impl Future<Output = Result<i64>> {
        let (tx, rx) = oneshot::channel();
        self.db_update_tx
            .try_send(DbOperation::FindOrCreateWorktree { path, sender: tx })
            .unwrap();
        async move { rx.await? }
    }

    fn get_file_mtimes(
        &self,
        worktree_id: i64,
    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
        let (tx, rx) = oneshot::channel();
        self.db_update_tx
            .try_send(DbOperation::FileMTimes {
                worktree_id,
                sender: tx,
            })
            .unwrap();
        async move { rx.await? }
    }

    fn worktree_previously_indexed(&self, path: Arc<Path>) -> impl Future<Output = Result<bool>> {
        let (tx, rx) = oneshot::channel();
        self.db_update_tx
            .try_send(DbOperation::WorktreePreviouslyIndexed { path, sender: tx })
            .unwrap();
        async move { rx.await? }
    }

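    /// Reports whether every worktree in the project that could be checked has
    /// been indexed before; worktrees whose database lookup fails are ignored.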
    pub fn project_previously_indexed(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<bool>> {
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
                async move {
                    scan_complete.await;
                }
            })
            .collect::<Vec<_>>();

        let worktrees_indexed_previously = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| self.worktree_previously_indexed(worktree.read(cx).abs_path()))
            .collect::<Vec<_>>();

        cx.spawn(|_, _cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktrees_indexed_previously =
                futures::future::join_all(worktrees_indexed_previously).await;

            Ok(worktrees_indexed_previously
                .iter()
                .flatten()
                .all(|was_indexed| *was_indexed))
        })
    }

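    /// Registers the project and walks each worktree, scheduling changed files
    /// for parsing and embedding. Returns the number of files that will be
    /// indexed along with a receiver that tracks the outstanding job count.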
    pub fn index_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
        let t0 = Instant::now();
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
                async move {
                    scan_complete.await;
                }
            })
            .collect::<Vec<_>>();
        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .map(|worktree| {
                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
            })
            .collect::<Vec<_>>();

        let language_registry = self.language_registry.clone();
        let db_update_tx = self.db_update_tx.clone();
        let parsing_files_tx = self.parsing_files_tx.clone();

        cx.spawn(|this, mut cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;

            let worktrees = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(cx)
                    .map(|worktree| worktree.read(cx).snapshot())
                    .collect::<Vec<_>>()
            });

            let mut worktree_file_mtimes = HashMap::new();
            let mut db_ids_by_worktree_id = HashMap::new();
            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                let db_id = db_id?;
                db_ids_by_worktree_id.insert(worktree.id(), db_id);
                worktree_file_mtimes.insert(
                    worktree.id(),
                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
                        .await?,
                );
            }

            let (job_count_tx, job_count_rx) = watch::channel_with(0);
            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
            this.update(&mut cx, |this, _| {
                this.projects.insert(
                    project.downgrade(),
                    ProjectState {
                        worktree_db_ids: db_ids_by_worktree_id
                            .iter()
                            .map(|(worktree_id, db_id)| (*worktree_id, *db_id))
                            .collect(),
                        outstanding_job_count_rx: job_count_rx.clone(),
                        _outstanding_job_count_tx: job_count_tx.clone(),
                    },
                );
            });

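            // Walk each worktree snapshot in the background, enqueueing files
            // whose mtime has changed and deleting database rows for files that
            // no longer exist on disk.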
            cx.background()
                .spawn(async move {
                    let mut count = 0;
                    for worktree in worktrees.into_iter() {
                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                        for file in worktree.files(false, 0) {
                            let absolute_path = worktree.absolutize(&file.path);

                            if let Ok(language) = language_registry
                                .language_for_file(&absolute_path, None)
                                .await
                            {
                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                    && language.name().as_ref() != "Markdown"
                                    && language
                                        .grammar()
                                        .and_then(|grammar| grammar.embedding_config.as_ref())
                                        .is_none()
                                {
                                    continue;
                                }

                                let path_buf = file.path.to_path_buf();
                                let stored_mtime = file_mtimes.remove(&path_buf);
                                let already_stored = stored_mtime
                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);

                                if !already_stored {
                                    count += 1;
                                    *job_count_tx.lock().borrow_mut() += 1;
                                    let job_handle = JobHandle {
                                        tx: Arc::downgrade(&job_count_tx),
                                    };
                                    parsing_files_tx
                                        .try_send(PendingFile {
                                            worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                            relative_path: path_buf,
                                            absolute_path,
                                            language,
                                            job_handle,
                                            modified_time: file.mtime,
                                        })
                                        .unwrap();
                                }
                            }
                        }
                        for file in file_mtimes.keys() {
                            db_update_tx
                                .try_send(DbOperation::Delete {
                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
                                    path: file.to_owned(),
                                })
                                .unwrap();
                        }
                    }

                    log::trace!(
                        "walking worktree took {:?} milliseconds",
                        t0.elapsed().as_millis()
                    );
                    anyhow::Ok((count, job_count_rx))
                })
                .await
        })
    }

    pub fn outstanding_job_count_rx(
        &self,
        project: &ModelHandle<Project>,
    ) -> Option<watch::Receiver<usize>> {
        Some(
            self.projects
                .get(&project.downgrade())?
                .outstanding_job_count_rx
                .clone(),
        )
    }

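    /// Searches an indexed project: embeds the query phrase, fans a top-k
    /// similarity search out over batches of file ids on background threads,
    /// merges the per-batch hits in descending similarity order, and opens a
    /// buffer for each of the best-matching documents.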
    pub fn search_project(
        &mut self,
        project: ModelHandle<Project>,
        phrase: String,
        limit: usize,
        includes: Vec<PathMatcher>,
        excludes: Vec<PathMatcher>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<SearchResult>>> {
        let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
            state
        } else {
            return Task::ready(Err(anyhow!("project not added")));
        };

        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)
            .filter_map(|worktree| {
                let worktree_id = worktree.read(cx).id();
                project_state.db_id_for_worktree_id(worktree_id)
            })
            .collect::<Vec<_>>();

        let embedding_provider = self.embedding_provider.clone();
        let database_url = self.database_url.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;

            let phrase_embedding = embedding_provider
                .embed_batch(vec![&phrase])
                .await?
                .into_iter()
                .next()
                .unwrap();

            let file_ids =
                database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?;

            // Split the file ids across the available cores, taking care never
            // to produce a zero chunk size, which would make `chunks` panic.
            let batch_n = cx.background().num_cpus();
            let ids_len = file_ids.len();
            let batch_size = if ids_len <= batch_n {
                ids_len.max(1)
            } else {
                ids_len / batch_n
            };

            let mut result_tasks = Vec::new();
            for batch in file_ids.chunks(batch_size) {
                let batch = batch.to_vec();
                let fs = fs.clone();
                let database_url = database_url.clone();
                let phrase_embedding = phrase_embedding.clone();
                let task = cx.background().spawn(async move {
                    let database = VectorDatabase::new(fs, database_url)
                        .await
                        .log_err()
                        .ok_or_else(|| anyhow!("failed to acquire database connection"))?;
                    database.top_k_search(&phrase_embedding, limit, batch.as_slice())
                });
                result_tasks.push(task);
            }

            let batch_results = futures::future::join_all(result_tasks).await;

            let mut results = Vec::new();
            for batch_result in batch_results {
                if let Ok(batch_result) = batch_result {
                    for (id, similarity) in batch_result {
                        // The comparator is flipped so results stay sorted in
                        // descending similarity order.
                        let ix = match results.binary_search_by(|(_, s)| {
                            similarity.partial_cmp(s).unwrap_or(Ordering::Equal)
                        }) {
                            Ok(ix) => ix,
                            Err(ix) => ix,
                        };
                        results.insert(ix, (id, similarity));
                        results.truncate(limit);
                    }
                }
            }

            let ids = results.into_iter().map(|(id, _)| id).collect::<Vec<i64>>();
            let documents = database.get_documents_by_ids(ids.as_slice())?;

            let mut tasks = Vec::new();
            let mut ranges = Vec::new();
            let weak_project = project.downgrade();
            project.update(&mut cx, |project, cx| {
                for (worktree_db_id, file_path, byte_range) in documents {
                    let project_state =
                        if let Some(state) = this.read(cx).projects.get(&weak_project) {
                            state
                        } else {
                            return Err(anyhow!("project not added"));
                        };
                    if let Some(worktree_id) = project_state.worktree_id_for_db_id(worktree_db_id) {
                        tasks.push(project.open_buffer((worktree_id, file_path), cx));
                        ranges.push(byte_range);
                    }
                }

                Ok(())
            })?;

            let buffers = futures::future::join_all(tasks).await;

            Ok(buffers
                .into_iter()
                .zip(ranges)
                .filter_map(|(buffer, range)| {
                    let buffer = buffer.log_err()?;
                    let range = buffer.read_with(&cx, |buffer, _| {
                        buffer.anchor_before(range.start)..buffer.anchor_after(range.end)
                    });
                    Some(SearchResult { buffer, range })
                })
                .collect::<Vec<_>>())
        })
    }
}

impl Entity for SemanticIndex {
    type Event = ();
}

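// Dropping a `JobHandle` marks one indexing job as finished by decrementing the
// project's outstanding job count, if the count is still being tracked.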
impl Drop for JobHandle {
    fn drop(&mut self) {
        if let Some(tx) = self.tx.upgrade() {
            let mut tx = tx.lock();
            *tx.borrow_mut() -= 1;
        }
    }
}