use anyhow::{Context as _, Result, anyhow};
use arrayvec::ArrayString;
use fs::{Fs, MTime};
use futures::{TryFutureExt, stream::StreamExt};
use futures_batch::ChunksTimeoutStreamExt;
use gpui::{App, AppContext as _, Entity, Task};
use heed::{
    RoTxn,
    types::{SerdeBincode, Str},
};
use language_model::{
    LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest,
    LanguageModelRequestMessage, Role,
};
use log;
use parking_lot::Mutex;
use project::{Entry, UpdatedEntriesSet, Worktree};
use serde::{Deserialize, Serialize};
use smol::channel;
use std::{
    future::Future,
    path::Path,
    pin::pin,
    sync::Arc,
    time::{Duration, Instant},
};
use util::ResultExt;
use worktree::Snapshot;

use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog};

#[derive(Serialize, Deserialize, Debug)]
pub struct FileSummary {
    pub filename: String,
    pub summary: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct UnsummarizedFile {
    // Path to the file on disk
    path: Arc<Path>,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The source file's contents
    contents: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct SummarizedFile {
    // Path to the file on disk
    path: String,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The LLM's summary of the file's contents
    summary: String,
}

/// This is what blake3's `to_hex()` method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
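///
/// A minimal sketch of producing one directly (note that `digest_files` below
/// hashes the file's relative path along with its contents, not the contents
/// alone):
///
/// ```ignore
/// let digest: Blake3Digest = blake3::hash(b"fn main() {}").to_hex();
/// ```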
pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;

#[derive(Debug, Serialize, Deserialize)]
pub struct FileDigest {
    pub mtime: Option<MTime>,
    pub digest: Blake3Digest,
}

struct NeedsSummary {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

struct SummarizeFiles {
    files: channel::Receiver<SummarizedFile>,
    task: Task<Result<()>>,
}

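/// Maintains LLM-generated summaries of the files in one worktree.
///
/// Two LMDB databases back the index:
/// - `file_digest_db` maps a file's path to the BLAKE3 digest (and mtime) it had
///   when it was last summarized, so unchanged files can be skipped cheaply.
/// - `summary_db` maps a content digest to its summary, so contents that hash the
///   same are never summarized twice.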
pub struct SummaryIndex {
    worktree: Entity<Worktree>,
    fs: Arc<dyn Fs>,
    db_connection: heed::Env,
    file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: BLAKE3 digest of its contents.
    summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
    backlog: Arc<Mutex<SummaryBacklog>>,
    _entry_ids_being_indexed: Arc<IndexingEntrySet>, // TODO can this be removed?
}

struct Backlogged {
    paths_to_digest: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
    task: Task<Result<()>>,
}

struct MightNeedSummaryFiles {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

impl SummaryIndex {
    pub fn new(
        worktree: Entity<Worktree>,
        fs: Arc<dyn Fs>,
        db_connection: heed::Env,
        file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>,
        _entry_ids_being_indexed: Arc<IndexingEntrySet>,
    ) -> Self {
        Self {
            worktree,
            fs,
            db_connection,
            file_digest_db,
            summary_db,
            _entry_ids_being_indexed,
            backlog: Default::default(),
        }
    }

    pub fn file_digest_db(&self) -> heed::Database<Str, SerdeBincode<FileDigest>> {
        self.file_digest_db
    }

    pub fn summary_db(&self) -> heed::Database<SerdeBincode<Blake3Digest>, Str> {
        self.summary_db
    }

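    /// Summarize every file in the worktree whose contents have changed on disk.
    ///
    /// The work runs as a pipeline of stages connected by bounded channels:
    /// scan entries -> digest files -> check the summary cache -> summarize -> persist.
    /// Each stage is its own task, and the returned future resolves once all of
    /// them have finished (or any of them has failed).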
    pub fn index_entries_changed_on_disk(
        &self,
        is_auto_available: bool,
        cx: &App,
    ) -> impl Future<Output = Result<()>> + use<> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_entries(worktree, cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            // (`channel::unbounded().1` is a receiver whose sender is dropped
            // immediately, so each stage below sees an already-closed channel.)
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            if is_auto_available {
                log::info!(
                    "Summarizing everything that changed on disk took {:?}",
                    start.elapsed()
                );
            }

            Ok(())
        }
    }

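    /// Like [`Self::index_entries_changed_on_disk`], but restricted to the given
    /// set of updated entries instead of rescanning the entire worktree.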
    pub fn index_updated_entries(
        &mut self,
        updated_entries: UpdatedEntriesSet,
        is_auto_available: bool,
        cx: &App,
    ) -> impl Future<Output = Result<()>> + use<> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_updated_entries(worktree, updated_entries.clone(), cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::debug!("Summarizing updated entries took {:?}", start.elapsed());

            Ok(())
        }
    }

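    /// Forward only the files whose content digest is not already present in
    /// `summary_db`; files with a cached summary are dropped here.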
    fn check_summary_cache(
        &self,
        might_need_summary: channel::Receiver<UnsummarizedFile>,
        cx: &App,
    ) -> NeedsSummary {
        let db_connection = self.db_connection.clone();
        let db = self.summary_db;
        let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
        let task = cx.background_spawn(async move {
            let mut might_need_summary = pin!(might_need_summary);
            while let Some(file) = might_need_summary.next().await {
                let tx = db_connection
                    .read_txn()
                    .context("Failed to create read transaction for checking which hashes are in summary cache")?;

                match db.get(&tx, &file.digest) {
                    Ok(opt_answer) => {
                        if opt_answer.is_none() {
                            // It's not in the summary cache db, so we need to summarize it.
                            log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest);
                            needs_summary_tx.send(file).await?;
                        } else {
                            log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest);
                        }
                    }
                    Err(err) => {
                        log::error!("Reading from the summaries database failed: {:?}", err);
                    }
                }
            }

            Ok(())
        });

        NeedsSummary {
            files: needs_summary_rx,
            task,
        }
    }

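    /// Walk every file in the worktree snapshot, adding changed files to the
    /// backlog and emitting a batch of backlogged paths whenever the backlog
    /// reports that it needs draining.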
    fn scan_entries(&self, worktree: Snapshot, cx: &App) -> Backlogged {
        let (tx, rx) = channel::bounded(512);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for entry in worktree.files(false, 0) {
                let needs_summary =
                    Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);

                if !needs_summary.is_empty() {
                    tx.send(needs_summary).await?;
                }
            }

            // TODO delete db entries for deleted files

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            task,
        }
    }

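    /// Add `entry` to the backlog if its mtime differs from the one recorded in
    /// `digest_db`. Returns the drained backlog contents when `needs_drain`
    /// reports the backlog is full after the insertion, and an empty `Vec`
    /// otherwise.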
    fn add_to_backlog(
        backlog: Arc<Mutex<SummaryBacklog>>,
        digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        txn: &RoTxn<'_>,
        entry: &Entry,
    ) -> Vec<(Arc<Path>, Option<MTime>)> {
        let entry_db_key = db_key_for_path(&entry.path);

        match digest_db.get(txn, &entry_db_key) {
            Ok(opt_saved_digest) => {
                // The file path is the same, but the mtime is different. (Or there was no mtime.)
                // It needs updating, so add it to the backlog! Then, if the backlog is full,
                // drain it and summarize its contents.
                if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
                    let mut backlog = backlog.lock();

                    log::info!(
                        "Inserting {:?} ({:?} bytes) into backlog",
                        &entry.path,
                        entry.size,
                    );
                    backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime);

                    if backlog.needs_drain() {
                        log::info!("Draining summary backlog...");
                        return backlog.drain().collect();
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Error trying to get file digest db entry {:?}: {:?}",
                    &entry_db_key,
                    err
                );
            }
        }

        Vec::new()
    }

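    /// Like [`Self::scan_entries`], but driven by a set of change notifications
    /// rather than a full worktree walk. Removed entries are currently left in
    /// the databases (see the TODO below).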
    fn scan_updated_entries(
        &self,
        worktree: Snapshot,
        updated_entries: UpdatedEntriesSet,
        cx: &App,
    ) -> Backlogged {
        log::info!("Scanning for updated entries that might need summarization...");
        let (tx, rx) = channel::bounded(512);
        // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for (path, entry_id, status) in updated_entries.iter() {
                match status {
                    project::PathChange::Loaded
                    | project::PathChange::Added
                    | project::PathChange::Updated
                    | project::PathChange::AddedOrUpdated => {
                        if let Some(entry) = worktree.entry_for_id(*entry_id) {
                            if entry.is_file() {
                                let needs_summary = Self::add_to_backlog(
                                    Arc::clone(&backlog),
                                    digest_db,
                                    &txn,
                                    entry,
                                );

                                if !needs_summary.is_empty() {
                                    tx.send(needs_summary).await?;
                                }
                            }
                        }
                    }
                    project::PathChange::Removed => {
                        let _db_path = db_key_for_path(path);
                        // TODO delete db entries for deleted files
                        // deleted_entry_ranges_tx
                        //     .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
                        //     .await?;
                    }
                }
            }

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            // deleted_entry_ranges: deleted_entry_ranges_rx,
            task,
        }
    }

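    /// Load each backlogged file from disk and compute its BLAKE3 digest, fanning
    /// the work out across one worker per CPU. The digest covers the file's
    /// relative path as well as its contents; see the comment on the hasher below.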
    fn digest_files(
        &self,
        paths: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
        worktree_abs_path: Arc<Path>,
        cx: &App,
    ) -> MightNeedSummaryFiles {
        let fs = self.fs.clone();
        let (tx, rx) = channel::bounded(2048);
        let task = cx.spawn(async move |cx| {
            cx.background_executor()
                .scoped(|cx| {
                    for _ in 0..cx.num_cpus() {
                        cx.spawn(async {
                            while let Ok(pairs) = paths.recv().await {
                                // Note: we could process all these files concurrently if desired. Might or might not speed things up.
                                for (path, mtime) in pairs {
                                    let entry_abs_path = worktree_abs_path.join(&path);

                                    // Load the file's contents and compute its hash digest.
                                    let unsummarized_file = {
                                        let Some(contents) = fs
                                            .load(&entry_abs_path)
                                            .await
                                            .with_context(|| {
                                                format!("failed to read path {entry_abs_path:?}")
                                            })
                                            .log_err()
                                        else {
                                            continue;
                                        };

                                        let digest = {
                                            let mut hasher = blake3::Hasher::new();
                                            // Incorporate both the (relative) file path as well as the contents of the file into the hash.
                                            // This is because in some languages and frameworks, identical files can do different things
                                            // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model.
                                            hasher.update(path.display().to_string().as_bytes());
                                            hasher.update(contents.as_bytes());
                                            hasher.finalize().to_hex()
                                        };

                                        UnsummarizedFile {
                                            digest,
                                            contents,
                                            path,
                                            mtime,
                                        }
                                    };

                                    if let Err(err) = tx
                                        .send(unsummarized_file)
                                        .map_err(|error| anyhow!(error))
                                        .await
                                    {
                                        log::error!("Error: {:?}", err);

                                        return;
                                    }
                                }
                            }
                        });
                    }
                })
                .await;
            Ok(())
        });

        MightNeedSummaryFiles { files: rx, task }
    }

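    /// Request an LLM summary of each incoming file. A failed summarization is
    /// logged and the file is skipped, rather than failing the whole pipeline.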
    fn summarize_files(
        &self,
        unsummarized_files: channel::Receiver<UnsummarizedFile>,
        cx: &App,
    ) -> SummarizeFiles {
        let (summarized_tx, summarized_rx) = channel::bounded(512);
        let task = cx.spawn(async move |cx| {
            while let Ok(file) = unsummarized_files.recv().await {
                log::debug!("Summarizing {:?}", file);
                let summary = cx
                    .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
                    .await
                    .unwrap_or_else(|err| {
                        // Log a warning because we'll continue anyway.
                        // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries,
                        // but this might give bad summaries due to cutting off source code files in the middle.
                        log::warn!("Failed to summarize {} - {:?}", file.path.display(), err);

                        String::new()
                    });

                // The summary can be empty when talking to a cloud provider failed,
                // e.g. because the context limit was exceeded; in that case the
                // `unwrap_or_else` above produced an empty string. Skip persisting
                // empty summaries.
                if !summary.is_empty() {
                    summarized_tx
                        .send(SummarizedFile {
                            path: file.path.display().to_string(),
                            digest: file.digest,
                            summary,
                            mtime: file.mtime,
                        })
                        .await?
                }
            }

            Ok(())
        });

        SummarizeFiles {
            files: summarized_rx,
            task,
        }
    }

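    /// Ask the configured summarization model for a three-sentence summary of `code`.
    ///
    /// The file's path is included in the prompt alongside its contents, for the same
    /// reason it is included in the content digest. The prompt sent to the model has
    /// roughly this shape (the path shown is hypothetical):
    ///
    /// ```text
    /// Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:
    /// src/foo.rs:
    /// <contents of src/foo.rs>
    /// ```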
    fn summarize_code(
        code: &str,
        path: &Path,
        cx: &App,
    ) -> impl Future<Output = Result<String>> + use<> {
        let start = Instant::now();
        let (summary_model_id, use_cache): (LanguageModelId, bool) = (
            "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings.
            false, // qwen2 doesn't have a cache, but we should probably infer this from the model
        );
        let Some(model) = LanguageModelRegistry::read_global(cx)
            .available_models(cx)
            .find(|model| model.id() == summary_model_id)
        else {
            return cx.background_spawn(async move {
                anyhow::bail!("Couldn't find the preferred summarization model ({summary_model_id:?}) in the language registry's available models")
            });
        };
        let utf8_path = path.to_string_lossy();
        const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:";
        let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}");

        log::debug!(
            "Summarizing code by sending this prompt to {:?}: {:?}",
            model.name(),
            &prompt
        );

        let request = LanguageModelRequest {
            thread_id: None,
            prompt_id: None,
            mode: None,
            intent: None,
            messages: vec![LanguageModelRequestMessage {
                role: Role::User,
                content: vec![prompt.into()],
                cache: use_cache,
            }],
            tools: Vec::new(),
            tool_choice: None,
            stop: Vec::new(),
            temperature: None,
            thinking_allowed: true,
        };

        let code_len = code.len();
        cx.spawn(async move |cx| {
            let stream = model.stream_completion(request, &cx);
            cx.background_spawn(async move {
                let answer: String = stream
                    .await?
                    .filter_map(|event| async {
                        if let Ok(LanguageModelCompletionEvent::Text(text)) = event {
                            Some(text)
                        } else {
                            None
                        }
                    })
                    .collect()
                    .await;

                log::info!(
                    "It took {:?} to summarize {:?} bytes of code.",
                    start.elapsed(),
                    code_len
                );

                log::debug!("Summary was: {:?}", &answer);

                Ok(answer)
            })
            .await

            // TODO if summarization failed, put it back in the backlog!
        })
    }

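    /// Write finished summaries to the two databases, batching them into one write
    /// transaction per chunk of up to 4096 summaries (or however many arrive within
    /// a 2-second window) so that commits are amortized across many files.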
    fn persist_summaries(
        &self,
        summaries: channel::Receiver<SummarizedFile>,
        cx: &App,
    ) -> Task<Result<()>> {
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let summary_db = self.summary_db;
        cx.background_spawn(async move {
            let mut summaries = pin!(summaries.chunks_timeout(4096, Duration::from_secs(2)));
            while let Some(summaries) = summaries.next().await {
                let mut txn = db_connection.write_txn()?;
                for file in &summaries {
                    log::debug!(
                        "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}",
                        &file.path,
                        file.summary.len(),
                        file.digest
                    );
                    digest_db.put(
                        &mut txn,
                        &file.path,
                        &FileDigest {
                            mtime: file.mtime,
                            digest: file.digest,
                        },
                    )?;
                    summary_db.put(&mut txn, &file.digest, &file.summary)?;
                }
                txn.commit()?;

                drop(summaries);
                log::debug!("committed summaries");
            }

            Ok(())
        })
    }

    /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately.
    pub(crate) fn flush_backlog(
        &self,
        worktree_abs_path: Arc<Path>,
        cx: &App,
    ) -> impl Future<Output = Result<()>> + use<> {
        let start = Instant::now();
        let backlogged = {
            let (tx, rx) = channel::bounded(512);
            let needs_summary: Vec<(Arc<Path>, Option<MTime>)> = {
                let mut backlog = self.backlog.lock();

                backlog.drain().collect()
            };

            let task = cx.background_spawn(async move {
                tx.send(needs_summary).await?;
                Ok(())
            });

            Backlogged {
                paths_to_digest: rx,
                task,
            }
        };

        let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
        let needs_summary = self.check_summary_cache(digest.files, cx);
        let summaries = self.summarize_files(needs_summary.files, cx);
        let persist = self.persist_summaries(summaries.files, cx);

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::info!("Summarizing backlogged entries took {:?}", start.elapsed());

            Ok(())
        }
    }

    pub(crate) fn backlog_len(&self) -> usize {
        self.backlog.lock().len()
    }
}

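/// Convert a worktree-relative path into a key for `file_digest_db`.
///
/// The path separator is replaced with NUL, presumably so that keys sharing a
/// directory prefix stay contiguous under LMDB's lexicographic key ordering
/// ('\0' sorts before every printable character). For example (hypothetical
/// path):
///
/// ```ignore
/// assert_eq!(db_key_for_path(&Arc::from(Path::new("src/main.rs"))), "src\0main.rs");
/// ```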
fn db_key_for_path(path: &Arc<Path>) -> String {
    path.to_string_lossy().replace('/', "\0")
}