summary_index.rs

use anyhow::{anyhow, Context as _, Result};
use arrayvec::ArrayString;
use fs::{Fs, MTime};
use futures::{stream::StreamExt, TryFutureExt};
use futures_batch::ChunksTimeoutStreamExt;
use gpui::{AppContext, Model, Task};
use heed::{
    types::{SerdeBincode, Str},
    RoTxn,
};
use language_model::{
    LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest,
    LanguageModelRequestMessage, Role,
};
use log;
use parking_lot::Mutex;
use project::{Entry, UpdatedEntriesSet, Worktree};
use serde::{Deserialize, Serialize};
use smol::channel;
use std::{
    future::Future,
    path::Path,
    sync::Arc,
    time::{Duration, Instant},
};
use util::ResultExt;
use worktree::Snapshot;

use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog};

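/// A filename and the LLM-generated summary of that file's contents.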
#[derive(Serialize, Deserialize, Debug)]
pub struct FileSummary {
    pub filename: String,
    pub summary: String,
}

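/// A file that has been loaded and hashed, but not yet summarized.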
#[derive(Debug, Serialize, Deserialize)]
struct UnsummarizedFile {
    // Path to the file on disk
    path: Arc<Path>,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The source file's contents
    contents: String,
}

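/// A file that has been summarized, but whose summary has not yet been
/// persisted to the database.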
#[derive(Debug, Serialize, Deserialize)]
struct SummarizedFile {
    // Path to the file on disk
    path: String,
    // The mtime of the file on disk
    mtime: Option<MTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The LLM's summary of the file's contents
    summary: String,
}

/// This is what blake3's `to_hex()` method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;

#[derive(Debug, Serialize, Deserialize)]
pub struct FileDigest {
    pub mtime: Option<MTime>,
    pub digest: Blake3Digest,
}

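/// Files whose digests were missing from the summary cache and therefore need
/// to be summarized, plus the task feeding the channel.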
struct NeedsSummary {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

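/// Freshly summarized files, ready to be persisted to the database.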
struct SummarizeFiles {
    files: channel::Receiver<SummarizedFile>,
    task: Task<Result<()>>,
}

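/// An index of LLM-generated summaries for the files in a worktree, backed by
/// two databases: one mapping file paths to content digests, and one mapping
/// content digests to summaries, so unchanged files are never re-summarized.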
pub struct SummaryIndex {
    worktree: Model<Worktree>,
    fs: Arc<dyn Fs>,
    db_connection: heed::Env,
    file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: BLAKE3 digest of its contents.
    summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
    backlog: Arc<Mutex<SummaryBacklog>>,
    _entry_ids_being_indexed: Arc<IndexingEntrySet>, // TODO can this be removed?
}

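/// Batches of (path, mtime) pairs drained from the summary backlog, plus the
/// task that produces them.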
struct Backlogged {
    paths_to_digest: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
    task: Task<Result<()>>,
}

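/// Files that have been loaded and digested, but not yet checked against the
/// summary cache.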
struct MightNeedSummaryFiles {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

impl SummaryIndex {
    pub fn new(
        worktree: Model<Worktree>,
        fs: Arc<dyn Fs>,
        db_connection: heed::Env,
        file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>,
        _entry_ids_being_indexed: Arc<IndexingEntrySet>,
    ) -> Self {
        Self {
            worktree,
            fs,
            db_connection,
            file_digest_db,
            summary_db,
            _entry_ids_being_indexed,
            backlog: Default::default(),
        }
    }

    pub fn file_digest_db(&self) -> heed::Database<Str, SerdeBincode<FileDigest>> {
        self.file_digest_db
    }

    pub fn summary_db(&self) -> heed::Database<SerdeBincode<Blake3Digest>, Str> {
        self.summary_db
    }

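    /// Scan the whole worktree and (re)summarize every file whose contents
    /// changed on disk, running the scan -> digest -> cache-check -> summarize
    /// -> persist pipeline to completion. If `is_auto_available` is false, the
    /// whole pipeline is a no-op.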
    pub fn index_entries_changed_on_disk(
        &self,
        is_auto_available: bool,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_entries(worktree, cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            if is_auto_available {
                log::info!(
                    "Summarizing everything that changed on disk took {:?}",
                    start.elapsed()
                );
            }

            Ok(())
        }
    }

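    /// Like [`Self::index_entries_changed_on_disk`], but restricted to the
    /// given set of updated entries rather than rescanning the whole worktree.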
    pub fn index_updated_entries(
        &mut self,
        updated_entries: UpdatedEntriesSet,
        is_auto_available: bool,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_updated_entries(worktree, updated_entries.clone(), cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::debug!("Summarizing updated entries took {:?}", start.elapsed());

            Ok(())
        }
    }

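    /// Forward only the files whose digests are missing from the summary
    /// database; files that already have a cached summary are dropped here.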
    fn check_summary_cache(
        &self,
        mut might_need_summary: channel::Receiver<UnsummarizedFile>,
        cx: &AppContext,
    ) -> NeedsSummary {
        let db_connection = self.db_connection.clone();
        let db = self.summary_db;
        let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
        let task = cx.background_executor().spawn(async move {
            while let Some(file) = might_need_summary.next().await {
                let txn = db_connection
                    .read_txn()
                    .context("Failed to create read transaction for checking which hashes are in summary cache")?;

                match db.get(&txn, &file.digest) {
                    Ok(opt_answer) => {
                        if opt_answer.is_none() {
                            // It's not in the summary cache db, so we need to summarize it.
                            log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest);
                            needs_summary_tx.send(file).await?;
                        } else {
                            log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest);
                        }
                    }
                    Err(err) => {
                        log::error!("Reading from the summaries database failed: {:?}", err);
                    }
                }
            }

            Ok(())
        });

        NeedsSummary {
            files: needs_summary_rx,
            task,
        }
    }

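    /// Walk all files in the worktree snapshot, backlogging any whose mtimes
    /// differ from what the digest database recorded, and emit a batch of
    /// paths whenever the backlog is full enough to drain.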
    fn scan_entries(&self, worktree: Snapshot, cx: &AppContext) -> Backlogged {
        let (tx, rx) = channel::bounded(512);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_executor().spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for entry in worktree.files(false, 0) {
                let needs_summary =
                    Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);

                if !needs_summary.is_empty() {
                    tx.send(needs_summary).await?;
                }
            }

            // TODO delete db entries for deleted files

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            task,
        }
    }

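    /// Record the entry in the backlog if its mtime differs from the one saved
    /// in the digest database. Returns the drained backlog when it's due for a
    /// drain, and an empty `Vec` otherwise.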
    fn add_to_backlog(
        backlog: Arc<Mutex<SummaryBacklog>>,
        digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        txn: &RoTxn<'_>,
        entry: &Entry,
    ) -> Vec<(Arc<Path>, Option<MTime>)> {
        let entry_db_key = db_key_for_path(&entry.path);

        match digest_db.get(txn, &entry_db_key) {
            Ok(opt_saved_digest) => {
                // The file path is the same, but the mtime has changed (or is missing),
                // so its summary needs updating. Add it to the backlog; then, if the
                // backlog is full, drain it and summarize its contents.
                if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
                    let mut backlog = backlog.lock();

                    log::info!(
                        "Inserting {:?} ({:?} bytes) into backlog",
                        &entry.path,
                        entry.size,
                    );
                    backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime);

                    if backlog.needs_drain() {
                        log::info!("Draining summary backlog...");
                        return backlog.drain().collect();
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Error trying to get file digest db entry {:?}: {:?}",
                    &entry_db_key,
                    err
                );
            }
        }

        Vec::new()
    }

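    /// Like [`Self::scan_entries`], but driven by a set of changed entries
    /// rather than a full worktree walk. Removed entries are currently ignored
    /// (see the TODO about deleting db entries for deleted files).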
    fn scan_updated_entries(
        &self,
        worktree: Snapshot,
        updated_entries: UpdatedEntriesSet,
        cx: &AppContext,
    ) -> Backlogged {
        log::info!("Scanning for updated entries that might need summarization...");
        let (tx, rx) = channel::bounded(512);
        // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_executor().spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for (path, entry_id, status) in updated_entries.iter() {
                match status {
                    project::PathChange::Loaded
                    | project::PathChange::Added
                    | project::PathChange::Updated
                    | project::PathChange::AddedOrUpdated => {
                        if let Some(entry) = worktree.entry_for_id(*entry_id) {
                            if entry.is_file() {
                                let needs_summary = Self::add_to_backlog(
                                    Arc::clone(&backlog),
                                    digest_db,
                                    &txn,
                                    entry,
                                );

                                if !needs_summary.is_empty() {
                                    tx.send(needs_summary).await?;
                                }
                            }
                        }
                    }
                    project::PathChange::Removed => {
                        let _db_path = db_key_for_path(path);
                        // TODO delete db entries for deleted files
                        // deleted_entry_ranges_tx
                        //     .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
                        //     .await?;
                    }
                }
            }

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            // deleted_entry_ranges: deleted_entry_ranges_rx,
            task,
        }
    }

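    /// Read each backlogged file from disk and hash its (relative) path and
    /// contents into a BLAKE3 digest, fanning the work out across one worker
    /// task per CPU core.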
    fn digest_files(
        &self,
        paths: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
        worktree_abs_path: Arc<Path>,
        cx: &AppContext,
    ) -> MightNeedSummaryFiles {
        let fs = self.fs.clone();
        let (tx, rx) = channel::bounded(2048);
        let task = cx.spawn(|cx| async move {
            cx.background_executor()
                .scoped(|cx| {
                    for _ in 0..cx.num_cpus() {
                        cx.spawn(async {
                            while let Ok(pairs) = paths.recv().await {
                                // Note: we could process all these files concurrently if desired. Might or might not speed things up.
                                for (path, mtime) in pairs {
                                    let entry_abs_path = worktree_abs_path.join(&path);

                                    // Load the file's contents and compute its hash digest.
                                    let unsummarized_file = {
                                        let Some(contents) = fs
                                            .load(&entry_abs_path)
                                            .await
                                            .with_context(|| {
                                                format!("failed to read path {entry_abs_path:?}")
                                            })
                                            .log_err()
                                        else {
                                            continue;
                                        };

                                        let digest = {
                                            let mut hasher = blake3::Hasher::new();
                                            // Incorporate both the (relative) file path as well as the contents of the file into the hash.
                                            // This is because in some languages and frameworks, identical files can do different things
                                            // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model.
                                            hasher.update(path.display().to_string().as_bytes());
                                            hasher.update(contents.as_bytes());
                                            hasher.finalize().to_hex()
                                        };

                                        UnsummarizedFile {
                                            digest,
                                            contents,
                                            path,
                                            mtime,
                                        }
                                    };

                                    if let Err(err) = tx
                                        .send(unsummarized_file)
                                        .map_err(|error| anyhow!(error))
                                        .await
                                    {
                                        log::error!("Error: {:?}", err);

                                        return;
                                    }
                                }
                            }
                        });
                    }
                })
                .await;
            Ok(())
        });

        MightNeedSummaryFiles { files: rx, task }
    }

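    /// Ask the language model for a summary of each incoming file, forwarding
    /// only non-empty summaries for persistence.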
    fn summarize_files(
        &self,
        mut unsummarized_files: channel::Receiver<UnsummarizedFile>,
        cx: &AppContext,
    ) -> SummarizeFiles {
        let (summarized_tx, summarized_rx) = channel::bounded(512);
        let task = cx.spawn(|cx| async move {
            while let Some(file) = unsummarized_files.next().await {
                log::debug!("Summarizing {:?}", file);
                let summary = cx
                    .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
                    .await
                    .unwrap_or_else(|err| {
                        // Log a warning because we'll continue anyway.
                        // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries,
                        // but this might give bad summaries due to cutting off source code files in the middle.
                        log::warn!("Failed to summarize {} - {:?}", file.path.display(), err);

                        String::new()
                    });

                // Note that the summary could be empty because of an error talking to a cloud provider,
                // e.g. because the context limit was exceeded. In that case, we return Ok(String::new()).
                if !summary.is_empty() {
                    summarized_tx
                        .send(SummarizedFile {
                            path: file.path.display().to_string(),
                            digest: file.digest,
                            summary,
                            mtime: file.mtime,
                        })
                        .await?
                }
            }

            Ok(())
        });

        SummarizeFiles {
            files: summarized_rx,
            task,
        }
    }

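    /// Request a short summary of `code` from the preferred summarization
    /// model, returning the model's streamed response as a single string.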
    fn summarize_code(
        code: &str,
        path: &Path,
        cx: &AppContext,
    ) -> impl Future<Output = Result<String>> {
        let start = Instant::now();
        let (summary_model_id, use_cache): (LanguageModelId, bool) = (
            "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings.
            false, // qwen2 doesn't have a cache, but we should probably infer this from the model
        );
        let Some(model) = LanguageModelRegistry::read_global(cx)
            .available_models(cx)
            .find(|model| model.id() == summary_model_id)
        else {
            return cx.background_executor().spawn(async move {
                Err(anyhow!("Couldn't find the preferred summarization model ({:?}) in the language registry's available models", summary_model_id))
            });
        };
        let utf8_path = path.to_string_lossy();
        const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:";
        let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}");

        log::debug!(
            "Summarizing code by sending this prompt to {:?}: {:?}",
            model.name(),
            &prompt
        );

        let request = LanguageModelRequest {
            messages: vec![LanguageModelRequestMessage {
                role: Role::User,
                content: vec![prompt.into()],
                cache: use_cache,
            }],
            tools: Vec::new(),
            stop: Vec::new(),
            temperature: None,
        };

        let code_len = code.len();
        cx.spawn(|cx| async move {
            let stream = model.stream_completion(request, &cx);
            cx.background_executor()
                .spawn(async move {
                    let answer: String = stream
                        .await?
                        .filter_map(|event| async {
                            if let Ok(LanguageModelCompletionEvent::Text(text)) = event {
                                Some(text)
                            } else {
                                None
                            }
                        })
                        .collect()
                        .await;

                    log::info!(
                        "It took {:?} to summarize {:?} bytes of code.",
                        start.elapsed(),
                        code_len
                    );

                    log::debug!("Summary was: {:?}", &answer);

                    Ok(answer)
                })
                .await

            // TODO if summarization failed, put it back in the backlog!
        })
    }

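    /// Write batches of summaries to the database: the path-to-digest mapping
    /// and the digest-to-summary mapping are both updated in one transaction.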
    fn persist_summaries(
        &self,
        summaries: channel::Receiver<SummarizedFile>,
        cx: &AppContext,
    ) -> Task<Result<()>> {
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let summary_db = self.summary_db;
        cx.background_executor().spawn(async move {
            let mut chunks = summaries.chunks_timeout(4096, Duration::from_secs(2));
            while let Some(chunk) = chunks.next().await {
                let mut txn = db_connection.write_txn()?;
                for file in &chunk {
                    log::debug!(
                        "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}",
                        &file.path,
                        file.summary.len(),
                        file.digest
                    );
                    digest_db.put(
                        &mut txn,
                        // Use the same key format as the lookups in add_to_backlog,
                        // so that reads and writes agree on how paths are keyed.
                        &db_key_for_path(Path::new(&file.path)),
                        &FileDigest {
                            mtime: file.mtime,
                            digest: file.digest,
                        },
                    )?;
                    summary_db.put(&mut txn, &file.digest, &file.summary)?;
                }
                txn.commit()?;

                drop(chunk);
                log::debug!("committed summaries");
            }

            Ok(())
        })
    }

    /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately.
    pub(crate) fn flush_backlog(
        &self,
        worktree_abs_path: Arc<Path>,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged = {
            let (tx, rx) = channel::bounded(512);
            let needs_summary: Vec<(Arc<Path>, Option<MTime>)> = {
                let mut backlog = self.backlog.lock();

                backlog.drain().collect()
            };

            let task = cx.background_executor().spawn(async move {
                tx.send(needs_summary).await?;
                Ok(())
            });

            Backlogged {
                paths_to_digest: rx,
                task,
            }
        };

        let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
        let needs_summary = self.check_summary_cache(digest.files, cx);
        let summaries = self.summarize_files(needs_summary.files, cx);
        let persist = self.persist_summaries(summaries.files, cx);

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::info!("Summarizing backlogged entries took {:?}", start.elapsed());

            Ok(())
        }
    }

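    /// The number of files currently waiting in the backlog.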
    pub(crate) fn backlog_len(&self) -> usize {
        self.backlog.lock().len()
    }
}

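/// Normalizes a path into a string key for the database, replacing `/` with
/// `\0`. Since `\0` sorts before every other byte, this keeps all of a
/// directory's entries contiguous in key order.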
fn db_key_for_path(path: &Path) -> String {
    path.to_string_lossy().replace('/', "\0")
}