use anyhow::{anyhow, Context as _, Result};
use arrayvec::ArrayString;
use fs::{Fs, MTime};
use futures::{stream::StreamExt, TryFutureExt};
use futures_batch::ChunksTimeoutStreamExt;
use gpui::{AppContext, Model, Task};
use heed::{
    types::{SerdeBincode, Str},
    RoTxn,
};
use language_model::{
    LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest,
    LanguageModelRequestMessage, Role,
};
use log;
use parking_lot::Mutex;
use project::{Entry, UpdatedEntriesSet, Worktree};
use serde::{Deserialize, Serialize};
use smol::channel;
use std::{
    future::Future,
    path::Path,
    sync::Arc,
    time::{Duration, Instant},
};
use util::ResultExt;
use worktree::Snapshot;

use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog};

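/// A file's name paired with the LLM-generated summary of its contents.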
#[derive(Serialize, Deserialize, Debug)]
pub struct FileSummary {
    pub filename: String,
    pub summary: String,
}

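/// A file that has been loaded from disk and hashed, but not yet summarized.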
#[derive(Debug, Serialize, Deserialize)]
struct UnsummarizedFile {
    // Path to the file on disk
    path: Arc<Path>,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The source file's contents
    contents: String,
}

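/// A file whose contents have been summarized by the LLM.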
#[derive(Debug, Serialize, Deserialize)]
struct SummarizedFile {
    // Path to the file on disk
    path: String,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The LLM's summary of the file's contents
    summary: String,
}

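/// The hex-encoded BLAKE3 hash of a file's contents.
///
/// A minimal sketch of how such a digest is produced (the real pipeline in
/// `digest_files` hashes the file's relative path followed by its contents):
///
/// ```ignore
/// let digest: Blake3Digest = blake3::hash(b"fn main() {}").to_hex();
/// ```
///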
/// This is what blake3's `to_hex()` method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;

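/// A file's content digest and the mtime it was computed at, as persisted in the file digest db.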
#[derive(Debug, Serialize, Deserialize)]
pub struct FileDigest {
    pub mtime: Option<MTime>,
    pub digest: Blake3Digest,
}

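/// Files whose digests were not found in the summary cache, along with the task producing them.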
struct NeedsSummary {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

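/// Freshly summarized files, along with the summarization task producing them.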
struct SummarizeFiles {
    files: channel::Receiver<SummarizedFile>,
    task: Task<Result<()>>,
}

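/// Maintains LLM-generated summaries for the files of a single worktree,
/// keyed by the BLAKE3 digest of each file's contents so that unchanged
/// files are never resummarized.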
pub struct SummaryIndex {
    worktree: Model<Worktree>,
    fs: Arc<dyn Fs>,
    db_connection: heed::Env,
    file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: BLAKE3 digest of its contents.
    summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
    backlog: Arc<Mutex<SummaryBacklog>>,
    _entry_ids_being_indexed: Arc<IndexingEntrySet>, // TODO can this be removed?
}

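/// Batches of paths that still need digesting, along with the task producing them.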
struct Backlogged {
    paths_to_digest: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
    task: Task<Result<()>>,
}

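/// Files that have been read and digested, pending a check against the summary cache.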
struct MightNeedSummaryFiles {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

impl SummaryIndex {
    pub fn new(
        worktree: Model<Worktree>,
        fs: Arc<dyn Fs>,
        db_connection: heed::Env,
        file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>,
        _entry_ids_being_indexed: Arc<IndexingEntrySet>,
    ) -> Self {
        Self {
            worktree,
            fs,
            db_connection,
            file_digest_db,
            summary_db,
            _entry_ids_being_indexed,
            backlog: Default::default(),
        }
    }

    pub fn file_digest_db(&self) -> heed::Database<Str, SerdeBincode<FileDigest>> {
        self.file_digest_db
    }

    pub fn summary_db(&self) -> heed::Database<SerdeBincode<Blake3Digest>, Str> {
        self.summary_db
    }

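    /// Summarize every file in the worktree whose contents have changed on
    /// disk since they were last summarized, persisting the results. When the
    /// feature is unavailable, every stage becomes a no-op.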
    pub fn index_entries_changed_on_disk(
        &self,
        is_auto_available: bool,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_entries(worktree, cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            if is_auto_available {
                log::info!(
                    "Summarizing everything that changed on disk took {:?}",
                    start.elapsed()
                );
            }

            Ok(())
        }
    }

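    /// Summarize just the given set of updated entries rather than rescanning
    /// the entire worktree. When the feature is unavailable, every stage
    /// becomes a no-op.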
    pub fn index_updated_entries(
        &mut self,
        updated_entries: UpdatedEntriesSet,
        is_auto_available: bool,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_updated_entries(worktree, updated_entries.clone(), cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::debug!("Summarizing updated entries took {:?}", start.elapsed());

            Ok(())
        }
    }

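    /// Filter incoming files down to the ones whose content digests are not
    /// already present in the summary database.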
    fn check_summary_cache(
        &self,
        mut might_need_summary: channel::Receiver<UnsummarizedFile>,
        cx: &AppContext,
    ) -> NeedsSummary {
        let db_connection = self.db_connection.clone();
        let db = self.summary_db;
        let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
        let task = cx.background_executor().spawn(async move {
            while let Some(file) = might_need_summary.next().await {
                let txn = db_connection
                    .read_txn()
                    .context("Failed to create read transaction for checking which hashes are in summary cache")?;

                match db.get(&txn, &file.digest) {
                    Ok(opt_answer) => {
                        if opt_answer.is_none() {
                            // It's not in the summary cache db, so we need to summarize it.
                            log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest);
                            needs_summary_tx.send(file).await?;
                        } else {
                            log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest);
                        }
                    }
                    Err(err) => {
                        log::error!("Reading from the summaries database failed: {:?}", err);
                    }
                }
            }

            Ok(())
        });

        NeedsSummary {
            files: needs_summary_rx,
            task,
        }
    }

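    /// Walk every file in the worktree, adding any whose mtime has changed to
    /// the backlog and emitting batches of paths to digest whenever the
    /// backlog fills up.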
    fn scan_entries(&self, worktree: Snapshot, cx: &AppContext) -> Backlogged {
        let (tx, rx) = channel::bounded(512);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_executor().spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for entry in worktree.files(false, 0) {
                let needs_summary =
                    Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);

                if !needs_summary.is_empty() {
                    tx.send(needs_summary).await?;
                }
            }

            // TODO delete db entries for deleted files

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            task,
        }
    }

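    /// Record the entry in the backlog if its mtime differs from the one saved
    /// in the digest db. Returns the drained backlog (a batch of paths to
    /// digest) once the backlog is full, and an empty `Vec` otherwise.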
    fn add_to_backlog(
        backlog: Arc<Mutex<SummaryBacklog>>,
        digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        txn: &RoTxn<'_>,
        entry: &Entry,
    ) -> Vec<(Arc<Path>, Option<MTime>)> {
        let entry_db_key = db_key_for_path(&entry.path);

        match digest_db.get(txn, &entry_db_key) {
            Ok(opt_saved_digest) => {
                // The file path is the same, but the mtime differs (or there was no saved mtime),
                // so its summary needs updating. Add it to the backlog; then, if the backlog is
                // full, drain it so its contents get summarized.
                if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
                    let mut backlog = backlog.lock();

                    log::info!(
                        "Inserting {:?} ({:?} bytes) into backlog",
                        &entry.path,
                        entry.size,
                    );
                    backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime);

                    if backlog.needs_drain() {
                        log::info!("Draining summary backlog...");
                        return backlog.drain().collect();
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Error trying to get file digest db entry {:?}: {:?}",
                    &entry_db_key,
                    err
                );
            }
        }

        Vec::new()
    }

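    /// Like `scan_entries`, but restricted to the entries in the given change
    /// set: loaded, added, and updated entries are considered for the backlog,
    /// while removals are currently left as a TODO.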
    fn scan_updated_entries(
        &self,
        worktree: Snapshot,
        updated_entries: UpdatedEntriesSet,
        cx: &AppContext,
    ) -> Backlogged {
        log::info!("Scanning for updated entries that might need summarization...");
        let (tx, rx) = channel::bounded(512);
        // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_executor().spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for (path, entry_id, status) in updated_entries.iter() {
                match status {
                    project::PathChange::Loaded
                    | project::PathChange::Added
                    | project::PathChange::Updated
                    | project::PathChange::AddedOrUpdated => {
                        if let Some(entry) = worktree.entry_for_id(*entry_id) {
                            if entry.is_file() {
                                let needs_summary = Self::add_to_backlog(
                                    Arc::clone(&backlog),
                                    digest_db,
                                    &txn,
                                    entry,
                                );

                                if !needs_summary.is_empty() {
                                    tx.send(needs_summary).await?;
                                }
                            }
                        }
                    }
                    project::PathChange::Removed => {
                        let _db_path = db_key_for_path(path);
                        // TODO delete db entries for deleted files
                        // deleted_entry_ranges_tx
                        //     .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
                        //     .await?;
                    }
                }
            }

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            // deleted_entry_ranges: deleted_entry_ranges_rx,
            task,
        }
    }

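    /// Load each file from disk and compute the BLAKE3 digest of its path and
    /// contents, fanning the work out across one worker per CPU core.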
    fn digest_files(
        &self,
        paths: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
        worktree_abs_path: Arc<Path>,
        cx: &AppContext,
    ) -> MightNeedSummaryFiles {
        let fs = self.fs.clone();
        let (tx, rx) = channel::bounded(2048);
        let task = cx.spawn(|cx| async move {
            cx.background_executor()
                .scoped(|cx| {
                    for _ in 0..cx.num_cpus() {
                        cx.spawn(async {
                            while let Ok(pairs) = paths.recv().await {
                                // Note: we could process all these files concurrently if desired. Might or might not speed things up.
                                for (path, mtime) in pairs {
                                    let entry_abs_path = worktree_abs_path.join(&path);

                                    // Load the file's contents and compute its hash digest.
                                    let unsummarized_file = {
                                        let Some(contents) = fs
                                            .load(&entry_abs_path)
                                            .await
                                            .with_context(|| {
                                                format!("failed to read path {entry_abs_path:?}")
                                            })
                                            .log_err()
                                        else {
                                            continue;
                                        };

                                        let digest = {
                                            let mut hasher = blake3::Hasher::new();
                                            // Incorporate both the (relative) file path as well as the contents of the file into the hash.
                                            // This is because in some languages and frameworks, identical files can do different things
                                            // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model.
                                            hasher.update(path.display().to_string().as_bytes());
                                            hasher.update(contents.as_bytes());
                                            hasher.finalize().to_hex()
                                        };

                                        UnsummarizedFile {
                                            digest,
                                            contents,
                                            path,
                                            mtime,
                                        }
                                    };

                                    if let Err(err) = tx
                                        .send(unsummarized_file)
                                        .map_err(|error| anyhow!(error))
                                        .await
                                    {
                                        log::error!("Error: {:?}", err);

                                        return;
                                    }
                                }
                            }
                        });
                    }
                })
                .await;
            Ok(())
        });

        MightNeedSummaryFiles { files: rx, task }
    }

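    /// Request an LLM summary for each incoming file, dropping files whose
    /// summarization failed or came back empty.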
    fn summarize_files(
        &self,
        mut unsummarized_files: channel::Receiver<UnsummarizedFile>,
        cx: &AppContext,
    ) -> SummarizeFiles {
        let (summarized_tx, summarized_rx) = channel::bounded(512);
        let task = cx.spawn(|cx| async move {
            while let Some(file) = unsummarized_files.next().await {
                log::debug!("Summarizing {:?}", file);
                let summary = cx
                    .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
                    .await
                    .unwrap_or_else(|err| {
                        // Log a warning because we'll continue anyway.
                        // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries,
                        // but this might give bad summaries due to cutting off source code files in the middle.
                        log::warn!("Failed to summarize {} - {:?}", file.path.display(), err);

                        String::new()
                    });

                // Note that the summary could be empty because of an error talking to a cloud provider,
                // e.g. because the context limit was exceeded. In that case, we return Ok(String::new()).
                if !summary.is_empty() {
                    summarized_tx
                        .send(SummarizedFile {
                            path: file.path.display().to_string(),
                            digest: file.digest,
                            summary,
                            mtime: file.mtime,
                        })
                        .await?
                }
            }

            Ok(())
        });

        SummarizeFiles {
            files: summarized_rx,
            task,
        }
    }

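    /// Prompt the configured language model for a three-sentence summary of
    /// the given code, sending the file's path along with its contents.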
    fn summarize_code(
        code: &str,
        path: &Path,
        cx: &AppContext,
    ) -> impl Future<Output = Result<String>> {
        let start = Instant::now();
        let (summary_model_id, use_cache): (LanguageModelId, bool) = (
            "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings.
            false, // qwen2 doesn't have a cache, but we should probably infer this from the model
        );
        let Some(model) = LanguageModelRegistry::read_global(cx)
            .available_models(cx)
            .find(|model| model.id() == summary_model_id)
        else {
            return cx.background_executor().spawn(async move {
                Err(anyhow!("Couldn't find the preferred summarization model ({:?}) in the language registry's available models", summary_model_id))
            });
        };
        let utf8_path = path.to_string_lossy();
        const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:";
        let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}");

        log::debug!(
            "Summarizing code by sending this prompt to {:?}: {:?}",
            model.name(),
            &prompt
        );

        let request = LanguageModelRequest {
            messages: vec![LanguageModelRequestMessage {
                role: Role::User,
                content: vec![prompt.into()],
                cache: use_cache,
            }],
            tools: Vec::new(),
            stop: Vec::new(),
            temperature: None,
        };

        let code_len = code.len();
        cx.spawn(|cx| async move {
            let stream = model.stream_completion(request, &cx);
            cx.background_executor()
                .spawn(async move {
                    let answer: String = stream
                        .await?
                        .filter_map(|event| async {
                            if let Ok(LanguageModelCompletionEvent::Text(text)) = event {
                                Some(text)
                            } else {
                                None
                            }
                        })
                        .collect()
                        .await;

                    log::info!(
                        "It took {:?} to summarize {:?} bytes of code.",
                        start.elapsed(),
                        code_len
                    );

                    log::debug!("Summary was: {:?}", &answer);

                    Ok(answer)
                })
                .await

            // TODO if summarization failed, put it back in the backlog!
        })
    }

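    /// Write summaries to the database, batching up to 4096 at a time
    /// (flushing at least every two seconds) and committing each batch in a
    /// single write transaction that updates both the file digest db and the
    /// summary db.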
    fn persist_summaries(
        &self,
        summaries: channel::Receiver<SummarizedFile>,
        cx: &AppContext,
    ) -> Task<Result<()>> {
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let summary_db = self.summary_db;
        cx.background_executor().spawn(async move {
            let mut summaries = summaries.chunks_timeout(4096, Duration::from_secs(2));
            while let Some(summaries) = summaries.next().await {
                let mut txn = db_connection.write_txn()?;
                for file in &summaries {
                    log::debug!(
                        "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}",
                        &file.path,
                        file.summary.len(),
                        file.digest
                    );
                    digest_db.put(
                        &mut txn,
                        // Store the digest under the same key format that db_key_for_path
                        // produces for reads, so later scans can find it.
                        &file.path.replace('/', "\0"),
                        &FileDigest {
                            mtime: file.mtime,
                            digest: file.digest,
                        },
                    )?;
                    summary_db.put(&mut txn, &file.digest, &file.summary)?;
                }
                txn.commit()?;

                drop(summaries);
                log::debug!("committed summaries");
            }

            Ok(())
        })
    }

    /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately.
    pub(crate) fn flush_backlog(
        &self,
        worktree_abs_path: Arc<Path>,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged = {
            let (tx, rx) = channel::bounded(512);
            let needs_summary: Vec<(Arc<Path>, Option<MTime>)> = {
                let mut backlog = self.backlog.lock();

                backlog.drain().collect()
            };

            let task = cx.background_executor().spawn(async move {
                tx.send(needs_summary).await?;
                Ok(())
            });

            Backlogged {
                paths_to_digest: rx,
                task,
            }
        };

        let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
        let needs_summary = self.check_summary_cache(digest.files, cx);
        let summaries = self.summarize_files(needs_summary.files, cx);
        let persist = self.persist_summaries(summaries.files, cx);

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::info!("Summarizing backlogged entries took {:?}", start.elapsed());

            Ok(())
        }
    }

    pub(crate) fn backlog_len(&self) -> usize {
        self.backlog.lock().len()
    }
}

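/// Convert a worktree-relative path into a database key, replacing `/`
/// separators with NUL bytes (e.g. `src/main.rs` becomes `src\0main.rs`).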
fn db_key_for_path(path: &Path) -> String {
    path.to_string_lossy().replace('/', "\0")
}