use anyhow::{Context as _, Result, anyhow};
use arrayvec::ArrayString;
use fs::{Fs, MTime};
use futures::{TryFutureExt, stream::StreamExt};
use futures_batch::ChunksTimeoutStreamExt;
use gpui::{App, AppContext as _, Entity, Task};
use heed::{
    RoTxn,
    types::{SerdeBincode, Str},
};
use language_model::{
    LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest,
    LanguageModelRequestMessage, Role,
};
use log;
use parking_lot::Mutex;
use project::{Entry, UpdatedEntriesSet, Worktree};
use serde::{Deserialize, Serialize};
use smol::channel;
use std::{
    future::Future,
    path::Path,
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use util::ResultExt;
use worktree::Snapshot;

use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog};

#[derive(Serialize, Deserialize, Debug)]
pub struct FileSummary {
    pub filename: String,
    pub summary: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct UnsummarizedFile {
    // Path to the file on disk
    path: Arc<Path>,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The source file's contents
    contents: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct SummarizedFile {
    // Path to the file on disk
    path: String,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The LLM's summary of the file's contents
    summary: String,
}

/// This is what blake3's to_hex() method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;

#[derive(Debug, Serialize, Deserialize)]
pub struct FileDigest {
    pub mtime: Option<MTime>,
    pub digest: Blake3Digest,
}

struct NeedsSummary {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

struct SummarizeFiles {
    files: channel::Receiver<SummarizedFile>,
    task: Task<Result<()>>,
}

pub struct SummaryIndex {
    worktree: Entity<Worktree>,
    fs: Arc<dyn Fs>,
    db_connection: heed::Env,
    file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: BLAKE3 digest of its contents.
    summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
    backlog: Arc<Mutex<SummaryBacklog>>,
    _entry_ids_being_indexed: Arc<IndexingEntrySet>, // TODO can this be removed?
}

struct Backlogged {
    paths_to_digest: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
    task: Task<Result<()>>,
}

struct MightNeedSummaryFiles {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

impl SummaryIndex {
    pub fn new(
        worktree: Entity<Worktree>,
        fs: Arc<dyn Fs>,
        db_connection: heed::Env,
        file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>,
        _entry_ids_being_indexed: Arc<IndexingEntrySet>,
    ) -> Self {
        Self {
            worktree,
            fs,
            db_connection,
            file_digest_db,
            summary_db,
            _entry_ids_being_indexed,
            backlog: Default::default(),
        }
    }

    pub fn file_digest_db(&self) -> heed::Database<Str, SerdeBincode<FileDigest>> {
        self.file_digest_db
    }

    pub fn summary_db(&self) -> heed::Database<SerdeBincode<Blake3Digest>, Str> {
        self.summary_db
    }

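    /// Scan the worktree for entries whose on-disk state changed and run them
    /// through the summarization pipeline: backlog changed paths, digest their
    /// contents, skip digests already in the summary cache, summarize the rest
    /// with an LLM, and persist the results.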
    pub fn index_entries_changed_on_disk(
        &self,
        is_auto_available: bool,
        cx: &App,
    ) -> impl Future<Output = Result<()>> + use<> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_entries(worktree, cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            if is_auto_available {
                log::info!(
                    "Summarizing everything that changed on disk took {:?}",
                    start.elapsed()
                );
            }

            Ok(())
        }
    }

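    /// Like [`Self::index_entries_changed_on_disk`], but driven by the given set
    /// of updated entries rather than a full worktree scan.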
    pub fn index_updated_entries(
        &mut self,
        updated_entries: UpdatedEntriesSet,
        is_auto_available: bool,
        cx: &App,
    ) -> impl Future<Output = Result<()>> + use<> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_updated_entries(worktree, updated_entries, cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::debug!("Summarizing updated entries took {:?}", start.elapsed());

            Ok(())
        }
    }

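    /// Filter the incoming stream of digested files, forwarding only those whose
    /// content digest is not already present in the summary database.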
    fn check_summary_cache(
        &self,
        might_need_summary: channel::Receiver<UnsummarizedFile>,
        cx: &App,
    ) -> NeedsSummary {
        let db_connection = self.db_connection.clone();
        let db = self.summary_db;
        let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
        let task = cx.background_spawn(async move {
            let mut might_need_summary = pin!(might_need_summary);
            while let Some(file) = might_need_summary.next().await {
                let tx = db_connection
                    .read_txn()
                    .context("Failed to create read transaction for checking which hashes are in summary cache")?;

                match db.get(&tx, &file.digest) {
                    Ok(opt_answer) => {
                        if opt_answer.is_none() {
                            // It's not in the summary cache db, so we need to summarize it.
                            log::debug!(
                                "File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.",
                                file.path.display(),
                                &file.digest
                            );
                            needs_summary_tx.send(file).await?;
                        } else {
                            log::debug!(
                                "File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.",
                                file.path.display(),
                                &file.digest
                            );
                        }
                    }
                    Err(err) => {
                        log::error!("Reading from the summaries database failed: {:?}", err);
                    }
                }
            }

            Ok(())
        });

        NeedsSummary {
            files: needs_summary_rx,
            task,
        }
    }

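    /// Walk every file in the worktree snapshot, backlogging any whose mtime
    /// differs from the one recorded in the file digest database.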
    fn scan_entries(&self, worktree: Snapshot, cx: &App) -> Backlogged {
        let (tx, rx) = channel::bounded(512);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for entry in worktree.files(false, 0) {
                let needs_summary =
                    Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);

                if !needs_summary.is_empty() {
                    tx.send(needs_summary).await?;
                }
            }

            // TODO delete db entries for deleted files

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            task,
        }
    }

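    /// Compare an entry's mtime against its stored digest record, adding it to
    /// the backlog when they differ. Returns the drained backlog once it grows
    /// large enough to need summarizing, and an empty `Vec` otherwise.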
    fn add_to_backlog(
        backlog: Arc<Mutex<SummaryBacklog>>,
        digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        txn: &RoTxn<'_>,
        entry: &Entry,
    ) -> Vec<(Arc<Path>, Option<MTime>)> {
        let entry_db_key = db_key_for_path(&entry.path);

        match digest_db.get(txn, &entry_db_key) {
            Ok(opt_saved_digest) => {
                // If the entry's current mtime doesn't match the stored one (or we
                // have no record for this path at all), the file needs updating, so
                // add it to the backlog. Then, if the backlog is full, drain it and
                // return its contents for summarization.
                if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
                    let mut backlog = backlog.lock();

                    log::info!(
                        "Inserting {:?} ({:?} bytes) into backlog",
                        &entry.path,
                        entry.size,
                    );
                    backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime);

                    if backlog.needs_drain() {
                        log::info!("Draining summary backlog...");
                        return backlog.drain().collect();
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Error trying to get file digest db entry {:?}: {:?}",
                    &entry_db_key,
                    err
                );
            }
        }

        Vec::new()
    }

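    /// Backlog updated entries that may need (re)summarization; analogous to
    /// [`Self::scan_entries`], but driven by a change set instead of a full scan.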
    fn scan_updated_entries(
        &self,
        worktree: Snapshot,
        updated_entries: UpdatedEntriesSet,
        cx: &App,
    ) -> Backlogged {
        log::info!("Scanning for updated entries that might need summarization...");
        let (tx, rx) = channel::bounded(512);
        // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for (path, entry_id, status) in updated_entries.iter() {
                match status {
                    project::PathChange::Loaded
                    | project::PathChange::Added
                    | project::PathChange::Updated
                    | project::PathChange::AddedOrUpdated => {
                        if let Some(entry) = worktree.entry_for_id(*entry_id)
                            && entry.is_file()
                        {
                            let needs_summary =
                                Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);

                            if !needs_summary.is_empty() {
                                tx.send(needs_summary).await?;
                            }
                        }
                    }
                    project::PathChange::Removed => {
                        let _db_path = db_key_for_path(path);
                        // TODO delete db entries for deleted files
                        // deleted_entry_ranges_tx
                        //     .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
                        //     .await?;
                    }
                }
            }

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            // deleted_entry_ranges: deleted_entry_ranges_rx,
            task,
        }
    }

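    /// Spawn one background worker per CPU to load each backlogged file and
    /// compute a BLAKE3 digest over its relative path plus its contents.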
    fn digest_files(
        &self,
        paths: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
        worktree_abs_path: Arc<Path>,
        cx: &App,
    ) -> MightNeedSummaryFiles {
        let fs = self.fs.clone();
        let (tx, rx) = channel::bounded(2048);
        let task = cx.spawn(async move |cx| {
            cx.background_executor()
                .scoped(|cx| {
                    for _ in 0..cx.num_cpus() {
                        cx.spawn(async {
                            while let Ok(pairs) = paths.recv().await {
                                // Note: we could process all these files concurrently if desired. Might or might not speed things up.
                                for (path, mtime) in pairs {
                                    let entry_abs_path = worktree_abs_path.join(&path);

                                    // Load the file's contents and compute its hash digest.
                                    let unsummarized_file = {
                                        let Some(contents) = fs
                                            .load(&entry_abs_path)
                                            .await
                                            .with_context(|| {
                                                format!("failed to read path {entry_abs_path:?}")
                                            })
                                            .log_err()
                                        else {
                                            continue;
                                        };

                                        let digest = {
                                            let mut hasher = blake3::Hasher::new();
                                            // Incorporate both the (relative) file path as well as the contents of the file into the hash.
                                            // This is because in some languages and frameworks, identical files can do different things
                                            // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model.
                                            hasher.update(path.display().to_string().as_bytes());
                                            hasher.update(contents.as_bytes());
                                            hasher.finalize().to_hex()
                                        };

                                        UnsummarizedFile {
                                            digest,
                                            contents,
                                            path,
                                            mtime,
                                        }
                                    };

                                    if let Err(err) = tx
                                        .send(unsummarized_file)
                                        .map_err(|error| anyhow!(error))
                                        .await
                                    {
                                        log::error!("Error: {:?}", err);

                                        return;
                                    }
                                }
                            }
                        });
                    }
                })
                .await;
            Ok(())
        });

        MightNeedSummaryFiles { files: rx, task }
    }

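    /// Request an LLM summary for each digested file, forwarding non-empty
    /// summaries for persistence. Failed summarizations are logged and skipped.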
    fn summarize_files(
        &self,
        unsummarized_files: channel::Receiver<UnsummarizedFile>,
        cx: &App,
    ) -> SummarizeFiles {
        let (summarized_tx, summarized_rx) = channel::bounded(512);
        let task = cx.spawn(async move |cx| {
            while let Ok(file) = unsummarized_files.recv().await {
                log::debug!("Summarizing {:?}", file);
                let summary = cx
                    .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
                    .await
                    .unwrap_or_else(|err| {
                        // Log a warning because we'll continue anyway.
                        // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries,
                        // but this might give bad summaries due to cutting off source code files in the middle.
                        log::warn!("Failed to summarize {} - {:?}", file.path.display(), err);

                        String::new()
                    });

                // The summary can be empty when summarization failed (e.g. a cloud provider
                // error such as exceeding the context limit), because we substituted
                // String::new() above. Skip persisting empty summaries.
                if !summary.is_empty() {
                    summarized_tx
                        .send(SummarizedFile {
                            path: file.path.display().to_string(),
                            digest: file.digest,
                            summary,
                            mtime: file.mtime,
                        })
                        .await?;
                }
            }

            Ok(())
        });

        SummarizeFiles {
            files: summarized_rx,
            task,
        }
    }

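    /// Prompt the preferred summarization model with the file's path and
    /// contents, collecting the streamed completion into a single summary string.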
    fn summarize_code(
        code: &str,
        path: &Path,
        cx: &App,
    ) -> impl Future<Output = Result<String>> + use<> {
        let start = Instant::now();
        let (summary_model_id, use_cache): (LanguageModelId, bool) = (
            "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings.
            false, // Qwen2 doesn't have a cache, but we should probably infer this from the model.
        );
        let Some(model) = LanguageModelRegistry::read_global(cx)
            .available_models(cx)
            .find(|model| model.id() == summary_model_id)
        else {
            return cx.background_spawn(async move {
                anyhow::bail!("Couldn't find the preferred summarization model ({summary_model_id:?}) in the language registry's available models")
            });
        };
        let utf8_path = path.to_string_lossy();
        const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:";
        let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}");

        log::debug!(
            "Summarizing code by sending this prompt to {:?}: {:?}",
            model.name(),
            &prompt
        );

        let request = LanguageModelRequest {
            thread_id: None,
            prompt_id: None,
            mode: None,
            intent: None,
            messages: vec![LanguageModelRequestMessage {
                role: Role::User,
                content: vec![prompt.into()],
                cache: use_cache,
            }],
            tools: Vec::new(),
            tool_choice: None,
            stop: Vec::new(),
            temperature: None,
            thinking_allowed: true,
        };

        let code_len = code.len();
        cx.spawn(async move |cx| {
            let stream = model.stream_completion(request, cx);
            cx.background_spawn(async move {
                let answer: String = stream
                    .await?
                    .filter_map(|event| async {
                        if let Ok(LanguageModelCompletionEvent::Text(text)) = event {
                            Some(text)
                        } else {
                            None
                        }
                    })
                    .collect()
                    .await;

                log::info!(
                    "It took {:?} to summarize {:?} bytes of code.",
                    start.elapsed(),
                    code_len
                );

                log::debug!("Summary was: {:?}", &answer);

                Ok(answer)
            })
            .await

            // TODO if summarization failed, put it back in the backlog!
        })
    }

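    /// Write summaries to the database in batches (up to 4096 entries, or
    /// whatever arrives within a two-second window), one transaction per batch.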
    fn persist_summaries(
        &self,
        summaries: channel::Receiver<SummarizedFile>,
        cx: &App,
    ) -> Task<Result<()>> {
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let summary_db = self.summary_db;
        cx.background_spawn(async move {
            let mut summaries = pin!(summaries.chunks_timeout(4096, Duration::from_secs(2)));
            while let Some(batch) = summaries.next().await {
                let mut txn = db_connection.write_txn()?;
                for file in &batch {
                    log::debug!(
                        "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}",
                        &file.path,
                        file.summary.len(),
                        file.digest
                    );
                    digest_db.put(
                        &mut txn,
                        &file.path,
                        &FileDigest {
                            mtime: file.mtime,
                            digest: file.digest,
                        },
                    )?;
                    summary_db.put(&mut txn, &file.digest, &file.summary)?;
                }
                txn.commit()?;

                // Release the batch's memory before logging; the stream itself stays pinned above.
                drop(batch);
                log::debug!("committed summaries");
            }

            Ok(())
        })
    }

    /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately.
    pub(crate) fn flush_backlog(
        &self,
        worktree_abs_path: Arc<Path>,
        cx: &App,
    ) -> impl Future<Output = Result<()>> + use<> {
        let start = Instant::now();
        let backlogged = {
            let (tx, rx) = channel::bounded(512);
            let needs_summary: Vec<(Arc<Path>, Option<MTime>)> = {
                let mut backlog = self.backlog.lock();

                backlog.drain().collect()
            };

            let task = cx.background_spawn(async move {
                tx.send(needs_summary).await?;
                Ok(())
            });

            Backlogged {
                paths_to_digest: rx,
                task,
            }
        };

        let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
        let needs_summary = self.check_summary_cache(digest.files, cx);
        let summaries = self.summarize_files(needs_summary.files, cx);
        let persist = self.persist_summaries(summaries.files, cx);

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::info!("Summarizing backlogged entries took {:?}", start.elapsed());

            Ok(())
        }
    }

    pub(crate) fn backlog_len(&self) -> usize {
        self.backlog.lock().len()
    }
}

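// Why `\0`? Presumably so that, under LMDB's lexicographic key ordering, a
// directory's keys sort contiguously with its descendants (`\0` sorts before
// every other byte) - see the commented-out range-delete code above. Treat
// this rationale as an inference, not documented behavior.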
fn db_key_for_path(path: &Arc<Path>) -> String {
    path.to_string_lossy().replace('/', "\0")
}
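
// Illustrative tests sketching the hashing and key-encoding behavior described
// above; they are not part of the original pipeline and only exercise pure
// helpers, so no worktree, database, or model setup is assumed. The digest test
// re-states the hashing scheme from `digest_files` and must stay in sync with it.
#[cfg(test)]
mod summary_index_tests {
    use super::*;
    use std::path::Path;

    // The hex form of a BLAKE3 hash is `OUT_LEN * 2` characters, so it fits
    // exactly in the `Blake3Digest` ArrayString.
    #[test]
    fn blake3_digest_is_sixty_four_hex_chars() {
        let digest: Blake3Digest = blake3::hash(b"fn main() {}").to_hex();
        assert_eq!(digest.len(), blake3::OUT_LEN * 2);
    }

    // Mirrors the hashing scheme in `digest_files`: the relative path is fed
    // into the hasher along with the contents, so identical files at different
    // paths (e.g. Rails controllers) get distinct digests.
    #[test]
    fn digest_depends_on_path_as_well_as_contents() {
        fn digest_for(path: &Path, contents: &str) -> Blake3Digest {
            let mut hasher = blake3::Hasher::new();
            hasher.update(path.display().to_string().as_bytes());
            hasher.update(contents.as_bytes());
            hasher.finalize().to_hex()
        }

        let contents = "class PostsController < ApplicationController\nend\n";
        let a = digest_for(Path::new("app/controllers/posts_controller.rb"), contents);
        let b = digest_for(Path::new("app/controllers/users_controller.rb"), contents);
        assert_ne!(a, b);
    }

    // `db_key_for_path` swaps `/` for `\0` in the stored key.
    #[test]
    fn db_key_replaces_path_separators_with_nul() {
        let path: Arc<Path> = Path::new("src/indexing/mod.rs").into();
        assert_eq!(db_key_for_path(&path), "src\0indexing\0mod.rs");
    }
}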