example.rs

  1use agent::{RequestKind, ThreadEvent, ThreadStore};
  2use anyhow::{Context as _, Result, anyhow};
  3use assistant_tool::ToolWorkingSet;
  4use client::proto::LspWorkProgress;
  5use collections::HashMap;
  6use dap::DapRegistry;
  7use futures::channel::mpsc;
  8use futures::{FutureExt, StreamExt as _, select_biased};
  9use gpui::{App, AppContext as _, AsyncApp, Entity, Task};
 10use handlebars::Handlebars;
 11use language::{DiagnosticSeverity, OffsetRangeExt};
 12use language_model::{
 13    LanguageModel, LanguageModelRequest, LanguageModelRequestMessage, MessageContent, Role,
 14    StopReason, TokenUsage,
 15};
 16use project::{LspStore, Project, ProjectPath};
 17use serde::{Deserialize, Serialize};
 18use std::fmt::Write as _;
 19use std::fs::File;
 20use std::io::Write as _;
 21use std::sync::{Arc, Mutex};
 22use std::time::Duration;
 23use std::{
 24    fs,
 25    path::{Path, PathBuf},
 26};
 27use unindent::Unindent as _;
 28use util::ResultExt as _;
 29use util::command::new_smol_command;
 30use util::serde::default_true;
 31
 32use crate::AgentAppState;
 33
 34pub const EXAMPLES_DIR: &str = "./crates/eval/examples";
 35pub const REPOS_DIR: &str = "./crates/eval/repos";
 36pub const WORKTREES_DIR: &str = "./crates/eval/worktrees";
 37
 38const THREAD_EVENT_TIMEOUT: Duration = Duration::from_secs(60 * 2);
 39
 40#[derive(Clone, Debug, Deserialize)]
 41pub struct ExampleBase {
 42    pub url: String,
 43    pub revision: String,
 44    pub language_extension: Option<String>,
 45    pub insert_id: Option<String>,
 46    #[serde(default = "default_true")]
 47    pub require_lsp: bool,
 48}
 49
 50#[derive(Clone, Debug)]
 51pub struct Example {
 52    pub name: String,
 53    /// Content of `base.toml`
 54    pub base: ExampleBase,
 55    /// Content of `prompt.md`
 56    pub prompt: String,
 57    /// Content of `criteria.md`
 58    pub criteria: String,
 59    /// Markdown output file to append to
 60    pub output_file: Arc<Mutex<File>>,
 61    /// Path to markdown output file
 62    pub output_file_path: PathBuf,
 63    /// Prefix used for logging that identifies this example
 64    pub log_prefix: String,
 65}
 66
 67#[derive(Debug, Serialize, Deserialize, Clone)]
 68pub struct RunOutput {
 69    pub repository_diff: String,
 70    pub diagnostics: String,
 71    pub response_count: usize,
 72    pub token_usage: TokenUsage,
 73    pub tool_use_counts: HashMap<Arc<str>, u32>,
 74}
 75
 76#[derive(Debug, Clone, Serialize, Deserialize)]
 77pub struct JudgeInput {
 78    pub repository_diff: String,
 79    pub criteria: String,
 80}
 81
 82#[derive(Debug, Clone, Serialize, Deserialize)]
 83pub struct JudgeOutput {
 84    pub analysis: String,
 85    pub score: u32,
 86}
 87
 88impl Example {
 89    /// Load an example from a directory containing base.toml, prompt.md, and criteria.md
 90    pub fn load_from_directory(dir_path: &Path, run_dir: &Path) -> Result<Self> {
 91        let name = Self::name_from_path(dir_path);
 92        let base_path = dir_path.join("base.toml");
 93        let prompt_path = dir_path.join("prompt.md");
 94        let criteria_path = dir_path.join("criteria.md");
 95
 96        let output_file_path = run_dir.join(format!(
 97            "{}.md",
 98            dir_path.file_name().unwrap().to_str().unwrap()
 99        ));
100        let output_file = Arc::new(Mutex::new(File::create(&output_file_path).unwrap()));
101
102        Ok(Example {
103            name: name.clone(),
104            base: toml::from_str(&fs::read_to_string(&base_path)?)?,
105            prompt: fs::read_to_string(prompt_path.clone())?,
106            criteria: fs::read_to_string(criteria_path.clone())?,
107            output_file,
108            output_file_path,
109            log_prefix: name,
110        })
111    }
112
113    pub fn set_log_prefix_style(&mut self, color: &str, name_width: usize) {
114        self.log_prefix = format!(
115            "{}{:<width$}\x1b[0m | ",
116            color,
117            self.name,
118            width = name_width
119        );
120    }
121
122    pub fn name_from_path(path: &Path) -> String {
123        path.file_name().unwrap().to_string_lossy().to_string()
124    }
125
126    pub fn worktree_path(&self) -> PathBuf {
127        Path::new(WORKTREES_DIR)
128            .canonicalize()
129            .context(format!("No such directory {WORKTREES_DIR}"))
130            .unwrap()
131            .join(&self.name)
132    }
133
134    /// Set up the example by checking out the specified Git revision
135    pub async fn setup(&self) -> Result<()> {
136        let repo_path = repo_path_for_url(&self.base.url);
137
138        println!("{}Fetching", self.log_prefix);
139
140        run_git(
141            &repo_path,
142            &["fetch", "--depth", "1", "origin", &self.base.revision],
143        )
144        .await?;
145
146        let worktree_path = self.worktree_path();
147
148        if worktree_path.is_dir() {
149            println!("{}Resetting existing worktree", self.log_prefix);
150
151            // TODO: consider including "-x" to remove ignored files. The downside of this is that
152            // it will also remove build artifacts, and so prevent incremental reuse there.
153            run_git(&worktree_path, &["clean", "--force", "-d"]).await?;
154            run_git(&worktree_path, &["reset", "--hard", "HEAD"]).await?;
155            run_git(&worktree_path, &["checkout", &self.base.revision]).await?;
156        } else {
157            println!("{}Creating worktree", self.log_prefix);
158
159            let worktree_path_string = worktree_path.to_string_lossy().to_string();
160
161            run_git(
162                &repo_path,
163                &[
164                    "worktree",
165                    "add",
166                    "-f",
167                    &worktree_path_string,
168                    &self.base.revision,
169                ],
170            )
171            .await?;
172        }
173
174        Ok(())
175    }
176
177    pub fn run(
178        &self,
179        model: Arc<dyn LanguageModel>,
180        app_state: Arc<AgentAppState>,
181        cx: &mut App,
182    ) -> Task<Result<RunOutput>> {
183        let project = Project::local(
184            app_state.client.clone(),
185            app_state.node_runtime.clone(),
186            app_state.user_store.clone(),
187            app_state.languages.clone(),
188            Arc::new(DapRegistry::default()),
189            app_state.fs.clone(),
190            None,
191            cx,
192        );
193
194        let worktree_path = self.worktree_path();
195        let worktree = project.update(cx, |project, cx| {
196            project.create_worktree(&worktree_path, true, cx)
197        });
198
199        let tools = cx.new(|_| ToolWorkingSet::default());
200        let thread_store =
201            ThreadStore::load(project.clone(), tools, app_state.prompt_builder.clone(), cx);
202        let this = self.clone();
203
204        cx.spawn(async move |cx| {
205            let worktree = worktree.await?;
206
207            // Wait for worktree scan to finish before choosing a file to open.
208            worktree
209                .update(cx, |worktree, _cx| {
210                    worktree.as_local().unwrap().scan_complete()
211                })?
212                .await;
213
214            let lsp_open_handle_and_store = if this.base.require_lsp {
215                let language_extension = this.base.language_extension.as_deref().context(
216                    "language_extension field is required in base.toml when `require_lsp == true`",
217                )?;
218
219                // Open a file that matches the language to cause LSP to start.
220                let language_file = worktree.read_with(cx, |worktree, _cx| {
221                    worktree
222                        .files(false, 0)
223                        .find_map(|e| {
224                            if e.path.clone().extension().and_then(|ext| ext.to_str())
225                                == Some(language_extension)
226                            {
227                                Some(ProjectPath {
228                                    worktree_id: worktree.id(),
229                                    path: e.path.clone(),
230                                })
231                            } else {
232                                None
233                            }
234                        })
235                        .context("Failed to find a file for example language")
236                })??;
237
238                let open_language_file_buffer_task = project.update(cx, |project, cx| {
239                    project.open_buffer(language_file.clone(), cx)
240                })?;
241
242                let language_file_buffer = open_language_file_buffer_task.await?;
243
244                let (lsp_open_handle, lsp_store) = project.update(cx, |project, cx| {
245                    (
246                        project.register_buffer_with_language_servers(&language_file_buffer, cx),
247                        project.lsp_store().clone(),
248                    )
249                })?;
250
251                // TODO: remove this once the diagnostics tool waits for new diagnostics
252                cx.background_executor().timer(Duration::new(5, 0)).await;
253                wait_for_lang_server(&lsp_store, this.log_prefix.clone(), cx).await?;
254
255                lsp_store.update(cx, |lsp_store, cx| {
256                    lsp_open_handle.update(cx, |buffer, cx| {
257                        buffer.update(cx, |buffer, cx| {
258                            let has_language_server = lsp_store
259                                .language_servers_for_local_buffer(buffer, cx)
260                                .next()
261                                .is_some();
262                            if has_language_server {
263                                Ok(())
264                            } else {
265                                Err(anyhow!(
266                                    "`{:?}` was opened to cause the language server to start, \
267                                    but no language servers are registered for its buffer. \
268                                    Set `require_lsp = false` in `base.toml` to skip this.",
269                                    language_file
270                                ))
271                            }
272                        })
273                    })
274                })??;
275
276                Some((lsp_open_handle, lsp_store))
277            } else {
278                None
279            };
280
281            if std::env::var("ZED_EVAL_SETUP_ONLY").is_ok() {
282                return Err(anyhow!("Setup only mode"));
283            }
284
285            let thread_store = thread_store.await;
286            let thread =
287                thread_store.update(cx, |thread_store, cx| thread_store.create_thread(cx))?;
288
289            {
290                let mut output_file = this.output_file.lock().unwrap();
291                writeln!(&mut output_file, "👤 USER:").log_err();
292                writeln!(&mut output_file, "{}", this.prompt).log_err();
293                writeln!(&mut output_file, "🤖 ASSISTANT:").log_err();
294                output_file.flush().log_err();
295            }
296
297            let tool_use_counts: Arc<Mutex<HashMap<Arc<str>, u32>>> =
298                Mutex::new(HashMap::default()).into();
299
300            let (thread_event_tx, mut thread_event_rx) = mpsc::unbounded();
301
302            let subscription = cx.subscribe(&thread, move |_thread, event: &ThreadEvent, _cx| {
303                thread_event_tx.unbounded_send(event.clone()).log_err();
304            });
305
306            let event_handler_task = cx.spawn({
307                let output_file = this.output_file.clone();
308                let log_prefix = this.log_prefix.clone();
309                let tool_use_counts = tool_use_counts.clone();
310                let thread = thread.downgrade();
311                async move |cx| {
312                    loop {
313                        let event = select_biased! {
314                            event = thread_event_rx.next() => event,
315                            _ = cx.background_executor().timer(THREAD_EVENT_TIMEOUT).fuse() => {
316                                return Err(anyhow!("Agentic loop stalled - waited {:?} without any events", THREAD_EVENT_TIMEOUT));
317                            }
318                        };
319                        let Some(event) = event else {
320                            return Err(anyhow!("ThreadEvent channel ended early"));
321                        };
322
323                        let mut output_file = output_file.lock().unwrap();
324
325                        match event {
326                            ThreadEvent::Stopped(reason) => match reason {
327                                Ok(StopReason::EndTurn) => {
328                                    return Ok(());
329                                }
330                                Ok(StopReason::MaxTokens) => {
331                                    return Err(anyhow!("Exceeded maximum tokens"));
332                                }
333                                Ok(StopReason::ToolUse) => {}
334                                Err(error) => {
335                                    return Err(anyhow!(error.clone()));
336                                }
337                            },
338                            ThreadEvent::ShowError(thread_error) => {
339                                break Err(anyhow!(thread_error.clone()));
340                            }
341                            ThreadEvent::StreamedAssistantText(_, chunk) => {
342                                write!(&mut output_file, "{}", chunk).log_err();
343                            }
344                            ThreadEvent::StreamedAssistantThinking(_, chunk) => {
345                                write!(&mut output_file, "{}", chunk).log_err();
346                            }
347                            ThreadEvent::UsePendingTools { tool_uses } => {
348                                writeln!(&mut output_file, "\n\nUSING TOOLS:").log_err();
349                                for tool_use in tool_uses {
350                                    writeln!(&mut output_file, "{}: {}", tool_use.name, tool_use.input)
351                                        .log_err();
352                                }
353                            }
354                            ThreadEvent::ToolFinished {
355                                tool_use_id,
356                                pending_tool_use,
357                                ..
358                            } => {
359                                if let Some(tool_use) = pending_tool_use {
360                                    let message = format!("TOOL FINISHED: {}", tool_use.name);
361                                    println!("{}{message}", log_prefix);
362                                    writeln!(&mut output_file, "\n{}", message).log_err();
363                                }
364                                thread.update(cx, |thread, _cx| {
365                                    if let Some(tool_result) = thread.tool_result(&tool_use_id) {
366                                        writeln!(&mut output_file, "\n{}\n", tool_result.content).log_err();
367                                        let mut tool_use_counts = tool_use_counts.lock().unwrap();
368                                        *tool_use_counts
369                                            .entry(tool_result.tool_name.clone())
370                                            .or_insert(0) += 1;
371                                    }
372                                })?;
373                            }
374                            _ => {}
375                        }
376
377                        output_file.flush().log_err();
378                    }
379                }
380            });
381
382            thread.update(cx, |thread, cx| {
383                let context = vec![];
384                thread.insert_user_message(this.prompt.clone(), context, None, cx);
385                thread.send_to_model(model, RequestKind::Chat, cx);
386            })?;
387
388            event_handler_task.await?;
389
390            if let Some((_, lsp_store)) = lsp_open_handle_and_store.as_ref() {
391                wait_for_lang_server(lsp_store, this.log_prefix.clone(), cx).await?;
392            }
393
394            let repository_diff = this.repository_diff().await?;
395            let diagnostics = cx
396                .update(move |cx| {
397                    cx.spawn(async move |cx| query_lsp_diagnostics(project, cx).await)
398                })?
399                .await?;
400
401            drop(subscription);
402            drop(lsp_open_handle_and_store);
403
404            thread.update(cx, |thread, _cx| {
405                let response_count = thread
406                    .messages()
407                    .filter(|message| message.role == language_model::Role::Assistant)
408                    .count();
409                RunOutput {
410                    repository_diff,
411                    diagnostics,
412                    response_count,
413                    token_usage: thread.cumulative_token_usage(),
414                    tool_use_counts: tool_use_counts.lock().unwrap().clone(),
415                }
416            })
417        })
418    }
419
420    pub async fn judge(
421        &self,
422        model: Arc<dyn LanguageModel>,
423        repository_diff: String,
424        cx: &AsyncApp,
425    ) -> Result<JudgeOutput> {
426        let judge_prompt = include_str!("judge_prompt.hbs");
427        let judge_prompt_name = "judge_prompt";
428        let mut handlebars = Handlebars::new();
429        handlebars.register_template_string(judge_prompt_name, judge_prompt)?;
430        let prompt = handlebars.render(
431            judge_prompt_name,
432            &JudgeInput {
433                repository_diff,
434                criteria: self.criteria.clone(),
435            },
436        )?;
437
438        let request = LanguageModelRequest {
439            messages: vec![LanguageModelRequestMessage {
440                role: Role::User,
441                content: vec![MessageContent::Text(prompt)],
442                cache: false,
443            }],
444            temperature: None,
445            tools: Vec::new(),
446            stop: Vec::new(),
447        };
448
449        let response = send_language_model_request(model, request, cx).await?;
450
451        let mut output_file = self.output_file.lock().unwrap();
452
453        writeln!(&mut output_file, "\n\n").log_err();
454        writeln!(&mut output_file, "========================================").log_err();
455        writeln!(&mut output_file, "              JUDGE OUTPUT              ").log_err();
456        writeln!(&mut output_file, "========================================").log_err();
457        writeln!(&mut output_file, "\n{}", &response).log_err();
458
459        parse_judge_output(&response)
460    }
461
462    pub async fn repository_diff(&self) -> Result<String> {
463        let worktree_path = self.worktree_path();
464        run_git(&worktree_path, &["add", "-N"]).await?;
465        run_git(&worktree_path, &["diff"]).await
466    }
467}
468
469fn wait_for_lang_server(
470    lsp_store: &Entity<LspStore>,
471    log_prefix: String,
472    cx: &mut AsyncApp,
473) -> Task<Result<()>> {
474    if cx
475        .update(|cx| !has_pending_lang_server_work(lsp_store, cx))
476        .unwrap()
477        || std::env::var("ZED_EVAL_SKIP_LS_WAIT").is_ok()
478    {
479        return Task::ready(anyhow::Ok(()));
480    }
481
482    println!("{}⏵ Waiting for language server", log_prefix);
483
484    let (mut tx, mut rx) = mpsc::channel(1);
485
486    let subscription =
487        cx.subscribe(&lsp_store, {
488            let log_prefix = log_prefix.clone();
489            move |lsp_store, event, cx| {
490                match event {
491                    project::LspStoreEvent::LanguageServerUpdate {
492                        message:
493                            client::proto::update_language_server::Variant::WorkProgress(
494                                LspWorkProgress {
495                                    message: Some(message),
496                                    ..
497                                },
498                            ),
499                        ..
500                    } => println!("{}{message}", log_prefix),
501                    _ => {}
502                }
503
504                if !has_pending_lang_server_work(&lsp_store, cx) {
505                    tx.try_send(()).ok();
506                }
507            }
508        });
509
510    cx.spawn(async move |cx| {
511        let timeout = cx.background_executor().timer(Duration::new(60 * 5, 0));
512        let result = futures::select! {
513            _ = rx.next() => {
514                println!("{}⚑ Language server idle", log_prefix);
515                anyhow::Ok(())
516            },
517            _ = timeout.fuse() => {
518                Err(anyhow!("LSP wait timed out after 5 minutes"))
519            }
520        };
521        drop(subscription);
522        result
523    })
524}
525
526fn has_pending_lang_server_work(lsp_store: &Entity<LspStore>, cx: &App) -> bool {
527    lsp_store
528        .read(cx)
529        .language_server_statuses()
530        .any(|(_, status)| !status.pending_work.is_empty())
531}
532
533async fn query_lsp_diagnostics(project: Entity<Project>, cx: &mut AsyncApp) -> Result<String> {
534    let paths_with_diagnostics = project.update(cx, |project, cx| {
535        project
536            .diagnostic_summaries(true, cx)
537            .filter(|(_, _, summary)| summary.error_count > 0 || summary.warning_count > 0)
538            .map(|(project_path, _, _)| project_path)
539            .collect::<Vec<_>>()
540    })?;
541
542    let mut output = String::new();
543    for project_path in paths_with_diagnostics {
544        let buffer = project
545            .update(cx, |project, cx| project.open_buffer(project_path, cx))?
546            .await?;
547        let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot())?;
548
549        for (_, group) in snapshot.diagnostic_groups(None) {
550            let entry = &group.entries[group.primary_ix];
551            let range = entry.range.to_point(&snapshot);
552            let severity = match entry.diagnostic.severity {
553                DiagnosticSeverity::ERROR => "error",
554                DiagnosticSeverity::WARNING => "warning",
555                _ => continue,
556            };
557
558            writeln!(
559                output,
560                "{} at line {}: {}",
561                severity,
562                range.start.row + 1,
563                entry.diagnostic.message
564            )?;
565        }
566    }
567    anyhow::Ok(output)
568}
569
570fn parse_judge_output(response: &str) -> Result<JudgeOutput> {
571    let analysis = get_tag("analysis", response)?.to_string();
572    let score = get_tag("score", response)?
573        .parse()
574        .context("error parsing score")?;
575
576    Ok(JudgeOutput { analysis, score })
577}
578
579fn get_tag(name: &'static str, response: &str) -> Result<String> {
580    let start_tag = format!("<{}>", name);
581    let end_tag = format!("</{}>", name);
582
583    let start_ix = response
584        .find(&start_tag)
585        .context(format!("{} start tag not found", name))?;
586    let content_start_ix = start_ix + start_tag.len();
587
588    let end_ix = content_start_ix
589        + response[content_start_ix..]
590            .find(&end_tag)
591            .context(format!("{} end tag not found", name))?;
592
593    let content = response[content_start_ix..end_ix].trim().unindent();
594
595    anyhow::Ok(content)
596}
597
598pub fn repo_path_for_url(repo_url: &str) -> PathBuf {
599    let repo_name = repo_url
600        .trim_start_matches("https://")
601        .replace(|c: char| !c.is_alphanumeric(), "-");
602    Path::new(REPOS_DIR)
603        .canonicalize()
604        .context(format!("No such directory {REPOS_DIR}"))
605        .unwrap()
606        .join(repo_name)
607}
608
609pub async fn run_git(repo_path: &Path, args: &[&str]) -> Result<String> {
610    let output = new_smol_command("git")
611        .current_dir(repo_path)
612        .args(args)
613        .output()
614        .await?;
615
616    if output.status.success() {
617        Ok(String::from_utf8(output.stdout)?.trim().to_string())
618    } else {
619        Err(anyhow!(
620            "`git {}` within `{}` failed with status: {}\nstderr:\n{}\nstdout:\n{}",
621            args.join(" "),
622            repo_path.display(),
623            output.status,
624            String::from_utf8_lossy(&output.stderr),
625            String::from_utf8_lossy(&output.stdout),
626        ))
627    }
628}
629
630pub async fn send_language_model_request(
631    model: Arc<dyn LanguageModel>,
632    request: LanguageModelRequest,
633    cx: &AsyncApp,
634) -> anyhow::Result<String> {
635    match model.stream_completion_text(request, &cx).await {
636        Ok(mut stream) => {
637            let mut full_response = String::new();
638            while let Some(chunk_result) = stream.stream.next().await {
639                match chunk_result {
640                    Ok(chunk_str) => {
641                        full_response.push_str(&chunk_str);
642                    }
643                    Err(err) => {
644                        return Err(anyhow!(
645                            "Error receiving response from language model: {err}"
646                        ));
647                    }
648                }
649            }
650            Ok(full_response)
651        }
652        Err(err) => Err(anyhow!(
653            "Failed to get response from language model. Error was: {err}"
654        )),
655    }
656}
657
#[cfg(test)]
mod test {
    use super::*;

    /// Covers both a single-line response and a response with surrounding text and an
    /// indented, multi-line analysis.
    #[test]
    fn test_parse_judge_output() {
        // Tags with their content on one line.
        let single_line_response = r#"
            <analysis>The model did a good job but there were still compilations errors.</analysis>
            <score>3</score>
        "#
        .unindent();

        let single_line_output = parse_judge_output(&single_line_response).unwrap();
        assert_eq!(
            single_line_output.analysis,
            "The model did a good job but there were still compilations errors."
        );
        assert_eq!(single_line_output.score, 3);

        // Text outside the tags is ignored and the analysis body is unindented.
        let multi_line_response = r#"
            Text around ignored

            <analysis>
                Failed to compile:
                - Error 1
                - Error 2
            </analysis>

            <score>1</score>
        "#
        .unindent();

        let multi_line_output = parse_judge_output(&multi_line_response).unwrap();
        assert_eq!(
            multi_line_output.analysis,
            "Failed to compile:\n- Error 1\n- Error 2"
        );
        assert_eq!(multi_line_output.score, 1);
    }
}