summary_index.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use arrayvec::ArrayString;
  3use fs::{Fs, MTime};
  4use futures::{TryFutureExt, stream::StreamExt};
  5use futures_batch::ChunksTimeoutStreamExt;
  6use gpui::{App, AppContext as _, Entity, Task};
  7use heed::{
  8    RoTxn,
  9    types::{SerdeBincode, Str},
 10};
 11use language_model::{
 12    LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest,
 13    LanguageModelRequestMessage, Role,
 14};
 15use log;
 16use parking_lot::Mutex;
 17use project::{Entry, UpdatedEntriesSet, Worktree};
 18use serde::{Deserialize, Serialize};
 19use smol::channel;
 20use std::{
 21    future::Future,
 22    path::Path,
 23    pin::pin,
 24    sync::Arc,
 25    time::{Duration, Instant},
 26};
 27use util::ResultExt;
 28use worktree::Snapshot;
 29
 30use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog};
 31
/// A file name paired with a summary of that file.
///
/// NOTE(review): this type is not constructed anywhere in this file; it is presumably the
/// shape handed to consumers of the summary index — confirm against callers.
#[derive(Serialize, Deserialize, Debug)]
pub struct FileSummary {
    /// Name of the file the summary belongs to.
    pub filename: String,
    /// The summary text.
    pub summary: String,
}
 37
/// A file that has been loaded from disk and hashed, but not (yet) summarized.
///
/// Produced by `SummaryIndex::digest_files` and consumed by the summary-cache check and the
/// summarization stage.
#[derive(Debug, Serialize, Deserialize)]
struct UnsummarizedFile {
    // Path to the file, relative to the worktree root (it is joined onto the worktree's
    // absolute path before being read from disk).
    path: Arc<Path>,
    // The mtime of the file on disk, as reported by the worktree entry (if any)
    mtime: Option<MTime>,
    // BLAKE3 hex digest computed over the file's relative path followed by its contents
    digest: Blake3Digest,
    // The source file's contents
    contents: String,
}
 49
/// A file for which the language model produced a non-empty summary.
///
/// Produced by `SummaryIndex::summarize_files` and consumed by `SummaryIndex::persist_summaries`.
#[derive(Debug, Serialize, Deserialize)]
struct SummarizedFile {
    // The file's worktree-relative path, rendered to a string via `Path::display`
    path: String,
    // The mtime of the file on disk (carried over from the `UnsummarizedFile`)
    mtime: Option<MTime>,
    // BLAKE3 hex digest of the file's relative path followed by its contents
    digest: Blake3Digest,
    // The LLM's summary of the file's contents
    summary: String,
}
 61
/// A hex-encoded BLAKE3 digest, stored inline as a fixed-capacity string
/// (two hex characters per output byte).
///
/// This is what blake3's to_hex() method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;
 64
/// The value type of the file-digest database: what was known about a file when its summary
/// was last persisted.
///
/// The stored `mtime` is compared against the worktree entry's current mtime to decide whether
/// the file has to be re-digested.
#[derive(Debug, Serialize, Deserialize)]
pub struct FileDigest {
    /// The file's mtime at the time its summary was persisted (if it had one).
    pub mtime: Option<MTime>,
    /// The BLAKE3 digest that keys the file's entry in the summary database.
    pub digest: Blake3Digest,
}
 70
/// Output of the summary-cache check stage (`SummaryIndex::check_summary_cache`).
struct NeedsSummary {
    // Files whose digest was not found in the summary database
    files: channel::Receiver<UnsummarizedFile>,
    // The background task driving this stage; must be awaited for the stage to report errors
    task: Task<Result<()>>,
}
 75
/// Output of the summarization stage (`SummaryIndex::summarize_files`).
struct SummarizeFiles {
    // Files that were summarized successfully (with a non-empty summary)
    files: channel::Receiver<SummarizedFile>,
    // The task driving this stage; must be awaited for the stage to report errors
    task: Task<Result<()>>,
}
 80
/// Maintains LLM-generated summaries for the files of a single worktree.
///
/// Work flows through a pipeline of channel-connected stages: scan entries into the backlog,
/// digest (load + hash) drained files, skip files whose digest already has a summary,
/// summarize the rest, and persist the results in the two databases below.
pub struct SummaryIndex {
    worktree: Entity<Worktree>,
    fs: Arc<dyn Fs>,
    db_connection: heed::Env,
    file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: BLAKE3 digest of its contents.
    summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
    // Files waiting to be (re)summarized; shared with the background scanning tasks.
    backlog: Arc<Mutex<SummaryBacklog>>,
    _entry_ids_being_indexed: Arc<IndexingEntrySet>, // TODO can this be removed?
}
 90
/// Output of the scanning stage (`scan_entries`, `scan_updated_entries`, or `flush_backlog`).
struct Backlogged {
    // Batches of (worktree-relative path, mtime) pairs drained from the backlog
    paths_to_digest: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
    // The background task driving this stage; must be awaited for the stage to report errors
    task: Task<Result<()>>,
}
 95
/// Output of the digest stage (`SummaryIndex::digest_files`).
struct MightNeedSummaryFiles {
    // Files that were loaded and hashed; whether they need a summary is decided by the next stage
    files: channel::Receiver<UnsummarizedFile>,
    // The task driving this stage; must be awaited for the stage to complete
    task: Task<Result<()>>,
}
100
101impl SummaryIndex {
102    pub fn new(
103        worktree: Entity<Worktree>,
104        fs: Arc<dyn Fs>,
105        db_connection: heed::Env,
106        file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
107        summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>,
108        _entry_ids_being_indexed: Arc<IndexingEntrySet>,
109    ) -> Self {
110        Self {
111            worktree,
112            fs,
113            db_connection,
114            file_digest_db,
115            summary_db,
116            _entry_ids_being_indexed,
117            backlog: Default::default(),
118        }
119    }
120
    /// Returns a copy of the handle to the file-digest database
    /// (keyed by file path, storing a [`FileDigest`] per file).
    pub fn file_digest_db(&self) -> heed::Database<Str, SerdeBincode<FileDigest>> {
        self.file_digest_db
    }
124
    /// Returns a copy of the handle to the summary database
    /// (keyed by [`Blake3Digest`], storing the summary text).
    pub fn summary_db(&self) -> heed::Database<SerdeBincode<Blake3Digest>, Str> {
        self.summary_db
    }
128
129    pub fn index_entries_changed_on_disk(
130        &self,
131        is_auto_available: bool,
132        cx: &App,
133    ) -> impl Future<Output = Result<()>> + use<> {
134        let start = Instant::now();
135        let backlogged;
136        let digest;
137        let needs_summary;
138        let summaries;
139        let persist;
140
141        if is_auto_available {
142            let worktree = self.worktree.read(cx).snapshot();
143            let worktree_abs_path = worktree.abs_path().clone();
144
145            backlogged = self.scan_entries(worktree, cx);
146            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
147            needs_summary = self.check_summary_cache(digest.files, cx);
148            summaries = self.summarize_files(needs_summary.files, cx);
149            persist = self.persist_summaries(summaries.files, cx);
150        } else {
151            // This feature is only staff-shipped, so make the rest of these no-ops.
152            backlogged = Backlogged {
153                paths_to_digest: channel::unbounded().1,
154                task: Task::ready(Ok(())),
155            };
156            digest = MightNeedSummaryFiles {
157                files: channel::unbounded().1,
158                task: Task::ready(Ok(())),
159            };
160            needs_summary = NeedsSummary {
161                files: channel::unbounded().1,
162                task: Task::ready(Ok(())),
163            };
164            summaries = SummarizeFiles {
165                files: channel::unbounded().1,
166                task: Task::ready(Ok(())),
167            };
168            persist = Task::ready(Ok(()));
169        }
170
171        async move {
172            futures::try_join!(
173                backlogged.task,
174                digest.task,
175                needs_summary.task,
176                summaries.task,
177                persist
178            )?;
179
180            if is_auto_available {
181                log::info!(
182                    "Summarizing everything that changed on disk took {:?}",
183                    start.elapsed()
184                );
185            }
186
187            Ok(())
188        }
189    }
190
191    pub fn index_updated_entries(
192        &mut self,
193        updated_entries: UpdatedEntriesSet,
194        is_auto_available: bool,
195        cx: &App,
196    ) -> impl Future<Output = Result<()>> + use<> {
197        let start = Instant::now();
198        let backlogged;
199        let digest;
200        let needs_summary;
201        let summaries;
202        let persist;
203
204        if is_auto_available {
205            let worktree = self.worktree.read(cx).snapshot();
206            let worktree_abs_path = worktree.abs_path().clone();
207
208            backlogged = self.scan_updated_entries(worktree, updated_entries, cx);
209            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
210            needs_summary = self.check_summary_cache(digest.files, cx);
211            summaries = self.summarize_files(needs_summary.files, cx);
212            persist = self.persist_summaries(summaries.files, cx);
213        } else {
214            // This feature is only staff-shipped, so make the rest of these no-ops.
215            backlogged = Backlogged {
216                paths_to_digest: channel::unbounded().1,
217                task: Task::ready(Ok(())),
218            };
219            digest = MightNeedSummaryFiles {
220                files: channel::unbounded().1,
221                task: Task::ready(Ok(())),
222            };
223            needs_summary = NeedsSummary {
224                files: channel::unbounded().1,
225                task: Task::ready(Ok(())),
226            };
227            summaries = SummarizeFiles {
228                files: channel::unbounded().1,
229                task: Task::ready(Ok(())),
230            };
231            persist = Task::ready(Ok(()));
232        }
233
234        async move {
235            futures::try_join!(
236                backlogged.task,
237                digest.task,
238                needs_summary.task,
239                summaries.task,
240                persist
241            )?;
242
243            log::debug!("Summarizing updated entries took {:?}", start.elapsed());
244
245            Ok(())
246        }
247    }
248
249    fn check_summary_cache(
250        &self,
251        might_need_summary: channel::Receiver<UnsummarizedFile>,
252        cx: &App,
253    ) -> NeedsSummary {
254        let db_connection = self.db_connection.clone();
255        let db = self.summary_db;
256        let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
257        let task = cx.background_spawn(async move {
258            let mut might_need_summary = pin!(might_need_summary);
259            while let Some(file) = might_need_summary.next().await {
260                let tx = db_connection
261                    .read_txn()
262                    .context("Failed to create read transaction for checking which hashes are in summary cache")?;
263
264                match db.get(&tx, &file.digest) {
265                    Ok(opt_answer) => {
266                        if opt_answer.is_none() {
267                            // It's not in the summary cache db, so we need to summarize it.
268                            log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest);
269                            needs_summary_tx.send(file).await?;
270                        } else {
271                            log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest);
272                        }
273                    }
274                    Err(err) => {
275                        log::error!("Reading from the summaries database failed: {:?}", err);
276                    }
277                }
278            }
279
280            Ok(())
281        });
282
283        NeedsSummary {
284            files: needs_summary_rx,
285            task,
286        }
287    }
288
289    fn scan_entries(&self, worktree: Snapshot, cx: &App) -> Backlogged {
290        let (tx, rx) = channel::bounded(512);
291        let db_connection = self.db_connection.clone();
292        let digest_db = self.file_digest_db;
293        let backlog = Arc::clone(&self.backlog);
294        let task = cx.background_spawn(async move {
295            let txn = db_connection
296                .read_txn()
297                .context("failed to create read transaction")?;
298
299            for entry in worktree.files(false, 0) {
300                let needs_summary =
301                    Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);
302
303                if !needs_summary.is_empty() {
304                    tx.send(needs_summary).await?;
305                }
306            }
307
308            // TODO delete db entries for deleted files
309
310            Ok(())
311        });
312
313        Backlogged {
314            paths_to_digest: rx,
315            task,
316        }
317    }
318
319    fn add_to_backlog(
320        backlog: Arc<Mutex<SummaryBacklog>>,
321        digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
322        txn: &RoTxn<'_>,
323        entry: &Entry,
324    ) -> Vec<(Arc<Path>, Option<MTime>)> {
325        let entry_db_key = db_key_for_path(&entry.path);
326
327        match digest_db.get(txn, &entry_db_key) {
328            Ok(opt_saved_digest) => {
329                // The file path is the same, but the mtime is different. (Or there was no mtime.)
330                // It needs updating, so add it to the backlog! Then, if the backlog is full, drain it and summarize its contents.
331                if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
332                    let mut backlog = backlog.lock();
333
334                    log::info!(
335                        "Inserting {:?} ({:?} bytes) into backlog",
336                        &entry.path,
337                        entry.size,
338                    );
339                    backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime);
340
341                    if backlog.needs_drain() {
342                        log::info!("Draining summary backlog...");
343                        return backlog.drain().collect();
344                    }
345                }
346            }
347            Err(err) => {
348                log::error!(
349                    "Error trying to get file digest db entry {:?}: {:?}",
350                    &entry_db_key,
351                    err
352                );
353            }
354        }
355
356        Vec::new()
357    }
358
359    fn scan_updated_entries(
360        &self,
361        worktree: Snapshot,
362        updated_entries: UpdatedEntriesSet,
363        cx: &App,
364    ) -> Backlogged {
365        log::info!("Scanning for updated entries that might need summarization...");
366        let (tx, rx) = channel::bounded(512);
367        // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
368        let db_connection = self.db_connection.clone();
369        let digest_db = self.file_digest_db;
370        let backlog = Arc::clone(&self.backlog);
371        let task = cx.background_spawn(async move {
372            let txn = db_connection
373                .read_txn()
374                .context("failed to create read transaction")?;
375
376            for (path, entry_id, status) in updated_entries.iter() {
377                match status {
378                    project::PathChange::Loaded
379                    | project::PathChange::Added
380                    | project::PathChange::Updated
381                    | project::PathChange::AddedOrUpdated => {
382                        if let Some(entry) = worktree.entry_for_id(*entry_id)
383                            && entry.is_file()
384                        {
385                            let needs_summary =
386                                Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);
387
388                            if !needs_summary.is_empty() {
389                                tx.send(needs_summary).await?;
390                            }
391                        }
392                    }
393                    project::PathChange::Removed => {
394                        let _db_path = db_key_for_path(path);
395                        // TODO delete db entries for deleted files
396                        // deleted_entry_ranges_tx
397                        //     .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
398                        //     .await?;
399                    }
400                }
401            }
402
403            Ok(())
404        });
405
406        Backlogged {
407            paths_to_digest: rx,
408            // deleted_entry_ranges: deleted_entry_ranges_rx,
409            task,
410        }
411    }
412
413    fn digest_files(
414        &self,
415        paths: channel::Receiver<Vec<(Arc<Path>, Option<MTime>)>>,
416        worktree_abs_path: Arc<Path>,
417        cx: &App,
418    ) -> MightNeedSummaryFiles {
419        let fs = self.fs.clone();
420        let (rx, tx) = channel::bounded(2048);
421        let task = cx.spawn(async move |cx| {
422            cx.background_executor()
423                .scoped(|cx| {
424                    for _ in 0..cx.num_cpus() {
425                        cx.spawn(async {
426                            while let Ok(pairs) = paths.recv().await {
427                                // Note: we could process all these files concurrently if desired. Might or might not speed things up.
428                                for (path, mtime) in pairs {
429                                    let entry_abs_path = worktree_abs_path.join(&path);
430
431                                    // Load the file's contents and compute its hash digest.
432                                    let unsummarized_file = {
433                                        let Some(contents) = fs
434                                            .load(&entry_abs_path)
435                                            .await
436                                            .with_context(|| {
437                                                format!("failed to read path {entry_abs_path:?}")
438                                            })
439                                            .log_err()
440                                        else {
441                                            continue;
442                                        };
443
444                                        let digest = {
445                                            let mut hasher = blake3::Hasher::new();
446                                            // Incorporate both the (relative) file path as well as the contents of the file into the hash.
447                                            // This is because in some languages and frameworks, identical files can do different things
448                                            // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model.
449                                            hasher.update(path.display().to_string().as_bytes());
450                                            hasher.update(contents.as_bytes());
451                                            hasher.finalize().to_hex()
452                                        };
453
454                                        UnsummarizedFile {
455                                            digest,
456                                            contents,
457                                            path,
458                                            mtime,
459                                        }
460                                    };
461
462                                    if let Err(err) = rx
463                                        .send(unsummarized_file)
464                                        .map_err(|error| anyhow!(error))
465                                        .await
466                                    {
467                                        log::error!("Error: {:?}", err);
468
469                                        return;
470                                    }
471                                }
472                            }
473                        });
474                    }
475                })
476                .await;
477            Ok(())
478        });
479
480        MightNeedSummaryFiles { files: tx, task }
481    }
482
483    fn summarize_files(
484        &self,
485        unsummarized_files: channel::Receiver<UnsummarizedFile>,
486        cx: &App,
487    ) -> SummarizeFiles {
488        let (summarized_tx, summarized_rx) = channel::bounded(512);
489        let task = cx.spawn(async move |cx| {
490            while let Ok(file) = unsummarized_files.recv().await {
491                log::debug!("Summarizing {:?}", file);
492                let summary = cx
493                    .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
494                    .await
495                    .unwrap_or_else(|err| {
496                        // Log a warning because we'll continue anyway.
497                        // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries,
498                        // but this might give bad summaries due to cutting off source code files in the middle.
499                        log::warn!("Failed to summarize {} - {:?}", file.path.display(), err);
500
501                        String::new()
502                    });
503
504                // Note that the summary could be empty because of an error talking to a cloud provider,
505                // e.g. because the context limit was exceeded. In that case, we return Ok(String::new()).
506                if !summary.is_empty() {
507                    summarized_tx
508                        .send(SummarizedFile {
509                            path: file.path.display().to_string(),
510                            digest: file.digest,
511                            summary,
512                            mtime: file.mtime,
513                        })
514                        .await?
515                }
516            }
517
518            Ok(())
519        });
520
521        SummarizeFiles {
522            files: summarized_rx,
523            task,
524        }
525    }
526
527    fn summarize_code(
528        code: &str,
529        path: &Path,
530        cx: &App,
531    ) -> impl Future<Output = Result<String>> + use<> {
532        let start = Instant::now();
533        let (summary_model_id, use_cache): (LanguageModelId, bool) = (
534            "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings.
535            false, // qwen2 doesn't have a cache, but we should probably infer this from the model
536        );
537        let Some(model) = LanguageModelRegistry::read_global(cx)
538            .available_models(cx)
539            .find(|model| &model.id() == &summary_model_id)
540        else {
541            return cx.background_spawn(async move {
542                anyhow::bail!("Couldn't find the preferred summarization model ({summary_model_id:?}) in the language registry's available models")
543            });
544        };
545        let utf8_path = path.to_string_lossy();
546        const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:";
547        let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}");
548
549        log::debug!(
550            "Summarizing code by sending this prompt to {:?}: {:?}",
551            model.name(),
552            &prompt
553        );
554
555        let request = LanguageModelRequest {
556            thread_id: None,
557            prompt_id: None,
558            mode: None,
559            intent: None,
560            messages: vec![LanguageModelRequestMessage {
561                role: Role::User,
562                content: vec![prompt.into()],
563                cache: use_cache,
564            }],
565            tools: Vec::new(),
566            tool_choice: None,
567            stop: Vec::new(),
568            temperature: None,
569            thinking_allowed: true,
570        };
571
572        let code_len = code.len();
573        cx.spawn(async move |cx| {
574            let stream = model.stream_completion(request, cx);
575            cx.background_spawn(async move {
576                let answer: String = stream
577                    .await?
578                    .filter_map(|event| async {
579                        if let Ok(LanguageModelCompletionEvent::Text(text)) = event {
580                            Some(text)
581                        } else {
582                            None
583                        }
584                    })
585                    .collect()
586                    .await;
587
588                log::info!(
589                    "It took {:?} to summarize {:?} bytes of code.",
590                    start.elapsed(),
591                    code_len
592                );
593
594                log::debug!("Summary was: {:?}", &answer);
595
596                Ok(answer)
597            })
598            .await
599
600            // TODO if summarization failed, put it back in the backlog!
601        })
602    }
603
604    fn persist_summaries(
605        &self,
606        summaries: channel::Receiver<SummarizedFile>,
607        cx: &App,
608    ) -> Task<Result<()>> {
609        let db_connection = self.db_connection.clone();
610        let digest_db = self.file_digest_db;
611        let summary_db = self.summary_db;
612        cx.background_spawn(async move {
613            let mut summaries = pin!(summaries.chunks_timeout(4096, Duration::from_secs(2)));
614            while let Some(summaries) = summaries.next().await {
615                let mut txn = db_connection.write_txn()?;
616                for file in &summaries {
617                    log::debug!(
618                        "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}",
619                        &file.path,
620                        file.summary.len(),
621                        file.digest
622                    );
623                    digest_db.put(
624                        &mut txn,
625                        &file.path,
626                        &FileDigest {
627                            mtime: file.mtime,
628                            digest: file.digest,
629                        },
630                    )?;
631                    summary_db.put(&mut txn, &file.digest, &file.summary)?;
632                }
633                txn.commit()?;
634
635                drop(summaries);
636                log::debug!("committed summaries");
637            }
638
639            Ok(())
640        })
641    }
642
643    /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately.
644    pub(crate) fn flush_backlog(
645        &self,
646        worktree_abs_path: Arc<Path>,
647        cx: &App,
648    ) -> impl Future<Output = Result<()>> + use<> {
649        let start = Instant::now();
650        let backlogged = {
651            let (tx, rx) = channel::bounded(512);
652            let needs_summary: Vec<(Arc<Path>, Option<MTime>)> = {
653                let mut backlog = self.backlog.lock();
654
655                backlog.drain().collect()
656            };
657
658            let task = cx.background_spawn(async move {
659                tx.send(needs_summary).await?;
660                Ok(())
661            });
662
663            Backlogged {
664                paths_to_digest: rx,
665                task,
666            }
667        };
668
669        let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
670        let needs_summary = self.check_summary_cache(digest.files, cx);
671        let summaries = self.summarize_files(needs_summary.files, cx);
672        let persist = self.persist_summaries(summaries.files, cx);
673
674        async move {
675            futures::try_join!(
676                backlogged.task,
677                digest.task,
678                needs_summary.task,
679                summaries.task,
680                persist
681            )?;
682
683            log::info!("Summarizing backlogged entries took {:?}", start.elapsed());
684
685            Ok(())
686        }
687    }
688
    /// Returns the backlog's current length, briefly taking the backlog lock to read it.
    pub(crate) fn backlog_len(&self) -> usize {
        self.backlog.lock().len()
    }
692}
693
/// Builds the database key for a path: the path's (lossy) UTF-8 form with every `/`
/// replaced by a NUL character.
fn db_key_for_path(path: &Arc<Path>) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '/' { '\0' } else { ch })
        .collect()
}