mod.rs

   1use super::*;
   2use acp_thread::{AgentConnection, AgentModelGroupName, AgentModelList, UserMessageId};
   3use agent_client_protocol::{self as acp};
   4use agent_settings::AgentProfileId;
   5use anyhow::Result;
   6use client::{Client, UserStore};
   7use cloud_llm_client::CompletionIntent;
   8use collections::IndexMap;
   9use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  10use feature_flags::FeatureFlagAppExt as _;
  11use fs::{FakeFs, Fs};
  12use futures::{
  13    FutureExt as _, StreamExt,
  14    channel::{
  15        mpsc::{self, UnboundedReceiver},
  16        oneshot,
  17    },
  18    future::{Fuse, Shared},
  19};
  20use gpui::{
  21    App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
  22    http_client::FakeHttpClient,
  23};
  24use indoc::indoc;
  25use language_model::{
  26    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
  27    LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
  28    LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
  29    LanguageModelToolUse, MessageContent, Role, StopReason, fake_provider::FakeLanguageModel,
  30};
  31use pretty_assertions::assert_eq;
  32use project::{
  33    Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
  34};
  35use prompt_store::ProjectContext;
  36use reqwest_client::ReqwestClient;
  37use schemars::JsonSchema;
  38use serde::{Deserialize, Serialize};
  39use serde_json::json;
  40use settings::{Settings, SettingsStore};
  41use std::{
  42    path::Path,
  43    pin::Pin,
  44    rc::Rc,
  45    sync::{
  46        Arc,
  47        atomic::{AtomicBool, Ordering},
  48    },
  49    time::Duration,
  50};
  51use util::path;
  52
  53mod test_tools;
  54use test_tools::*;
  55
  56fn init_test(cx: &mut TestAppContext) {
  57    cx.update(|cx| {
  58        let settings_store = SettingsStore::test(cx);
  59        cx.set_global(settings_store);
  60    });
  61}
  62
/// Test double for a terminal; it implements `crate::TerminalHandle` further down in this file.
struct FakeTerminalHandle {
    /// Set to `true` by `kill`; read back through `was_killed`.
    killed: Arc<AtomicBool>,
    /// Value reported by `was_stopped_by_user`; written by `set_stopped_by_user`.
    stopped_by_user: Arc<AtomicBool>,
    /// One-shot sender consumed by `signal_exit`; `None` once it has been taken.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    /// Shared task yielding the exit status; clones are handed out by `wait_for_exit`.
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    /// Canned output returned by `current_output`.
    output: acp::TerminalOutputResponse,
    /// Identifier returned by `id`.
    id: acp::TerminalId,
}
  71
  72impl FakeTerminalHandle {
  73    fn new_never_exits(cx: &mut App) -> Self {
  74        let killed = Arc::new(AtomicBool::new(false));
  75        let stopped_by_user = Arc::new(AtomicBool::new(false));
  76
  77        let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
  78
  79        let wait_for_exit = cx
  80            .spawn(async move |_cx| {
  81                // Wait for the exit signal (sent when kill() is called)
  82                let _ = exit_receiver.await;
  83                acp::TerminalExitStatus::new()
  84            })
  85            .shared();
  86
  87        Self {
  88            killed,
  89            stopped_by_user,
  90            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
  91            wait_for_exit,
  92            output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
  93            id: acp::TerminalId::new("fake_terminal".to_string()),
  94        }
  95    }
  96
  97    fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
  98        let killed = Arc::new(AtomicBool::new(false));
  99        let stopped_by_user = Arc::new(AtomicBool::new(false));
 100        let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
 101
 102        let wait_for_exit = cx
 103            .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
 104            .shared();
 105
 106        Self {
 107            killed,
 108            stopped_by_user,
 109            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
 110            wait_for_exit,
 111            output: acp::TerminalOutputResponse::new("command output".to_string(), false),
 112            id: acp::TerminalId::new("fake_terminal".to_string()),
 113        }
 114    }
 115
 116    fn was_killed(&self) -> bool {
 117        self.killed.load(Ordering::SeqCst)
 118    }
 119
 120    fn set_stopped_by_user(&self, stopped: bool) {
 121        self.stopped_by_user.store(stopped, Ordering::SeqCst);
 122    }
 123
 124    fn signal_exit(&self) {
 125        if let Some(sender) = self.exit_sender.borrow_mut().take() {
 126            let _ = sender.send(());
 127        }
 128    }
 129}
 130
 131impl crate::TerminalHandle for FakeTerminalHandle {
 132    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
 133        Ok(self.id.clone())
 134    }
 135
 136    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
 137        Ok(self.output.clone())
 138    }
 139
 140    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
 141        Ok(self.wait_for_exit.clone())
 142    }
 143
 144    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
 145        self.killed.store(true, Ordering::SeqCst);
 146        self.signal_exit();
 147        Ok(())
 148    }
 149
 150    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
 151        Ok(self.stopped_by_user.load(Ordering::SeqCst))
 152    }
 153}
 154
/// Thread environment for tests whose `create_terminal` always returns the same fake handle.
struct FakeThreadEnvironment {
    /// The single handle returned (as a clone of the `Rc`) by every `create_terminal` call.
    handle: Rc<FakeTerminalHandle>,
}
 158
 159impl crate::ThreadEnvironment for FakeThreadEnvironment {
 160    fn create_terminal(
 161        &self,
 162        _command: String,
 163        _cwd: Option<std::path::PathBuf>,
 164        _output_byte_limit: Option<u64>,
 165        _cx: &mut AsyncApp,
 166    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
 167        Task::ready(Ok(self.handle.clone() as Rc<dyn crate::TerminalHandle>))
 168    }
 169}
 170
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    /// Every handle created so far by `create_terminal`, in creation order.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
 175
 176impl MultiTerminalEnvironment {
 177    fn new() -> Self {
 178        Self {
 179            handles: std::cell::RefCell::new(Vec::new()),
 180        }
 181    }
 182
 183    fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
 184        self.handles.borrow().clone()
 185    }
 186}
 187
 188impl crate::ThreadEnvironment for MultiTerminalEnvironment {
 189    fn create_terminal(
 190        &self,
 191        _command: String,
 192        _cwd: Option<std::path::PathBuf>,
 193        _output_byte_limit: Option<u64>,
 194        cx: &mut AsyncApp,
 195    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
 196        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
 197        self.handles.borrow_mut().push(handle.clone());
 198        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
 199    }
 200}
 201
 202fn always_allow_tools(cx: &mut TestAppContext) {
 203    cx.update(|cx| {
 204        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
 205        settings.always_allow_tool_actions = true;
 206        agent_settings::AgentSettings::override_global(settings, cx);
 207    });
 208}
 209
 210#[gpui::test]
 211async fn test_echo(cx: &mut TestAppContext) {
 212    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 213    let fake_model = model.as_fake();
 214
 215    let events = thread
 216        .update(cx, |thread, cx| {
 217            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
 218        })
 219        .unwrap();
 220    cx.run_until_parked();
 221    fake_model.send_last_completion_stream_text_chunk("Hello");
 222    fake_model
 223        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
 224    fake_model.end_last_completion_stream();
 225
 226    let events = events.collect().await;
 227    thread.update(cx, |thread, _cx| {
 228        assert_eq!(
 229            thread.last_message().unwrap().to_markdown(),
 230            indoc! {"
 231                ## Assistant
 232
 233                Hello
 234            "}
 235        )
 236    });
 237    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 238}
 239
 240#[gpui::test]
 241async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
 242    init_test(cx);
 243    always_allow_tools(cx);
 244
 245    let fs = FakeFs::new(cx.executor());
 246    let project = Project::test(fs, [], cx).await;
 247
 248    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
 249    let environment = Rc::new(FakeThreadEnvironment {
 250        handle: handle.clone(),
 251    });
 252
 253    #[allow(clippy::arc_with_non_send_sync)]
 254    let tool = Arc::new(crate::TerminalTool::new(project, environment));
 255    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
 256
 257    let task = cx.update(|cx| {
 258        tool.run(
 259            crate::TerminalToolInput {
 260                command: "sleep 1000".to_string(),
 261                cd: ".".to_string(),
 262                timeout_ms: Some(5),
 263            },
 264            event_stream,
 265            cx,
 266        )
 267    });
 268
 269    let update = rx.expect_update_fields().await;
 270    assert!(
 271        update.content.iter().any(|blocks| {
 272            blocks
 273                .iter()
 274                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
 275        }),
 276        "expected tool call update to include terminal content"
 277    );
 278
 279    let mut task_future: Pin<Box<Fuse<Task<Result<String>>>>> = Box::pin(task.fuse());
 280
 281    let deadline = std::time::Instant::now() + Duration::from_millis(500);
 282    loop {
 283        if let Some(result) = task_future.as_mut().now_or_never() {
 284            let result = result.expect("terminal tool task should complete");
 285
 286            assert!(
 287                handle.was_killed(),
 288                "expected terminal handle to be killed on timeout"
 289            );
 290            assert!(
 291                result.contains("partial output"),
 292                "expected result to include terminal output, got: {result}"
 293            );
 294            return;
 295        }
 296
 297        if std::time::Instant::now() >= deadline {
 298            panic!("timed out waiting for terminal tool task to complete");
 299        }
 300
 301        cx.run_until_parked();
 302        cx.background_executor.timer(Duration::from_millis(1)).await;
 303    }
 304}
 305
 306#[gpui::test]
 307#[ignore]
 308async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
 309    init_test(cx);
 310    always_allow_tools(cx);
 311
 312    let fs = FakeFs::new(cx.executor());
 313    let project = Project::test(fs, [], cx).await;
 314
 315    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
 316    let environment = Rc::new(FakeThreadEnvironment {
 317        handle: handle.clone(),
 318    });
 319
 320    #[allow(clippy::arc_with_non_send_sync)]
 321    let tool = Arc::new(crate::TerminalTool::new(project, environment));
 322    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
 323
 324    let _task = cx.update(|cx| {
 325        tool.run(
 326            crate::TerminalToolInput {
 327                command: "sleep 1000".to_string(),
 328                cd: ".".to_string(),
 329                timeout_ms: None,
 330            },
 331            event_stream,
 332            cx,
 333        )
 334    });
 335
 336    let update = rx.expect_update_fields().await;
 337    assert!(
 338        update.content.iter().any(|blocks| {
 339            blocks
 340                .iter()
 341                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
 342        }),
 343        "expected tool call update to include terminal content"
 344    );
 345
 346    cx.background_executor
 347        .timer(Duration::from_millis(25))
 348        .await;
 349
 350    assert!(
 351        !handle.was_killed(),
 352        "did not expect terminal handle to be killed without a timeout"
 353    );
 354}
 355
 356#[gpui::test]
 357async fn test_thinking(cx: &mut TestAppContext) {
 358    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 359    let fake_model = model.as_fake();
 360
 361    let events = thread
 362        .update(cx, |thread, cx| {
 363            thread.send(
 364                UserMessageId::new(),
 365                [indoc! {"
 366                    Testing:
 367
 368                    Generate a thinking step where you just think the word 'Think',
 369                    and have your final answer be 'Hello'
 370                "}],
 371                cx,
 372            )
 373        })
 374        .unwrap();
 375    cx.run_until_parked();
 376    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
 377        text: "Think".to_string(),
 378        signature: None,
 379    });
 380    fake_model.send_last_completion_stream_text_chunk("Hello");
 381    fake_model
 382        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
 383    fake_model.end_last_completion_stream();
 384
 385    let events = events.collect().await;
 386    thread.update(cx, |thread, _cx| {
 387        assert_eq!(
 388            thread.last_message().unwrap().to_markdown(),
 389            indoc! {"
 390                ## Assistant
 391
 392                <think>Think</think>
 393                Hello
 394            "}
 395        )
 396    });
 397    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 398}
 399
 400#[gpui::test]
 401async fn test_system_prompt(cx: &mut TestAppContext) {
 402    let ThreadTest {
 403        model,
 404        thread,
 405        project_context,
 406        ..
 407    } = setup(cx, TestModel::Fake).await;
 408    let fake_model = model.as_fake();
 409
 410    project_context.update(cx, |project_context, _cx| {
 411        project_context.shell = "test-shell".into()
 412    });
 413    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
 414    thread
 415        .update(cx, |thread, cx| {
 416            thread.send(UserMessageId::new(), ["abc"], cx)
 417        })
 418        .unwrap();
 419    cx.run_until_parked();
 420    let mut pending_completions = fake_model.pending_completions();
 421    assert_eq!(
 422        pending_completions.len(),
 423        1,
 424        "unexpected pending completions: {:?}",
 425        pending_completions
 426    );
 427
 428    let pending_completion = pending_completions.pop().unwrap();
 429    assert_eq!(pending_completion.messages[0].role, Role::System);
 430
 431    let system_message = &pending_completion.messages[0];
 432    let system_prompt = system_message.content[0].to_str().unwrap();
 433    assert!(
 434        system_prompt.contains("test-shell"),
 435        "unexpected system message: {:?}",
 436        system_message
 437    );
 438    assert!(
 439        system_prompt.contains("## Fixing Diagnostics"),
 440        "unexpected system message: {:?}",
 441        system_message
 442    );
 443}
 444
 445#[gpui::test]
 446async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
 447    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 448    let fake_model = model.as_fake();
 449
 450    thread
 451        .update(cx, |thread, cx| {
 452            thread.send(UserMessageId::new(), ["abc"], cx)
 453        })
 454        .unwrap();
 455    cx.run_until_parked();
 456    let mut pending_completions = fake_model.pending_completions();
 457    assert_eq!(
 458        pending_completions.len(),
 459        1,
 460        "unexpected pending completions: {:?}",
 461        pending_completions
 462    );
 463
 464    let pending_completion = pending_completions.pop().unwrap();
 465    assert_eq!(pending_completion.messages[0].role, Role::System);
 466
 467    let system_message = &pending_completion.messages[0];
 468    let system_prompt = system_message.content[0].to_str().unwrap();
 469    assert!(
 470        !system_prompt.contains("## Tool Use"),
 471        "unexpected system message: {:?}",
 472        system_message
 473    );
 474    assert!(
 475        !system_prompt.contains("## Fixing Diagnostics"),
 476        "unexpected system message: {:?}",
 477        system_message
 478    );
 479}
 480
 481#[gpui::test]
 482async fn test_prompt_caching(cx: &mut TestAppContext) {
 483    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 484    let fake_model = model.as_fake();
 485
 486    // Send initial user message and verify it's cached
 487    thread
 488        .update(cx, |thread, cx| {
 489            thread.send(UserMessageId::new(), ["Message 1"], cx)
 490        })
 491        .unwrap();
 492    cx.run_until_parked();
 493
 494    let completion = fake_model.pending_completions().pop().unwrap();
 495    assert_eq!(
 496        completion.messages[1..],
 497        vec![LanguageModelRequestMessage {
 498            role: Role::User,
 499            content: vec!["Message 1".into()],
 500            cache: true,
 501            reasoning_details: None,
 502        }]
 503    );
 504    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
 505        "Response to Message 1".into(),
 506    ));
 507    fake_model.end_last_completion_stream();
 508    cx.run_until_parked();
 509
 510    // Send another user message and verify only the latest is cached
 511    thread
 512        .update(cx, |thread, cx| {
 513            thread.send(UserMessageId::new(), ["Message 2"], cx)
 514        })
 515        .unwrap();
 516    cx.run_until_parked();
 517
 518    let completion = fake_model.pending_completions().pop().unwrap();
 519    assert_eq!(
 520        completion.messages[1..],
 521        vec![
 522            LanguageModelRequestMessage {
 523                role: Role::User,
 524                content: vec!["Message 1".into()],
 525                cache: false,
 526                reasoning_details: None,
 527            },
 528            LanguageModelRequestMessage {
 529                role: Role::Assistant,
 530                content: vec!["Response to Message 1".into()],
 531                cache: false,
 532                reasoning_details: None,
 533            },
 534            LanguageModelRequestMessage {
 535                role: Role::User,
 536                content: vec!["Message 2".into()],
 537                cache: true,
 538                reasoning_details: None,
 539            }
 540        ]
 541    );
 542    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
 543        "Response to Message 2".into(),
 544    ));
 545    fake_model.end_last_completion_stream();
 546    cx.run_until_parked();
 547
 548    // Simulate a tool call and verify that the latest tool result is cached
 549    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
 550    thread
 551        .update(cx, |thread, cx| {
 552            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
 553        })
 554        .unwrap();
 555    cx.run_until_parked();
 556
 557    let tool_use = LanguageModelToolUse {
 558        id: "tool_1".into(),
 559        name: EchoTool::name().into(),
 560        raw_input: json!({"text": "test"}).to_string(),
 561        input: json!({"text": "test"}),
 562        is_input_complete: true,
 563        thought_signature: None,
 564    };
 565    fake_model
 566        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
 567    fake_model.end_last_completion_stream();
 568    cx.run_until_parked();
 569
 570    let completion = fake_model.pending_completions().pop().unwrap();
 571    let tool_result = LanguageModelToolResult {
 572        tool_use_id: "tool_1".into(),
 573        tool_name: EchoTool::name().into(),
 574        is_error: false,
 575        content: "test".into(),
 576        output: Some("test".into()),
 577    };
 578    assert_eq!(
 579        completion.messages[1..],
 580        vec![
 581            LanguageModelRequestMessage {
 582                role: Role::User,
 583                content: vec!["Message 1".into()],
 584                cache: false,
 585                reasoning_details: None,
 586            },
 587            LanguageModelRequestMessage {
 588                role: Role::Assistant,
 589                content: vec!["Response to Message 1".into()],
 590                cache: false,
 591                reasoning_details: None,
 592            },
 593            LanguageModelRequestMessage {
 594                role: Role::User,
 595                content: vec!["Message 2".into()],
 596                cache: false,
 597                reasoning_details: None,
 598            },
 599            LanguageModelRequestMessage {
 600                role: Role::Assistant,
 601                content: vec!["Response to Message 2".into()],
 602                cache: false,
 603                reasoning_details: None,
 604            },
 605            LanguageModelRequestMessage {
 606                role: Role::User,
 607                content: vec!["Use the echo tool".into()],
 608                cache: false,
 609                reasoning_details: None,
 610            },
 611            LanguageModelRequestMessage {
 612                role: Role::Assistant,
 613                content: vec![MessageContent::ToolUse(tool_use)],
 614                cache: false,
 615                reasoning_details: None,
 616            },
 617            LanguageModelRequestMessage {
 618                role: Role::User,
 619                content: vec![MessageContent::ToolResult(tool_result)],
 620                cache: true,
 621                reasoning_details: None,
 622            }
 623        ]
 624    );
 625}
 626
 627#[gpui::test]
 628#[cfg_attr(not(feature = "e2e"), ignore)]
 629async fn test_basic_tool_calls(cx: &mut TestAppContext) {
 630    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
 631
 632    // Test a tool call that's likely to complete *before* streaming stops.
 633    let events = thread
 634        .update(cx, |thread, cx| {
 635            thread.add_tool(EchoTool);
 636            thread.send(
 637                UserMessageId::new(),
 638                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
 639                cx,
 640            )
 641        })
 642        .unwrap()
 643        .collect()
 644        .await;
 645    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 646
 647    // Test a tool calls that's likely to complete *after* streaming stops.
 648    let events = thread
 649        .update(cx, |thread, cx| {
 650            thread.remove_tool(&EchoTool::name());
 651            thread.add_tool(DelayTool);
 652            thread.send(
 653                UserMessageId::new(),
 654                [
 655                    "Now call the delay tool with 200ms.",
 656                    "When the timer goes off, then you echo the output of the tool.",
 657                ],
 658                cx,
 659            )
 660        })
 661        .unwrap()
 662        .collect()
 663        .await;
 664    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 665    thread.update(cx, |thread, _cx| {
 666        assert!(
 667            thread
 668                .last_message()
 669                .unwrap()
 670                .as_agent_message()
 671                .unwrap()
 672                .content
 673                .iter()
 674                .any(|content| {
 675                    if let AgentMessageContent::Text(text) = content {
 676                        text.contains("Ding")
 677                    } else {
 678                        false
 679                    }
 680                }),
 681            "{}",
 682            thread.to_markdown()
 683        );
 684    });
 685}
 686
 687#[gpui::test]
 688#[cfg_attr(not(feature = "e2e"), ignore)]
 689async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
 690    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
 691
 692    // Test a tool call that's likely to complete *before* streaming stops.
 693    let mut events = thread
 694        .update(cx, |thread, cx| {
 695            thread.add_tool(WordListTool);
 696            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
 697        })
 698        .unwrap();
 699
 700    let mut saw_partial_tool_use = false;
 701    while let Some(event) = events.next().await {
 702        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
 703            thread.update(cx, |thread, _cx| {
 704                // Look for a tool use in the thread's last message
 705                let message = thread.last_message().unwrap();
 706                let agent_message = message.as_agent_message().unwrap();
 707                let last_content = agent_message.content.last().unwrap();
 708                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
 709                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
 710                    if tool_call.status == acp::ToolCallStatus::Pending {
 711                        if !last_tool_use.is_input_complete
 712                            && last_tool_use.input.get("g").is_none()
 713                        {
 714                            saw_partial_tool_use = true;
 715                        }
 716                    } else {
 717                        last_tool_use
 718                            .input
 719                            .get("a")
 720                            .expect("'a' has streamed because input is now complete");
 721                        last_tool_use
 722                            .input
 723                            .get("g")
 724                            .expect("'g' has streamed because input is now complete");
 725                    }
 726                } else {
 727                    panic!("last content should be a tool use");
 728                }
 729            });
 730        }
 731    }
 732
 733    assert!(
 734        saw_partial_tool_use,
 735        "should see at least one partially streamed tool use in the history"
 736    );
 737}
 738
 739#[gpui::test]
 740async fn test_tool_authorization(cx: &mut TestAppContext) {
 741    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 742    let fake_model = model.as_fake();
 743
 744    let mut events = thread
 745        .update(cx, |thread, cx| {
 746            thread.add_tool(ToolRequiringPermission);
 747            thread.send(UserMessageId::new(), ["abc"], cx)
 748        })
 749        .unwrap();
 750    cx.run_until_parked();
 751    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 752        LanguageModelToolUse {
 753            id: "tool_id_1".into(),
 754            name: ToolRequiringPermission::name().into(),
 755            raw_input: "{}".into(),
 756            input: json!({}),
 757            is_input_complete: true,
 758            thought_signature: None,
 759        },
 760    ));
 761    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 762        LanguageModelToolUse {
 763            id: "tool_id_2".into(),
 764            name: ToolRequiringPermission::name().into(),
 765            raw_input: "{}".into(),
 766            input: json!({}),
 767            is_input_complete: true,
 768            thought_signature: None,
 769        },
 770    ));
 771    fake_model.end_last_completion_stream();
 772    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
 773    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;
 774
 775    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
 776    tool_call_auth_1
 777        .response
 778        .send(acp::PermissionOptionId::new("allow"))
 779        .unwrap();
 780    cx.run_until_parked();
 781
 782    // Reject the second - send "deny" option_id directly since Deny is now a button
 783    tool_call_auth_2
 784        .response
 785        .send(acp::PermissionOptionId::new("deny"))
 786        .unwrap();
 787    cx.run_until_parked();
 788
 789    let completion = fake_model.pending_completions().pop().unwrap();
 790    let message = completion.messages.last().unwrap();
 791    assert_eq!(
 792        message.content,
 793        vec![
 794            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 795                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
 796                tool_name: ToolRequiringPermission::name().into(),
 797                is_error: false,
 798                content: "Allowed".into(),
 799                output: Some("Allowed".into())
 800            }),
 801            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 802                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
 803                tool_name: ToolRequiringPermission::name().into(),
 804                is_error: true,
 805                content: "Permission to run tool denied by user".into(),
 806                output: Some("Permission to run tool denied by user".into())
 807            })
 808        ]
 809    );
 810
 811    // Simulate yet another tool call.
 812    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 813        LanguageModelToolUse {
 814            id: "tool_id_3".into(),
 815            name: ToolRequiringPermission::name().into(),
 816            raw_input: "{}".into(),
 817            input: json!({}),
 818            is_input_complete: true,
 819            thought_signature: None,
 820        },
 821    ));
 822    fake_model.end_last_completion_stream();
 823
 824    // Respond by always allowing tools - send transformed option_id
 825    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
 826    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
 827    tool_call_auth_3
 828        .response
 829        .send(acp::PermissionOptionId::new(
 830            "always_allow:tool_requiring_permission",
 831        ))
 832        .unwrap();
 833    cx.run_until_parked();
 834    let completion = fake_model.pending_completions().pop().unwrap();
 835    let message = completion.messages.last().unwrap();
 836    assert_eq!(
 837        message.content,
 838        vec![language_model::MessageContent::ToolResult(
 839            LanguageModelToolResult {
 840                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
 841                tool_name: ToolRequiringPermission::name().into(),
 842                is_error: false,
 843                content: "Allowed".into(),
 844                output: Some("Allowed".into())
 845            }
 846        )]
 847    );
 848
 849    // Simulate a final tool call, ensuring we don't trigger authorization.
 850    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 851        LanguageModelToolUse {
 852            id: "tool_id_4".into(),
 853            name: ToolRequiringPermission::name().into(),
 854            raw_input: "{}".into(),
 855            input: json!({}),
 856            is_input_complete: true,
 857            thought_signature: None,
 858        },
 859    ));
 860    fake_model.end_last_completion_stream();
 861    cx.run_until_parked();
 862    let completion = fake_model.pending_completions().pop().unwrap();
 863    let message = completion.messages.last().unwrap();
 864    assert_eq!(
 865        message.content,
 866        vec![language_model::MessageContent::ToolResult(
 867            LanguageModelToolResult {
 868                tool_use_id: "tool_id_4".into(),
 869                tool_name: ToolRequiringPermission::name().into(),
 870                is_error: false,
 871                content: "Allowed".into(),
 872                output: Some("Allowed".into())
 873            }
 874        )]
 875    );
 876}
 877
 878#[gpui::test]
 879async fn test_tool_hallucination(cx: &mut TestAppContext) {
 880    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 881    let fake_model = model.as_fake();
 882
 883    let mut events = thread
 884        .update(cx, |thread, cx| {
 885            thread.send(UserMessageId::new(), ["abc"], cx)
 886        })
 887        .unwrap();
 888    cx.run_until_parked();
 889    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 890        LanguageModelToolUse {
 891            id: "tool_id_1".into(),
 892            name: "nonexistent_tool".into(),
 893            raw_input: "{}".into(),
 894            input: json!({}),
 895            is_input_complete: true,
 896            thought_signature: None,
 897        },
 898    ));
 899    fake_model.end_last_completion_stream();
 900
 901    let tool_call = expect_tool_call(&mut events).await;
 902    assert_eq!(tool_call.title, "nonexistent_tool");
 903    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
 904    let update = expect_tool_call_update_fields(&mut events).await;
 905    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
 906}
 907
 908async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
 909    let event = events
 910        .next()
 911        .await
 912        .expect("no tool call authorization event received")
 913        .unwrap();
 914    match event {
 915        ThreadEvent::ToolCall(tool_call) => tool_call,
 916        event => {
 917            panic!("Unexpected event {event:?}");
 918        }
 919    }
 920}
 921
 922async fn expect_tool_call_update_fields(
 923    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
 924) -> acp::ToolCallUpdate {
 925    let event = events
 926        .next()
 927        .await
 928        .expect("no tool call authorization event received")
 929        .unwrap();
 930    match event {
 931        ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
 932        event => {
 933            panic!("Unexpected event {event:?}");
 934        }
 935    }
 936}
 937
 938async fn next_tool_call_authorization(
 939    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
 940) -> ToolCallAuthorization {
 941    loop {
 942        let event = events
 943            .next()
 944            .await
 945            .expect("no tool call authorization event received")
 946            .unwrap();
 947        if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
 948            let permission_kinds = tool_call_authorization
 949                .options
 950                .iter()
 951                .map(|o| o.kind)
 952                .collect::<Vec<_>>();
 953            // Only 2 options now: AllowAlways (for tool) and AllowOnce (granularity only)
 954            // Deny is handled by the UI buttons, not as a separate option
 955            assert_eq!(
 956                permission_kinds,
 957                vec![
 958                    acp::PermissionOptionKind::AllowAlways,
 959                    acp::PermissionOptionKind::AllowOnce,
 960                ]
 961            );
 962            return tool_call_authorization;
 963        }
 964    }
 965}
 966
 967#[gpui::test]
 968#[cfg_attr(not(feature = "e2e"), ignore)]
 969async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
 970    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
 971
 972    // Test concurrent tool calls with different delay times
 973    let events = thread
 974        .update(cx, |thread, cx| {
 975            thread.add_tool(DelayTool);
 976            thread.send(
 977                UserMessageId::new(),
 978                [
 979                    "Call the delay tool twice in the same message.",
 980                    "Once with 100ms. Once with 300ms.",
 981                    "When both timers are complete, describe the outputs.",
 982                ],
 983                cx,
 984            )
 985        })
 986        .unwrap()
 987        .collect()
 988        .await;
 989
 990    let stop_reasons = stop_events(events);
 991    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);
 992
 993    thread.update(cx, |thread, _cx| {
 994        let last_message = thread.last_message().unwrap();
 995        let agent_message = last_message.as_agent_message().unwrap();
 996        let text = agent_message
 997            .content
 998            .iter()
 999            .filter_map(|content| {
1000                if let AgentMessageContent::Text(text) = content {
1001                    Some(text.as_str())
1002                } else {
1003                    None
1004                }
1005            })
1006            .collect::<String>();
1007
1008        assert!(text.contains("Ding"));
1009    });
1010}
1011
1012#[gpui::test]
1013async fn test_profiles(cx: &mut TestAppContext) {
1014    let ThreadTest {
1015        model, thread, fs, ..
1016    } = setup(cx, TestModel::Fake).await;
1017    let fake_model = model.as_fake();
1018
1019    thread.update(cx, |thread, _cx| {
1020        thread.add_tool(DelayTool);
1021        thread.add_tool(EchoTool);
1022        thread.add_tool(InfiniteTool);
1023    });
1024
1025    // Override profiles and wait for settings to be loaded.
1026    fs.insert_file(
1027        paths::settings_file(),
1028        json!({
1029            "agent": {
1030                "profiles": {
1031                    "test-1": {
1032                        "name": "Test Profile 1",
1033                        "tools": {
1034                            EchoTool::name(): true,
1035                            DelayTool::name(): true,
1036                        }
1037                    },
1038                    "test-2": {
1039                        "name": "Test Profile 2",
1040                        "tools": {
1041                            InfiniteTool::name(): true,
1042                        }
1043                    }
1044                }
1045            }
1046        })
1047        .to_string()
1048        .into_bytes(),
1049    )
1050    .await;
1051    cx.run_until_parked();
1052
1053    // Test that test-1 profile (default) has echo and delay tools
1054    thread
1055        .update(cx, |thread, cx| {
1056            thread.set_profile(AgentProfileId("test-1".into()), cx);
1057            thread.send(UserMessageId::new(), ["test"], cx)
1058        })
1059        .unwrap();
1060    cx.run_until_parked();
1061
1062    let mut pending_completions = fake_model.pending_completions();
1063    assert_eq!(pending_completions.len(), 1);
1064    let completion = pending_completions.pop().unwrap();
1065    let tool_names: Vec<String> = completion
1066        .tools
1067        .iter()
1068        .map(|tool| tool.name.clone())
1069        .collect();
1070    assert_eq!(tool_names, vec![DelayTool::name(), EchoTool::name()]);
1071    fake_model.end_last_completion_stream();
1072
1073    // Switch to test-2 profile, and verify that it has only the infinite tool.
1074    thread
1075        .update(cx, |thread, cx| {
1076            thread.set_profile(AgentProfileId("test-2".into()), cx);
1077            thread.send(UserMessageId::new(), ["test2"], cx)
1078        })
1079        .unwrap();
1080    cx.run_until_parked();
1081    let mut pending_completions = fake_model.pending_completions();
1082    assert_eq!(pending_completions.len(), 1);
1083    let completion = pending_completions.pop().unwrap();
1084    let tool_names: Vec<String> = completion
1085        .tools
1086        .iter()
1087        .map(|tool| tool.name.clone())
1088        .collect();
1089    assert_eq!(tool_names, vec![InfiniteTool::name()]);
1090}
1091
1092#[gpui::test]
1093async fn test_mcp_tools(cx: &mut TestAppContext) {
1094    let ThreadTest {
1095        model,
1096        thread,
1097        context_server_store,
1098        fs,
1099        ..
1100    } = setup(cx, TestModel::Fake).await;
1101    let fake_model = model.as_fake();
1102
1103    // Override profiles and wait for settings to be loaded.
1104    fs.insert_file(
1105        paths::settings_file(),
1106        json!({
1107            "agent": {
1108                "always_allow_tool_actions": true,
1109                "profiles": {
1110                    "test": {
1111                        "name": "Test Profile",
1112                        "enable_all_context_servers": true,
1113                        "tools": {
1114                            EchoTool::name(): true,
1115                        }
1116                    },
1117                }
1118            }
1119        })
1120        .to_string()
1121        .into_bytes(),
1122    )
1123    .await;
1124    cx.run_until_parked();
1125    thread.update(cx, |thread, cx| {
1126        thread.set_profile(AgentProfileId("test".into()), cx)
1127    });
1128
1129    let mut mcp_tool_calls = setup_context_server(
1130        "test_server",
1131        vec![context_server::types::Tool {
1132            name: "echo".into(),
1133            description: None,
1134            input_schema: serde_json::to_value(EchoTool::input_schema(
1135                LanguageModelToolSchemaFormat::JsonSchema,
1136            ))
1137            .unwrap(),
1138            output_schema: None,
1139            annotations: None,
1140        }],
1141        &context_server_store,
1142        cx,
1143    );
1144
1145    let events = thread.update(cx, |thread, cx| {
1146        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
1147    });
1148    cx.run_until_parked();
1149
1150    // Simulate the model calling the MCP tool.
1151    let completion = fake_model.pending_completions().pop().unwrap();
1152    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1153    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1154        LanguageModelToolUse {
1155            id: "tool_1".into(),
1156            name: "echo".into(),
1157            raw_input: json!({"text": "test"}).to_string(),
1158            input: json!({"text": "test"}),
1159            is_input_complete: true,
1160            thought_signature: None,
1161        },
1162    ));
1163    fake_model.end_last_completion_stream();
1164    cx.run_until_parked();
1165
1166    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1167    assert_eq!(tool_call_params.name, "echo");
1168    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
1169    tool_call_response
1170        .send(context_server::types::CallToolResponse {
1171            content: vec![context_server::types::ToolResponseContent::Text {
1172                text: "test".into(),
1173            }],
1174            is_error: None,
1175            meta: None,
1176            structured_content: None,
1177        })
1178        .unwrap();
1179    cx.run_until_parked();
1180
1181    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1182    fake_model.send_last_completion_stream_text_chunk("Done!");
1183    fake_model.end_last_completion_stream();
1184    events.collect::<Vec<_>>().await;
1185
1186    // Send again after adding the echo tool, ensuring the name collision is resolved.
1187    let events = thread.update(cx, |thread, cx| {
1188        thread.add_tool(EchoTool);
1189        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
1190    });
1191    cx.run_until_parked();
1192    let completion = fake_model.pending_completions().pop().unwrap();
1193    assert_eq!(
1194        tool_names_for_completion(&completion),
1195        vec!["echo", "test_server_echo"]
1196    );
1197    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1198        LanguageModelToolUse {
1199            id: "tool_2".into(),
1200            name: "test_server_echo".into(),
1201            raw_input: json!({"text": "mcp"}).to_string(),
1202            input: json!({"text": "mcp"}),
1203            is_input_complete: true,
1204            thought_signature: None,
1205        },
1206    ));
1207    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1208        LanguageModelToolUse {
1209            id: "tool_3".into(),
1210            name: "echo".into(),
1211            raw_input: json!({"text": "native"}).to_string(),
1212            input: json!({"text": "native"}),
1213            is_input_complete: true,
1214            thought_signature: None,
1215        },
1216    ));
1217    fake_model.end_last_completion_stream();
1218    cx.run_until_parked();
1219
1220    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1221    assert_eq!(tool_call_params.name, "echo");
1222    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
1223    tool_call_response
1224        .send(context_server::types::CallToolResponse {
1225            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
1226            is_error: None,
1227            meta: None,
1228            structured_content: None,
1229        })
1230        .unwrap();
1231    cx.run_until_parked();
1232
1233    // Ensure the tool results were inserted with the correct names.
1234    let completion = fake_model.pending_completions().pop().unwrap();
1235    assert_eq!(
1236        completion.messages.last().unwrap().content,
1237        vec![
1238            MessageContent::ToolResult(LanguageModelToolResult {
1239                tool_use_id: "tool_3".into(),
1240                tool_name: "echo".into(),
1241                is_error: false,
1242                content: "native".into(),
1243                output: Some("native".into()),
1244            },),
1245            MessageContent::ToolResult(LanguageModelToolResult {
1246                tool_use_id: "tool_2".into(),
1247                tool_name: "test_server_echo".into(),
1248                is_error: false,
1249                content: "mcp".into(),
1250                output: Some("mcp".into()),
1251            },),
1252        ]
1253    );
1254    fake_model.end_last_completion_stream();
1255    events.collect::<Vec<_>>().await;
1256}
1257
1258#[gpui::test]
1259async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
1260    let ThreadTest {
1261        model,
1262        thread,
1263        context_server_store,
1264        fs,
1265        ..
1266    } = setup(cx, TestModel::Fake).await;
1267    let fake_model = model.as_fake();
1268
1269    // Set up a profile with all tools enabled
1270    fs.insert_file(
1271        paths::settings_file(),
1272        json!({
1273            "agent": {
1274                "profiles": {
1275                    "test": {
1276                        "name": "Test Profile",
1277                        "enable_all_context_servers": true,
1278                        "tools": {
1279                            EchoTool::name(): true,
1280                            DelayTool::name(): true,
1281                            WordListTool::name(): true,
1282                            ToolRequiringPermission::name(): true,
1283                            InfiniteTool::name(): true,
1284                        }
1285                    },
1286                }
1287            }
1288        })
1289        .to_string()
1290        .into_bytes(),
1291    )
1292    .await;
1293    cx.run_until_parked();
1294
1295    thread.update(cx, |thread, cx| {
1296        thread.set_profile(AgentProfileId("test".into()), cx);
1297        thread.add_tool(EchoTool);
1298        thread.add_tool(DelayTool);
1299        thread.add_tool(WordListTool);
1300        thread.add_tool(ToolRequiringPermission);
1301        thread.add_tool(InfiniteTool);
1302    });
1303
1304    // Set up multiple context servers with some overlapping tool names
1305    let _server1_calls = setup_context_server(
1306        "xxx",
1307        vec![
1308            context_server::types::Tool {
1309                name: "echo".into(), // Conflicts with native EchoTool
1310                description: None,
1311                input_schema: serde_json::to_value(EchoTool::input_schema(
1312                    LanguageModelToolSchemaFormat::JsonSchema,
1313                ))
1314                .unwrap(),
1315                output_schema: None,
1316                annotations: None,
1317            },
1318            context_server::types::Tool {
1319                name: "unique_tool_1".into(),
1320                description: None,
1321                input_schema: json!({"type": "object", "properties": {}}),
1322                output_schema: None,
1323                annotations: None,
1324            },
1325        ],
1326        &context_server_store,
1327        cx,
1328    );
1329
1330    let _server2_calls = setup_context_server(
1331        "yyy",
1332        vec![
1333            context_server::types::Tool {
1334                name: "echo".into(), // Also conflicts with native EchoTool
1335                description: None,
1336                input_schema: serde_json::to_value(EchoTool::input_schema(
1337                    LanguageModelToolSchemaFormat::JsonSchema,
1338                ))
1339                .unwrap(),
1340                output_schema: None,
1341                annotations: None,
1342            },
1343            context_server::types::Tool {
1344                name: "unique_tool_2".into(),
1345                description: None,
1346                input_schema: json!({"type": "object", "properties": {}}),
1347                output_schema: None,
1348                annotations: None,
1349            },
1350            context_server::types::Tool {
1351                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1352                description: None,
1353                input_schema: json!({"type": "object", "properties": {}}),
1354                output_schema: None,
1355                annotations: None,
1356            },
1357            context_server::types::Tool {
1358                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1359                description: None,
1360                input_schema: json!({"type": "object", "properties": {}}),
1361                output_schema: None,
1362                annotations: None,
1363            },
1364        ],
1365        &context_server_store,
1366        cx,
1367    );
1368    let _server3_calls = setup_context_server(
1369        "zzz",
1370        vec![
1371            context_server::types::Tool {
1372                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1373                description: None,
1374                input_schema: json!({"type": "object", "properties": {}}),
1375                output_schema: None,
1376                annotations: None,
1377            },
1378            context_server::types::Tool {
1379                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1380                description: None,
1381                input_schema: json!({"type": "object", "properties": {}}),
1382                output_schema: None,
1383                annotations: None,
1384            },
1385            context_server::types::Tool {
1386                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
1387                description: None,
1388                input_schema: json!({"type": "object", "properties": {}}),
1389                output_schema: None,
1390                annotations: None,
1391            },
1392        ],
1393        &context_server_store,
1394        cx,
1395    );
1396
1397    thread
1398        .update(cx, |thread, cx| {
1399            thread.send(UserMessageId::new(), ["Go"], cx)
1400        })
1401        .unwrap();
1402    cx.run_until_parked();
1403    let completion = fake_model.pending_completions().pop().unwrap();
1404    assert_eq!(
1405        tool_names_for_completion(&completion),
1406        vec![
1407            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1408            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
1409            "delay",
1410            "echo",
1411            "infinite",
1412            "tool_requiring_permission",
1413            "unique_tool_1",
1414            "unique_tool_2",
1415            "word_list",
1416            "xxx_echo",
1417            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1418            "yyy_echo",
1419            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1420        ]
1421    );
1422}
1423
1424#[gpui::test]
1425#[cfg_attr(not(feature = "e2e"), ignore)]
1426async fn test_cancellation(cx: &mut TestAppContext) {
1427    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1428
1429    let mut events = thread
1430        .update(cx, |thread, cx| {
1431            thread.add_tool(InfiniteTool);
1432            thread.add_tool(EchoTool);
1433            thread.send(
1434                UserMessageId::new(),
1435                ["Call the echo tool, then call the infinite tool, then explain their output"],
1436                cx,
1437            )
1438        })
1439        .unwrap();
1440
1441    // Wait until both tools are called.
1442    let mut expected_tools = vec!["Echo", "Infinite Tool"];
1443    let mut echo_id = None;
1444    let mut echo_completed = false;
1445    while let Some(event) = events.next().await {
1446        match event.unwrap() {
1447            ThreadEvent::ToolCall(tool_call) => {
1448                assert_eq!(tool_call.title, expected_tools.remove(0));
1449                if tool_call.title == "Echo" {
1450                    echo_id = Some(tool_call.tool_call_id);
1451                }
1452            }
1453            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
1454                acp::ToolCallUpdate {
1455                    tool_call_id,
1456                    fields:
1457                        acp::ToolCallUpdateFields {
1458                            status: Some(acp::ToolCallStatus::Completed),
1459                            ..
1460                        },
1461                    ..
1462                },
1463            )) if Some(&tool_call_id) == echo_id.as_ref() => {
1464                echo_completed = true;
1465            }
1466            _ => {}
1467        }
1468
1469        if expected_tools.is_empty() && echo_completed {
1470            break;
1471        }
1472    }
1473
1474    // Cancel the current send and ensure that the event stream is closed, even
1475    // if one of the tools is still running.
1476    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1477    let events = events.collect::<Vec<_>>().await;
1478    let last_event = events.last();
1479    assert!(
1480        matches!(
1481            last_event,
1482            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
1483        ),
1484        "unexpected event {last_event:?}"
1485    );
1486
1487    // Ensure we can still send a new message after cancellation.
1488    let events = thread
1489        .update(cx, |thread, cx| {
1490            thread.send(
1491                UserMessageId::new(),
1492                ["Testing: reply with 'Hello' then stop."],
1493                cx,
1494            )
1495        })
1496        .unwrap()
1497        .collect::<Vec<_>>()
1498        .await;
1499    thread.update(cx, |thread, _cx| {
1500        let message = thread.last_message().unwrap();
1501        let agent_message = message.as_agent_message().unwrap();
1502        assert_eq!(
1503            agent_message.content,
1504            vec![AgentMessageContent::Text("Hello".to_string())]
1505        );
1506    });
1507    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
1508}
1509
1510#[gpui::test]
1511async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
1512    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
1513    always_allow_tools(cx);
1514    let fake_model = model.as_fake();
1515
1516    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
1517    let environment = Rc::new(FakeThreadEnvironment {
1518        handle: handle.clone(),
1519    });
1520
1521    let mut events = thread
1522        .update(cx, |thread, cx| {
1523            thread.add_tool(crate::TerminalTool::new(
1524                thread.project().clone(),
1525                environment,
1526            ));
1527            thread.send(UserMessageId::new(), ["run a command"], cx)
1528        })
1529        .unwrap();
1530
1531    cx.run_until_parked();
1532
1533    // Simulate the model calling the terminal tool
1534    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1535        LanguageModelToolUse {
1536            id: "terminal_tool_1".into(),
1537            name: "terminal".into(),
1538            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
1539            input: json!({"command": "sleep 1000", "cd": "."}),
1540            is_input_complete: true,
1541            thought_signature: None,
1542        },
1543    ));
1544    fake_model.end_last_completion_stream();
1545
1546    // Wait for the terminal tool to start running
1547    wait_for_terminal_tool_started(&mut events, cx).await;
1548
1549    // Cancel the thread while the terminal is running
1550    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();
1551
1552    // Collect remaining events, driving the executor to let cancellation complete
1553    let remaining_events = collect_events_until_stop(&mut events, cx).await;
1554
1555    // Verify the terminal was killed
1556    assert!(
1557        handle.was_killed(),
1558        "expected terminal handle to be killed on cancellation"
1559    );
1560
1561    // Verify we got a cancellation stop event
1562    assert_eq!(
1563        stop_events(remaining_events),
1564        vec![acp::StopReason::Cancelled],
1565    );
1566
1567    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
1568    thread.update(cx, |thread, _cx| {
1569        let message = thread.last_message().unwrap();
1570        let agent_message = message.as_agent_message().unwrap();
1571
1572        let tool_use = agent_message
1573            .content
1574            .iter()
1575            .find_map(|content| match content {
1576                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
1577                _ => None,
1578            })
1579            .expect("expected tool use in agent message");
1580
1581        let tool_result = agent_message
1582            .tool_results
1583            .get(&tool_use.id)
1584            .expect("expected tool result");
1585
1586        let result_text = match &tool_result.content {
1587            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
1588            _ => panic!("expected text content in tool result"),
1589        };
1590
1591        // "partial output" comes from FakeTerminalHandle's output field
1592        assert!(
1593            result_text.contains("partial output"),
1594            "expected tool result to contain terminal output, got: {result_text}"
1595        );
1596        // Match the actual format from process_content in terminal_tool.rs
1597        assert!(
1598            result_text.contains("The user stopped this command"),
1599            "expected tool result to indicate user stopped, got: {result_text}"
1600        );
1601    });
1602
1603    // Verify we can send a new message after cancellation
1604    verify_thread_recovery(&thread, &fake_model, cx).await;
1605}
1606
1607#[gpui::test]
1608async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
1609    // This test verifies that tools which properly handle cancellation via
1610    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
1611    // to cancellation and report that they were cancelled.
1612    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
1613    always_allow_tools(cx);
1614    let fake_model = model.as_fake();
1615
1616    let (tool, was_cancelled) = CancellationAwareTool::new();
1617
1618    let mut events = thread
1619        .update(cx, |thread, cx| {
1620            thread.add_tool(tool);
1621            thread.send(
1622                UserMessageId::new(),
1623                ["call the cancellation aware tool"],
1624                cx,
1625            )
1626        })
1627        .unwrap();
1628
1629    cx.run_until_parked();
1630
1631    // Simulate the model calling the cancellation-aware tool
1632    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1633        LanguageModelToolUse {
1634            id: "cancellation_aware_1".into(),
1635            name: "cancellation_aware".into(),
1636            raw_input: r#"{}"#.into(),
1637            input: json!({}),
1638            is_input_complete: true,
1639            thought_signature: None,
1640        },
1641    ));
1642    fake_model.end_last_completion_stream();
1643
1644    cx.run_until_parked();
1645
1646    // Wait for the tool call to be reported
1647    let mut tool_started = false;
1648    let deadline = cx.executor().num_cpus() * 100;
1649    for _ in 0..deadline {
1650        cx.run_until_parked();
1651
1652        while let Some(Some(event)) = events.next().now_or_never() {
1653            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
1654                if tool_call.title == "Cancellation Aware Tool" {
1655                    tool_started = true;
1656                    break;
1657                }
1658            }
1659        }
1660
1661        if tool_started {
1662            break;
1663        }
1664
1665        cx.background_executor
1666            .timer(Duration::from_millis(10))
1667            .await;
1668    }
1669    assert!(tool_started, "expected cancellation aware tool to start");
1670
1671    // Cancel the thread and wait for it to complete
1672    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));
1673
1674    // The cancel task should complete promptly because the tool handles cancellation
1675    let timeout = cx.background_executor.timer(Duration::from_secs(5));
1676    futures::select! {
1677        _ = cancel_task.fuse() => {}
1678        _ = timeout.fuse() => {
1679            panic!("cancel task timed out - tool did not respond to cancellation");
1680        }
1681    }
1682
1683    // Verify the tool detected cancellation via its flag
1684    assert!(
1685        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
1686        "tool should have detected cancellation via event_stream.cancelled_by_user()"
1687    );
1688
1689    // Collect remaining events
1690    let remaining_events = collect_events_until_stop(&mut events, cx).await;
1691
1692    // Verify we got a cancellation stop event
1693    assert_eq!(
1694        stop_events(remaining_events),
1695        vec![acp::StopReason::Cancelled],
1696    );
1697
1698    // Verify we can send a new message after cancellation
1699    verify_thread_recovery(&thread, &fake_model, cx).await;
1700}
1701
1702/// Helper to verify thread can recover after cancellation by sending a simple message.
1703async fn verify_thread_recovery(
1704    thread: &Entity<Thread>,
1705    fake_model: &FakeLanguageModel,
1706    cx: &mut TestAppContext,
1707) {
1708    let events = thread
1709        .update(cx, |thread, cx| {
1710            thread.send(
1711                UserMessageId::new(),
1712                ["Testing: reply with 'Hello' then stop."],
1713                cx,
1714            )
1715        })
1716        .unwrap();
1717    cx.run_until_parked();
1718    fake_model.send_last_completion_stream_text_chunk("Hello");
1719    fake_model
1720        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
1721    fake_model.end_last_completion_stream();
1722
1723    let events = events.collect::<Vec<_>>().await;
1724    thread.update(cx, |thread, _cx| {
1725        let message = thread.last_message().unwrap();
1726        let agent_message = message.as_agent_message().unwrap();
1727        assert_eq!(
1728            agent_message.content,
1729            vec![AgentMessageContent::Text("Hello".to_string())]
1730        );
1731    });
1732    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
1733}
1734
1735/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
1736async fn wait_for_terminal_tool_started(
1737    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
1738    cx: &mut TestAppContext,
1739) {
1740    let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
1741    for _ in 0..deadline {
1742        cx.run_until_parked();
1743
1744        while let Some(Some(event)) = events.next().now_or_never() {
1745            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
1746                update,
1747            ))) = &event
1748            {
1749                if update.fields.content.as_ref().is_some_and(|content| {
1750                    content
1751                        .iter()
1752                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
1753                }) {
1754                    return;
1755                }
1756            }
1757        }
1758
1759        cx.background_executor
1760            .timer(Duration::from_millis(10))
1761            .await;
1762    }
1763    panic!("terminal tool did not start within the expected time");
1764}
1765
1766/// Collects events until a Stop event is received, driving the executor to completion.
1767async fn collect_events_until_stop(
1768    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
1769    cx: &mut TestAppContext,
1770) -> Vec<Result<ThreadEvent>> {
1771    let mut collected = Vec::new();
1772    let deadline = cx.executor().num_cpus() * 200;
1773
1774    for _ in 0..deadline {
1775        cx.executor().advance_clock(Duration::from_millis(10));
1776        cx.run_until_parked();
1777
1778        while let Some(Some(event)) = events.next().now_or_never() {
1779            let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
1780            collected.push(event);
1781            if is_stop {
1782                return collected;
1783            }
1784        }
1785    }
1786    panic!(
1787        "did not receive Stop event within the expected time; collected {} events",
1788        collected.len()
1789    );
1790}
1791
1792#[gpui::test]
1793async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
1794    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
1795    always_allow_tools(cx);
1796    let fake_model = model.as_fake();
1797
1798    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
1799    let environment = Rc::new(FakeThreadEnvironment {
1800        handle: handle.clone(),
1801    });
1802
1803    let message_id = UserMessageId::new();
1804    let mut events = thread
1805        .update(cx, |thread, cx| {
1806            thread.add_tool(crate::TerminalTool::new(
1807                thread.project().clone(),
1808                environment,
1809            ));
1810            thread.send(message_id.clone(), ["run a command"], cx)
1811        })
1812        .unwrap();
1813
1814    cx.run_until_parked();
1815
1816    // Simulate the model calling the terminal tool
1817    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1818        LanguageModelToolUse {
1819            id: "terminal_tool_1".into(),
1820            name: "terminal".into(),
1821            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
1822            input: json!({"command": "sleep 1000", "cd": "."}),
1823            is_input_complete: true,
1824            thought_signature: None,
1825        },
1826    ));
1827    fake_model.end_last_completion_stream();
1828
1829    // Wait for the terminal tool to start running
1830    wait_for_terminal_tool_started(&mut events, cx).await;
1831
1832    // Truncate the thread while the terminal is running
1833    thread
1834        .update(cx, |thread, cx| thread.truncate(message_id, cx))
1835        .unwrap();
1836
1837    // Drive the executor to let cancellation complete
1838    let _ = collect_events_until_stop(&mut events, cx).await;
1839
1840    // Verify the terminal was killed
1841    assert!(
1842        handle.was_killed(),
1843        "expected terminal handle to be killed on truncate"
1844    );
1845
1846    // Verify the thread is empty after truncation
1847    thread.update(cx, |thread, _cx| {
1848        assert_eq!(
1849            thread.to_markdown(),
1850            "",
1851            "expected thread to be empty after truncating the only message"
1852        );
1853    });
1854
1855    // Verify we can send a new message after truncation
1856    verify_thread_recovery(&thread, &fake_model, cx).await;
1857}
1858
1859#[gpui::test]
1860async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
1861    // Tests that cancellation properly kills all running terminal tools when multiple are active.
1862    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
1863    always_allow_tools(cx);
1864    let fake_model = model.as_fake();
1865
1866    let environment = Rc::new(MultiTerminalEnvironment::new());
1867
1868    let mut events = thread
1869        .update(cx, |thread, cx| {
1870            thread.add_tool(crate::TerminalTool::new(
1871                thread.project().clone(),
1872                environment.clone(),
1873            ));
1874            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
1875        })
1876        .unwrap();
1877
1878    cx.run_until_parked();
1879
1880    // Simulate the model calling two terminal tools
1881    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1882        LanguageModelToolUse {
1883            id: "terminal_tool_1".into(),
1884            name: "terminal".into(),
1885            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
1886            input: json!({"command": "sleep 1000", "cd": "."}),
1887            is_input_complete: true,
1888            thought_signature: None,
1889        },
1890    ));
1891    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1892        LanguageModelToolUse {
1893            id: "terminal_tool_2".into(),
1894            name: "terminal".into(),
1895            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
1896            input: json!({"command": "sleep 2000", "cd": "."}),
1897            is_input_complete: true,
1898            thought_signature: None,
1899        },
1900    ));
1901    fake_model.end_last_completion_stream();
1902
1903    // Wait for both terminal tools to start by counting terminal content updates
1904    let mut terminals_started = 0;
1905    let deadline = cx.executor().num_cpus() * 100;
1906    for _ in 0..deadline {
1907        cx.run_until_parked();
1908
1909        while let Some(Some(event)) = events.next().now_or_never() {
1910            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
1911                update,
1912            ))) = &event
1913            {
1914                if update.fields.content.as_ref().is_some_and(|content| {
1915                    content
1916                        .iter()
1917                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
1918                }) {
1919                    terminals_started += 1;
1920                    if terminals_started >= 2 {
1921                        break;
1922                    }
1923                }
1924            }
1925        }
1926        if terminals_started >= 2 {
1927            break;
1928        }
1929
1930        cx.background_executor
1931            .timer(Duration::from_millis(10))
1932            .await;
1933    }
1934    assert!(
1935        terminals_started >= 2,
1936        "expected 2 terminal tools to start, got {terminals_started}"
1937    );
1938
1939    // Cancel the thread while both terminals are running
1940    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();
1941
1942    // Collect remaining events
1943    let remaining_events = collect_events_until_stop(&mut events, cx).await;
1944
1945    // Verify both terminal handles were killed
1946    let handles = environment.handles();
1947    assert_eq!(
1948        handles.len(),
1949        2,
1950        "expected 2 terminal handles to be created"
1951    );
1952    assert!(
1953        handles[0].was_killed(),
1954        "expected first terminal handle to be killed on cancellation"
1955    );
1956    assert!(
1957        handles[1].was_killed(),
1958        "expected second terminal handle to be killed on cancellation"
1959    );
1960
1961    // Verify we got a cancellation stop event
1962    assert_eq!(
1963        stop_events(remaining_events),
1964        vec![acp::StopReason::Cancelled],
1965    );
1966}
1967
1968#[gpui::test]
1969async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
1970    // Tests that clicking the stop button on the terminal card (as opposed to the main
1971    // cancel button) properly reports user stopped via the was_stopped_by_user path.
1972    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
1973    always_allow_tools(cx);
1974    let fake_model = model.as_fake();
1975
1976    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
1977    let environment = Rc::new(FakeThreadEnvironment {
1978        handle: handle.clone(),
1979    });
1980
1981    let mut events = thread
1982        .update(cx, |thread, cx| {
1983            thread.add_tool(crate::TerminalTool::new(
1984                thread.project().clone(),
1985                environment,
1986            ));
1987            thread.send(UserMessageId::new(), ["run a command"], cx)
1988        })
1989        .unwrap();
1990
1991    cx.run_until_parked();
1992
1993    // Simulate the model calling the terminal tool
1994    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1995        LanguageModelToolUse {
1996            id: "terminal_tool_1".into(),
1997            name: "terminal".into(),
1998            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
1999            input: json!({"command": "sleep 1000", "cd": "."}),
2000            is_input_complete: true,
2001            thought_signature: None,
2002        },
2003    ));
2004    fake_model.end_last_completion_stream();
2005
2006    // Wait for the terminal tool to start running
2007    wait_for_terminal_tool_started(&mut events, cx).await;
2008
2009    // Simulate user clicking stop on the terminal card itself.
2010    // This sets the flag and signals exit (simulating what the real UI would do).
2011    handle.set_stopped_by_user(true);
2012    handle.killed.store(true, Ordering::SeqCst);
2013    handle.signal_exit();
2014
2015    // Wait for the tool to complete
2016    cx.run_until_parked();
2017
2018    // The thread continues after tool completion - simulate the model ending its turn
2019    fake_model
2020        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2021    fake_model.end_last_completion_stream();
2022
2023    // Collect remaining events
2024    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2025
2026    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
2027    assert_eq!(
2028        stop_events(remaining_events),
2029        vec![acp::StopReason::EndTurn],
2030    );
2031
2032    // Verify the tool result indicates user stopped
2033    thread.update(cx, |thread, _cx| {
2034        let message = thread.last_message().unwrap();
2035        let agent_message = message.as_agent_message().unwrap();
2036
2037        let tool_use = agent_message
2038            .content
2039            .iter()
2040            .find_map(|content| match content {
2041                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
2042                _ => None,
2043            })
2044            .expect("expected tool use in agent message");
2045
2046        let tool_result = agent_message
2047            .tool_results
2048            .get(&tool_use.id)
2049            .expect("expected tool result");
2050
2051        let result_text = match &tool_result.content {
2052            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
2053            _ => panic!("expected text content in tool result"),
2054        };
2055
2056        assert!(
2057            result_text.contains("The user stopped this command"),
2058            "expected tool result to indicate user stopped, got: {result_text}"
2059        );
2060    });
2061}
2062
2063#[gpui::test]
2064async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
2065    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
2066    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2067    always_allow_tools(cx);
2068    let fake_model = model.as_fake();
2069
2070    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
2071    let environment = Rc::new(FakeThreadEnvironment {
2072        handle: handle.clone(),
2073    });
2074
2075    let mut events = thread
2076        .update(cx, |thread, cx| {
2077            thread.add_tool(crate::TerminalTool::new(
2078                thread.project().clone(),
2079                environment,
2080            ));
2081            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
2082        })
2083        .unwrap();
2084
2085    cx.run_until_parked();
2086
2087    // Simulate the model calling the terminal tool with a short timeout
2088    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2089        LanguageModelToolUse {
2090            id: "terminal_tool_1".into(),
2091            name: "terminal".into(),
2092            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
2093            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
2094            is_input_complete: true,
2095            thought_signature: None,
2096        },
2097    ));
2098    fake_model.end_last_completion_stream();
2099
2100    // Wait for the terminal tool to start running
2101    wait_for_terminal_tool_started(&mut events, cx).await;
2102
2103    // Advance clock past the timeout
2104    cx.executor().advance_clock(Duration::from_millis(200));
2105    cx.run_until_parked();
2106
2107    // The thread continues after tool completion - simulate the model ending its turn
2108    fake_model
2109        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2110    fake_model.end_last_completion_stream();
2111
2112    // Collect remaining events
2113    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2114
2115    // Verify the terminal was killed due to timeout
2116    assert!(
2117        handle.was_killed(),
2118        "expected terminal handle to be killed on timeout"
2119    );
2120
2121    // Verify we got an EndTurn (the tool completed, just with timeout)
2122    assert_eq!(
2123        stop_events(remaining_events),
2124        vec![acp::StopReason::EndTurn],
2125    );
2126
2127    // Verify the tool result indicates timeout, not user stopped
2128    thread.update(cx, |thread, _cx| {
2129        let message = thread.last_message().unwrap();
2130        let agent_message = message.as_agent_message().unwrap();
2131
2132        let tool_use = agent_message
2133            .content
2134            .iter()
2135            .find_map(|content| match content {
2136                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
2137                _ => None,
2138            })
2139            .expect("expected tool use in agent message");
2140
2141        let tool_result = agent_message
2142            .tool_results
2143            .get(&tool_use.id)
2144            .expect("expected tool result");
2145
2146        let result_text = match &tool_result.content {
2147            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
2148            _ => panic!("expected text content in tool result"),
2149        };
2150
2151        assert!(
2152            result_text.contains("timed out"),
2153            "expected tool result to indicate timeout, got: {result_text}"
2154        );
2155        assert!(
2156            !result_text.contains("The user stopped"),
2157            "tool result should not mention user stopped when it timed out, got: {result_text}"
2158        );
2159    });
2160}
2161
2162#[gpui::test]
2163async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2164    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2165    let fake_model = model.as_fake();
2166
2167    let events_1 = thread
2168        .update(cx, |thread, cx| {
2169            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2170        })
2171        .unwrap();
2172    cx.run_until_parked();
2173    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2174    cx.run_until_parked();
2175
2176    let events_2 = thread
2177        .update(cx, |thread, cx| {
2178            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2179        })
2180        .unwrap();
2181    cx.run_until_parked();
2182    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2183    fake_model
2184        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2185    fake_model.end_last_completion_stream();
2186
2187    let events_1 = events_1.collect::<Vec<_>>().await;
2188    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2189    let events_2 = events_2.collect::<Vec<_>>().await;
2190    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2191}
2192
2193#[gpui::test]
2194async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2195    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2196    let fake_model = model.as_fake();
2197
2198    let events_1 = thread
2199        .update(cx, |thread, cx| {
2200            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2201        })
2202        .unwrap();
2203    cx.run_until_parked();
2204    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2205    fake_model
2206        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2207    fake_model.end_last_completion_stream();
2208    let events_1 = events_1.collect::<Vec<_>>().await;
2209
2210    let events_2 = thread
2211        .update(cx, |thread, cx| {
2212            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2213        })
2214        .unwrap();
2215    cx.run_until_parked();
2216    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2217    fake_model
2218        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2219    fake_model.end_last_completion_stream();
2220    let events_2 = events_2.collect::<Vec<_>>().await;
2221
2222    assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2223    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2224}
2225
2226#[gpui::test]
2227async fn test_refusal(cx: &mut TestAppContext) {
2228    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2229    let fake_model = model.as_fake();
2230
2231    let events = thread
2232        .update(cx, |thread, cx| {
2233            thread.send(UserMessageId::new(), ["Hello"], cx)
2234        })
2235        .unwrap();
2236    cx.run_until_parked();
2237    thread.read_with(cx, |thread, _| {
2238        assert_eq!(
2239            thread.to_markdown(),
2240            indoc! {"
2241                ## User
2242
2243                Hello
2244            "}
2245        );
2246    });
2247
2248    fake_model.send_last_completion_stream_text_chunk("Hey!");
2249    cx.run_until_parked();
2250    thread.read_with(cx, |thread, _| {
2251        assert_eq!(
2252            thread.to_markdown(),
2253            indoc! {"
2254                ## User
2255
2256                Hello
2257
2258                ## Assistant
2259
2260                Hey!
2261            "}
2262        );
2263    });
2264
2265    // If the model refuses to continue, the thread should remove all the messages after the last user message.
2266    fake_model
2267        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
2268    let events = events.collect::<Vec<_>>().await;
2269    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
2270    thread.read_with(cx, |thread, _| {
2271        assert_eq!(thread.to_markdown(), "");
2272    });
2273}
2274
2275#[gpui::test]
2276async fn test_truncate_first_message(cx: &mut TestAppContext) {
2277    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2278    let fake_model = model.as_fake();
2279
2280    let message_id = UserMessageId::new();
2281    thread
2282        .update(cx, |thread, cx| {
2283            thread.send(message_id.clone(), ["Hello"], cx)
2284        })
2285        .unwrap();
2286    cx.run_until_parked();
2287    thread.read_with(cx, |thread, _| {
2288        assert_eq!(
2289            thread.to_markdown(),
2290            indoc! {"
2291                ## User
2292
2293                Hello
2294            "}
2295        );
2296        assert_eq!(thread.latest_token_usage(), None);
2297    });
2298
2299    fake_model.send_last_completion_stream_text_chunk("Hey!");
2300    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2301        language_model::TokenUsage {
2302            input_tokens: 32_000,
2303            output_tokens: 16_000,
2304            cache_creation_input_tokens: 0,
2305            cache_read_input_tokens: 0,
2306        },
2307    ));
2308    cx.run_until_parked();
2309    thread.read_with(cx, |thread, _| {
2310        assert_eq!(
2311            thread.to_markdown(),
2312            indoc! {"
2313                ## User
2314
2315                Hello
2316
2317                ## Assistant
2318
2319                Hey!
2320            "}
2321        );
2322        assert_eq!(
2323            thread.latest_token_usage(),
2324            Some(acp_thread::TokenUsage {
2325                used_tokens: 32_000 + 16_000,
2326                max_tokens: 1_000_000,
2327                input_tokens: 32_000,
2328                output_tokens: 16_000,
2329            })
2330        );
2331    });
2332
2333    thread
2334        .update(cx, |thread, cx| thread.truncate(message_id, cx))
2335        .unwrap();
2336    cx.run_until_parked();
2337    thread.read_with(cx, |thread, _| {
2338        assert_eq!(thread.to_markdown(), "");
2339        assert_eq!(thread.latest_token_usage(), None);
2340    });
2341
2342    // Ensure we can still send a new message after truncation.
2343    thread
2344        .update(cx, |thread, cx| {
2345            thread.send(UserMessageId::new(), ["Hi"], cx)
2346        })
2347        .unwrap();
2348    thread.update(cx, |thread, _cx| {
2349        assert_eq!(
2350            thread.to_markdown(),
2351            indoc! {"
2352                ## User
2353
2354                Hi
2355            "}
2356        );
2357    });
2358    cx.run_until_parked();
2359    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
2360    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2361        language_model::TokenUsage {
2362            input_tokens: 40_000,
2363            output_tokens: 20_000,
2364            cache_creation_input_tokens: 0,
2365            cache_read_input_tokens: 0,
2366        },
2367    ));
2368    cx.run_until_parked();
2369    thread.read_with(cx, |thread, _| {
2370        assert_eq!(
2371            thread.to_markdown(),
2372            indoc! {"
2373                ## User
2374
2375                Hi
2376
2377                ## Assistant
2378
2379                Ahoy!
2380            "}
2381        );
2382
2383        assert_eq!(
2384            thread.latest_token_usage(),
2385            Some(acp_thread::TokenUsage {
2386                used_tokens: 40_000 + 20_000,
2387                max_tokens: 1_000_000,
2388                input_tokens: 40_000,
2389                output_tokens: 20_000,
2390            })
2391        );
2392    });
2393}
2394
2395#[gpui::test]
2396async fn test_truncate_second_message(cx: &mut TestAppContext) {
2397    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2398    let fake_model = model.as_fake();
2399
2400    thread
2401        .update(cx, |thread, cx| {
2402            thread.send(UserMessageId::new(), ["Message 1"], cx)
2403        })
2404        .unwrap();
2405    cx.run_until_parked();
2406    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
2407    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2408        language_model::TokenUsage {
2409            input_tokens: 32_000,
2410            output_tokens: 16_000,
2411            cache_creation_input_tokens: 0,
2412            cache_read_input_tokens: 0,
2413        },
2414    ));
2415    fake_model.end_last_completion_stream();
2416    cx.run_until_parked();
2417
2418    let assert_first_message_state = |cx: &mut TestAppContext| {
2419        thread.clone().read_with(cx, |thread, _| {
2420            assert_eq!(
2421                thread.to_markdown(),
2422                indoc! {"
2423                    ## User
2424
2425                    Message 1
2426
2427                    ## Assistant
2428
2429                    Message 1 response
2430                "}
2431            );
2432
2433            assert_eq!(
2434                thread.latest_token_usage(),
2435                Some(acp_thread::TokenUsage {
2436                    used_tokens: 32_000 + 16_000,
2437                    max_tokens: 1_000_000,
2438                    input_tokens: 32_000,
2439                    output_tokens: 16_000,
2440                })
2441            );
2442        });
2443    };
2444
2445    assert_first_message_state(cx);
2446
2447    let second_message_id = UserMessageId::new();
2448    thread
2449        .update(cx, |thread, cx| {
2450            thread.send(second_message_id.clone(), ["Message 2"], cx)
2451        })
2452        .unwrap();
2453    cx.run_until_parked();
2454
2455    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
2456    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2457        language_model::TokenUsage {
2458            input_tokens: 40_000,
2459            output_tokens: 20_000,
2460            cache_creation_input_tokens: 0,
2461            cache_read_input_tokens: 0,
2462        },
2463    ));
2464    fake_model.end_last_completion_stream();
2465    cx.run_until_parked();
2466
2467    thread.read_with(cx, |thread, _| {
2468        assert_eq!(
2469            thread.to_markdown(),
2470            indoc! {"
2471                ## User
2472
2473                Message 1
2474
2475                ## Assistant
2476
2477                Message 1 response
2478
2479                ## User
2480
2481                Message 2
2482
2483                ## Assistant
2484
2485                Message 2 response
2486            "}
2487        );
2488
2489        assert_eq!(
2490            thread.latest_token_usage(),
2491            Some(acp_thread::TokenUsage {
2492                used_tokens: 40_000 + 20_000,
2493                max_tokens: 1_000_000,
2494                input_tokens: 40_000,
2495                output_tokens: 20_000,
2496            })
2497        );
2498    });
2499
2500    thread
2501        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
2502        .unwrap();
2503    cx.run_until_parked();
2504
2505    assert_first_message_state(cx);
2506}
2507
2508#[gpui::test]
2509async fn test_title_generation(cx: &mut TestAppContext) {
2510    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2511    let fake_model = model.as_fake();
2512
2513    let summary_model = Arc::new(FakeLanguageModel::default());
2514    thread.update(cx, |thread, cx| {
2515        thread.set_summarization_model(Some(summary_model.clone()), cx)
2516    });
2517
2518    let send = thread
2519        .update(cx, |thread, cx| {
2520            thread.send(UserMessageId::new(), ["Hello"], cx)
2521        })
2522        .unwrap();
2523    cx.run_until_parked();
2524
2525    fake_model.send_last_completion_stream_text_chunk("Hey!");
2526    fake_model.end_last_completion_stream();
2527    cx.run_until_parked();
2528    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "New Thread"));
2529
2530    // Ensure the summary model has been invoked to generate a title.
2531    summary_model.send_last_completion_stream_text_chunk("Hello ");
2532    summary_model.send_last_completion_stream_text_chunk("world\nG");
2533    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
2534    summary_model.end_last_completion_stream();
2535    send.collect::<Vec<_>>().await;
2536    cx.run_until_parked();
2537    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));
2538
2539    // Send another message, ensuring no title is generated this time.
2540    let send = thread
2541        .update(cx, |thread, cx| {
2542            thread.send(UserMessageId::new(), ["Hello again"], cx)
2543        })
2544        .unwrap();
2545    cx.run_until_parked();
2546    fake_model.send_last_completion_stream_text_chunk("Hey again!");
2547    fake_model.end_last_completion_stream();
2548    cx.run_until_parked();
2549    assert_eq!(summary_model.pending_completions(), Vec::new());
2550    send.collect::<Vec<_>>().await;
2551    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));
2552}
2553
2554#[gpui::test]
2555async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
2556    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2557    let fake_model = model.as_fake();
2558
2559    let _events = thread
2560        .update(cx, |thread, cx| {
2561            thread.add_tool(ToolRequiringPermission);
2562            thread.add_tool(EchoTool);
2563            thread.send(UserMessageId::new(), ["Hey!"], cx)
2564        })
2565        .unwrap();
2566    cx.run_until_parked();
2567
2568    let permission_tool_use = LanguageModelToolUse {
2569        id: "tool_id_1".into(),
2570        name: ToolRequiringPermission::name().into(),
2571        raw_input: "{}".into(),
2572        input: json!({}),
2573        is_input_complete: true,
2574        thought_signature: None,
2575    };
2576    let echo_tool_use = LanguageModelToolUse {
2577        id: "tool_id_2".into(),
2578        name: EchoTool::name().into(),
2579        raw_input: json!({"text": "test"}).to_string(),
2580        input: json!({"text": "test"}),
2581        is_input_complete: true,
2582        thought_signature: None,
2583    };
2584    fake_model.send_last_completion_stream_text_chunk("Hi!");
2585    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2586        permission_tool_use,
2587    ));
2588    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2589        echo_tool_use.clone(),
2590    ));
2591    fake_model.end_last_completion_stream();
2592    cx.run_until_parked();
2593
2594    // Ensure pending tools are skipped when building a request.
2595    let request = thread
2596        .read_with(cx, |thread, cx| {
2597            thread.build_completion_request(CompletionIntent::EditFile, cx)
2598        })
2599        .unwrap();
2600    assert_eq!(
2601        request.messages[1..],
2602        vec![
2603            LanguageModelRequestMessage {
2604                role: Role::User,
2605                content: vec!["Hey!".into()],
2606                cache: true,
2607                reasoning_details: None,
2608            },
2609            LanguageModelRequestMessage {
2610                role: Role::Assistant,
2611                content: vec![
2612                    MessageContent::Text("Hi!".into()),
2613                    MessageContent::ToolUse(echo_tool_use.clone())
2614                ],
2615                cache: false,
2616                reasoning_details: None,
2617            },
2618            LanguageModelRequestMessage {
2619                role: Role::User,
2620                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
2621                    tool_use_id: echo_tool_use.id.clone(),
2622                    tool_name: echo_tool_use.name,
2623                    is_error: false,
2624                    content: "test".into(),
2625                    output: Some("test".into())
2626                })],
2627                cache: false,
2628                reasoning_details: None,
2629            },
2630        ],
2631    );
2632}
2633
2634#[gpui::test]
2635async fn test_agent_connection(cx: &mut TestAppContext) {
2636    cx.update(settings::init);
2637    let templates = Templates::new();
2638
2639    // Initialize language model system with test provider
2640    cx.update(|cx| {
2641        gpui_tokio::init(cx);
2642
2643        let http_client = FakeHttpClient::with_404_response();
2644        let clock = Arc::new(clock::FakeSystemClock::new());
2645        let client = Client::new(clock, http_client, cx);
2646        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
2647        language_model::init(client.clone(), cx);
2648        language_models::init(user_store, client.clone(), cx);
2649        LanguageModelRegistry::test(cx);
2650    });
2651    cx.executor().forbid_parking();
2652
2653    // Create a project for new_thread
2654    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
2655    fake_fs.insert_tree(path!("/test"), json!({})).await;
2656    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
2657    let cwd = Path::new("/test");
2658    let thread_store = cx.new(|cx| ThreadStore::new(cx));
2659
2660    // Create agent and connection
2661    let agent = NativeAgent::new(
2662        project.clone(),
2663        thread_store,
2664        templates.clone(),
2665        None,
2666        fake_fs.clone(),
2667        &mut cx.to_async(),
2668    )
2669    .await
2670    .unwrap();
2671    let connection = NativeAgentConnection(agent.clone());
2672
2673    // Create a thread using new_thread
2674    let connection_rc = Rc::new(connection.clone());
2675    let acp_thread = cx
2676        .update(|cx| connection_rc.new_thread(project, cwd, cx))
2677        .await
2678        .expect("new_thread should succeed");
2679
2680    // Get the session_id from the AcpThread
2681    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
2682
2683    // Test model_selector returns Some
2684    let selector_opt = connection.model_selector(&session_id);
2685    assert!(
2686        selector_opt.is_some(),
2687        "agent should always support ModelSelector"
2688    );
2689    let selector = selector_opt.unwrap();
2690
2691    // Test list_models
2692    let listed_models = cx
2693        .update(|cx| selector.list_models(cx))
2694        .await
2695        .expect("list_models should succeed");
2696    let AgentModelList::Grouped(listed_models) = listed_models else {
2697        panic!("Unexpected model list type");
2698    };
2699    assert!(!listed_models.is_empty(), "should have at least one model");
2700    assert_eq!(
2701        listed_models[&AgentModelGroupName("Fake".into())][0]
2702            .id
2703            .0
2704            .as_ref(),
2705        "fake/fake"
2706    );
2707
2708    // Test selected_model returns the default
2709    let model = cx
2710        .update(|cx| selector.selected_model(cx))
2711        .await
2712        .expect("selected_model should succeed");
2713    let model = cx
2714        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
2715        .unwrap();
2716    let model = model.as_fake();
2717    assert_eq!(model.id().0, "fake", "should return default model");
2718
2719    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
2720    cx.run_until_parked();
2721    model.send_last_completion_stream_text_chunk("def");
2722    cx.run_until_parked();
2723    acp_thread.read_with(cx, |thread, cx| {
2724        assert_eq!(
2725            thread.to_markdown(cx),
2726            indoc! {"
2727                ## User
2728
2729                abc
2730
2731                ## Assistant
2732
2733                def
2734
2735            "}
2736        )
2737    });
2738
2739    // Test cancel
2740    cx.update(|cx| connection.cancel(&session_id, cx));
2741    request.await.expect("prompt should fail gracefully");
2742
2743    // Ensure that dropping the ACP thread causes the native thread to be
2744    // dropped as well.
2745    cx.update(|_| drop(acp_thread));
2746    let result = cx
2747        .update(|cx| {
2748            connection.prompt(
2749                Some(acp_thread::UserMessageId::new()),
2750                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
2751                cx,
2752            )
2753        })
2754        .await;
2755    assert_eq!(
2756        result.as_ref().unwrap_err().to_string(),
2757        "Session not found",
2758        "unexpected result: {:?}",
2759        result
2760    );
2761}
2762
2763#[gpui::test]
2764async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
2765    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
2766    thread.update(cx, |thread, _cx| thread.add_tool(ThinkingTool));
2767    let fake_model = model.as_fake();
2768
2769    let mut events = thread
2770        .update(cx, |thread, cx| {
2771            thread.send(UserMessageId::new(), ["Think"], cx)
2772        })
2773        .unwrap();
2774    cx.run_until_parked();
2775
2776    // Simulate streaming partial input.
2777    let input = json!({});
2778    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2779        LanguageModelToolUse {
2780            id: "1".into(),
2781            name: ThinkingTool::name().into(),
2782            raw_input: input.to_string(),
2783            input,
2784            is_input_complete: false,
2785            thought_signature: None,
2786        },
2787    ));
2788
2789    // Input streaming completed
2790    let input = json!({ "content": "Thinking hard!" });
2791    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2792        LanguageModelToolUse {
2793            id: "1".into(),
2794            name: "thinking".into(),
2795            raw_input: input.to_string(),
2796            input,
2797            is_input_complete: true,
2798            thought_signature: None,
2799        },
2800    ));
2801    fake_model.end_last_completion_stream();
2802    cx.run_until_parked();
2803
2804    let tool_call = expect_tool_call(&mut events).await;
2805    assert_eq!(
2806        tool_call,
2807        acp::ToolCall::new("1", "Thinking")
2808            .kind(acp::ToolKind::Think)
2809            .raw_input(json!({}))
2810            .meta(acp::Meta::from_iter([(
2811                "tool_name".into(),
2812                "thinking".into()
2813            )]))
2814    );
2815    let update = expect_tool_call_update_fields(&mut events).await;
2816    assert_eq!(
2817        update,
2818        acp::ToolCallUpdate::new(
2819            "1",
2820            acp::ToolCallUpdateFields::new()
2821                .title("Thinking")
2822                .kind(acp::ToolKind::Think)
2823                .raw_input(json!({ "content": "Thinking hard!"}))
2824        )
2825    );
2826    let update = expect_tool_call_update_fields(&mut events).await;
2827    assert_eq!(
2828        update,
2829        acp::ToolCallUpdate::new(
2830            "1",
2831            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
2832        )
2833    );
2834    let update = expect_tool_call_update_fields(&mut events).await;
2835    assert_eq!(
2836        update,
2837        acp::ToolCallUpdate::new(
2838            "1",
2839            acp::ToolCallUpdateFields::new().content(vec!["Thinking hard!".into()])
2840        )
2841    );
2842    let update = expect_tool_call_update_fields(&mut events).await;
2843    assert_eq!(
2844        update,
2845        acp::ToolCallUpdate::new(
2846            "1",
2847            acp::ToolCallUpdateFields::new()
2848                .status(acp::ToolCallStatus::Completed)
2849                .raw_output("Finished thinking.")
2850        )
2851    );
2852}
2853
2854#[gpui::test]
2855async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
2856    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
2857    let fake_model = model.as_fake();
2858
2859    let mut events = thread
2860        .update(cx, |thread, cx| {
2861            thread.send(UserMessageId::new(), ["Hello!"], cx)
2862        })
2863        .unwrap();
2864    cx.run_until_parked();
2865
2866    fake_model.send_last_completion_stream_text_chunk("Hey!");
2867    fake_model.end_last_completion_stream();
2868
2869    let mut retry_events = Vec::new();
2870    while let Some(Ok(event)) = events.next().await {
2871        match event {
2872            ThreadEvent::Retry(retry_status) => {
2873                retry_events.push(retry_status);
2874            }
2875            ThreadEvent::Stop(..) => break,
2876            _ => {}
2877        }
2878    }
2879
2880    assert_eq!(retry_events.len(), 0);
2881    thread.read_with(cx, |thread, _cx| {
2882        assert_eq!(
2883            thread.to_markdown(),
2884            indoc! {"
2885                ## User
2886
2887                Hello!
2888
2889                ## Assistant
2890
2891                Hey!
2892            "}
2893        )
2894    });
2895}
2896
2897#[gpui::test]
2898async fn test_send_retry_on_error(cx: &mut TestAppContext) {
2899    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
2900    let fake_model = model.as_fake();
2901
2902    let mut events = thread
2903        .update(cx, |thread, cx| {
2904            thread.send(UserMessageId::new(), ["Hello!"], cx)
2905        })
2906        .unwrap();
2907    cx.run_until_parked();
2908
2909    fake_model.send_last_completion_stream_text_chunk("Hey,");
2910    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
2911        provider: LanguageModelProviderName::new("Anthropic"),
2912        retry_after: Some(Duration::from_secs(3)),
2913    });
2914    fake_model.end_last_completion_stream();
2915
2916    cx.executor().advance_clock(Duration::from_secs(3));
2917    cx.run_until_parked();
2918
2919    fake_model.send_last_completion_stream_text_chunk("there!");
2920    fake_model.end_last_completion_stream();
2921    cx.run_until_parked();
2922
2923    let mut retry_events = Vec::new();
2924    while let Some(Ok(event)) = events.next().await {
2925        match event {
2926            ThreadEvent::Retry(retry_status) => {
2927                retry_events.push(retry_status);
2928            }
2929            ThreadEvent::Stop(..) => break,
2930            _ => {}
2931        }
2932    }
2933
2934    assert_eq!(retry_events.len(), 1);
2935    assert!(matches!(
2936        retry_events[0],
2937        acp_thread::RetryStatus { attempt: 1, .. }
2938    ));
2939    thread.read_with(cx, |thread, _cx| {
2940        assert_eq!(
2941            thread.to_markdown(),
2942            indoc! {"
2943                ## User
2944
2945                Hello!
2946
2947                ## Assistant
2948
2949                Hey,
2950
2951                [resume]
2952
2953                ## Assistant
2954
2955                there!
2956            "}
2957        )
2958    });
2959}
2960
2961#[gpui::test]
2962async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
2963    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
2964    let fake_model = model.as_fake();
2965
2966    let events = thread
2967        .update(cx, |thread, cx| {
2968            thread.add_tool(EchoTool);
2969            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
2970        })
2971        .unwrap();
2972    cx.run_until_parked();
2973
2974    let tool_use_1 = LanguageModelToolUse {
2975        id: "tool_1".into(),
2976        name: EchoTool::name().into(),
2977        raw_input: json!({"text": "test"}).to_string(),
2978        input: json!({"text": "test"}),
2979        is_input_complete: true,
2980        thought_signature: None,
2981    };
2982    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2983        tool_use_1.clone(),
2984    ));
2985    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
2986        provider: LanguageModelProviderName::new("Anthropic"),
2987        retry_after: Some(Duration::from_secs(3)),
2988    });
2989    fake_model.end_last_completion_stream();
2990
2991    cx.executor().advance_clock(Duration::from_secs(3));
2992    let completion = fake_model.pending_completions().pop().unwrap();
2993    assert_eq!(
2994        completion.messages[1..],
2995        vec![
2996            LanguageModelRequestMessage {
2997                role: Role::User,
2998                content: vec!["Call the echo tool!".into()],
2999                cache: false,
3000                reasoning_details: None,
3001            },
3002            LanguageModelRequestMessage {
3003                role: Role::Assistant,
3004                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
3005                cache: false,
3006                reasoning_details: None,
3007            },
3008            LanguageModelRequestMessage {
3009                role: Role::User,
3010                content: vec![language_model::MessageContent::ToolResult(
3011                    LanguageModelToolResult {
3012                        tool_use_id: tool_use_1.id.clone(),
3013                        tool_name: tool_use_1.name.clone(),
3014                        is_error: false,
3015                        content: "test".into(),
3016                        output: Some("test".into())
3017                    }
3018                )],
3019                cache: true,
3020                reasoning_details: None,
3021            },
3022        ]
3023    );
3024
3025    fake_model.send_last_completion_stream_text_chunk("Done");
3026    fake_model.end_last_completion_stream();
3027    cx.run_until_parked();
3028    events.collect::<Vec<_>>().await;
3029    thread.read_with(cx, |thread, _cx| {
3030        assert_eq!(
3031            thread.last_message(),
3032            Some(Message::Agent(AgentMessage {
3033                content: vec![AgentMessageContent::Text("Done".into())],
3034                tool_results: IndexMap::default(),
3035                reasoning_details: None,
3036            }))
3037        );
3038    })
3039}
3040
3041#[gpui::test]
3042async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
3043    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3044    let fake_model = model.as_fake();
3045
3046    let mut events = thread
3047        .update(cx, |thread, cx| {
3048            thread.send(UserMessageId::new(), ["Hello!"], cx)
3049        })
3050        .unwrap();
3051    cx.run_until_parked();
3052
3053    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
3054        fake_model.send_last_completion_stream_error(
3055            LanguageModelCompletionError::ServerOverloaded {
3056                provider: LanguageModelProviderName::new("Anthropic"),
3057                retry_after: Some(Duration::from_secs(3)),
3058            },
3059        );
3060        fake_model.end_last_completion_stream();
3061        cx.executor().advance_clock(Duration::from_secs(3));
3062        cx.run_until_parked();
3063    }
3064
3065    let mut errors = Vec::new();
3066    let mut retry_events = Vec::new();
3067    while let Some(event) = events.next().await {
3068        match event {
3069            Ok(ThreadEvent::Retry(retry_status)) => {
3070                retry_events.push(retry_status);
3071            }
3072            Ok(ThreadEvent::Stop(..)) => break,
3073            Err(error) => errors.push(error),
3074            _ => {}
3075        }
3076    }
3077
3078    assert_eq!(
3079        retry_events.len(),
3080        crate::thread::MAX_RETRY_ATTEMPTS as usize
3081    );
3082    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
3083        assert_eq!(retry_events[i].attempt, i + 1);
3084    }
3085    assert_eq!(errors.len(), 1);
3086    let error = errors[0]
3087        .downcast_ref::<LanguageModelCompletionError>()
3088        .unwrap();
3089    assert!(matches!(
3090        error,
3091        LanguageModelCompletionError::ServerOverloaded { .. }
3092    ));
3093}
3094
3095/// Filters out the stop events for asserting against in tests
3096fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3097    result_events
3098        .into_iter()
3099        .filter_map(|event| match event.unwrap() {
3100            ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3101            _ => None,
3102        })
3103        .collect()
3104}
3105
/// Fixture returned by `setup`, bundling the thread under test together with
/// the objects it was created from.
struct ThreadTest {
    /// The language model the thread was constructed with.
    model: Arc<dyn LanguageModel>,
    /// The thread under test.
    thread: Entity<Thread>,
    /// The project context entity passed to the thread.
    project_context: Entity<ProjectContext>,
    /// The project's context server store, which backs the thread's context
    /// server registry.
    context_server_store: Entity<ContextServerStore>,
    /// The fake file system holding the settings file and the project tree.
    fs: Arc<FakeFs>,
}
3113
/// Selects which language model `setup` wires into the thread under test.
enum TestModel {
    /// A model looked up in the `LanguageModelRegistry` by id; `setup`
    /// initializes a production client and authenticates its provider.
    Sonnet4,
    /// A `FakeLanguageModel` created in-process and driven by the test.
    Fake,
}
3118
3119impl TestModel {
3120    fn id(&self) -> LanguageModelId {
3121        match self {
3122            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
3123            TestModel::Fake => unreachable!(),
3124        }
3125    }
3126}
3127
3128async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
3129    cx.executor().allow_parking();
3130
3131    let fs = FakeFs::new(cx.background_executor.clone());
3132    fs.create_dir(paths::settings_file().parent().unwrap())
3133        .await
3134        .unwrap();
3135    fs.insert_file(
3136        paths::settings_file(),
3137        json!({
3138            "agent": {
3139                "default_profile": "test-profile",
3140                "profiles": {
3141                    "test-profile": {
3142                        "name": "Test Profile",
3143                        "tools": {
3144                            EchoTool::name(): true,
3145                            DelayTool::name(): true,
3146                            WordListTool::name(): true,
3147                            ToolRequiringPermission::name(): true,
3148                            InfiniteTool::name(): true,
3149                            CancellationAwareTool::name(): true,
3150                            ThinkingTool::name(): true,
3151                            "terminal": true,
3152                        }
3153                    }
3154                }
3155            }
3156        })
3157        .to_string()
3158        .into_bytes(),
3159    )
3160    .await;
3161
3162    cx.update(|cx| {
3163        settings::init(cx);
3164
3165        match model {
3166            TestModel::Fake => {}
3167            TestModel::Sonnet4 => {
3168                gpui_tokio::init(cx);
3169                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
3170                cx.set_http_client(Arc::new(http_client));
3171                let client = Client::production(cx);
3172                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
3173                language_model::init(client.clone(), cx);
3174                language_models::init(user_store, client.clone(), cx);
3175            }
3176        };
3177
3178        watch_settings(fs.clone(), cx);
3179    });
3180
3181    let templates = Templates::new();
3182
3183    fs.insert_tree(path!("/test"), json!({})).await;
3184    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3185
3186    let model = cx
3187        .update(|cx| {
3188            if let TestModel::Fake = model {
3189                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
3190            } else {
3191                let model_id = model.id();
3192                let models = LanguageModelRegistry::read_global(cx);
3193                let model = models
3194                    .available_models(cx)
3195                    .find(|model| model.id() == model_id)
3196                    .unwrap();
3197
3198                let provider = models.provider(&model.provider_id()).unwrap();
3199                let authenticated = provider.authenticate(cx);
3200
3201                cx.spawn(async move |_cx| {
3202                    authenticated.await.unwrap();
3203                    model
3204                })
3205            }
3206        })
3207        .await;
3208
3209    let project_context = cx.new(|_cx| ProjectContext::default());
3210    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
3211    let context_server_registry =
3212        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
3213    let thread = cx.new(|cx| {
3214        Thread::new(
3215            project,
3216            project_context.clone(),
3217            context_server_registry,
3218            templates,
3219            Some(model.clone()),
3220            cx,
3221        )
3222    });
3223    ThreadTest {
3224        model,
3225        thread,
3226        project_context,
3227        context_server_store,
3228        fs,
3229    }
3230}
3231
/// Initializes `env_logger`, but only when `RUST_LOG` is set to a readable
/// value. Registered through `ctor`.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    if std::env::var("RUST_LOG").is_err() {
        return;
    }
    env_logger::init();
}
3239
3240fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
3241    let fs = fs.clone();
3242    cx.spawn({
3243        async move |cx| {
3244            let mut new_settings_content_rx = settings::watch_config_file(
3245                cx.background_executor(),
3246                fs,
3247                paths::settings_file().clone(),
3248            );
3249
3250            while let Some(new_settings_content) = new_settings_content_rx.next().await {
3251                cx.update(|cx| {
3252                    SettingsStore::update_global(cx, |settings, cx| {
3253                        settings.set_user_settings(&new_settings_content, cx)
3254                    })
3255                })
3256                .ok();
3257            }
3258        }
3259    })
3260    .detach();
3261}
3262
3263fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
3264    completion
3265        .tools
3266        .iter()
3267        .map(|tool| tool.name.clone())
3268        .collect()
3269}
3270
3271fn setup_context_server(
3272    name: &'static str,
3273    tools: Vec<context_server::types::Tool>,
3274    context_server_store: &Entity<ContextServerStore>,
3275    cx: &mut TestAppContext,
3276) -> mpsc::UnboundedReceiver<(
3277    context_server::types::CallToolParams,
3278    oneshot::Sender<context_server::types::CallToolResponse>,
3279)> {
3280    cx.update(|cx| {
3281        let mut settings = ProjectSettings::get_global(cx).clone();
3282        settings.context_servers.insert(
3283            name.into(),
3284            project::project_settings::ContextServerSettings::Stdio {
3285                enabled: true,
3286                remote: false,
3287                command: ContextServerCommand {
3288                    path: "somebinary".into(),
3289                    args: Vec::new(),
3290                    env: None,
3291                    timeout: None,
3292                },
3293            },
3294        );
3295        ProjectSettings::override_global(settings, cx);
3296    });
3297
3298    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
3299    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
3300        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
3301            context_server::types::InitializeResponse {
3302                protocol_version: context_server::types::ProtocolVersion(
3303                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
3304                ),
3305                server_info: context_server::types::Implementation {
3306                    name: name.into(),
3307                    version: "1.0.0".to_string(),
3308                },
3309                capabilities: context_server::types::ServerCapabilities {
3310                    tools: Some(context_server::types::ToolsCapabilities {
3311                        list_changed: Some(true),
3312                    }),
3313                    ..Default::default()
3314                },
3315                meta: None,
3316            }
3317        })
3318        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
3319            let tools = tools.clone();
3320            async move {
3321                context_server::types::ListToolsResponse {
3322                    tools,
3323                    next_cursor: None,
3324                    meta: None,
3325                }
3326            }
3327        })
3328        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
3329            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
3330            async move {
3331                let (response_tx, response_rx) = oneshot::channel();
3332                mcp_tool_calls_tx
3333                    .unbounded_send((params, response_tx))
3334                    .unwrap();
3335                response_rx.await.unwrap()
3336            }
3337        });
3338    context_server_store.update(cx, |store, cx| {
3339        store.start_server(
3340            Arc::new(ContextServer::new(
3341                ContextServerId(name.into()),
3342                Arc::new(fake_transport),
3343            )),
3344            cx,
3345        );
3346    });
3347    cx.run_until_parked();
3348    mcp_tool_calls_rx
3349}
3350
3351#[gpui::test]
3352async fn test_tokens_before_message(cx: &mut TestAppContext) {
3353    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3354    let fake_model = model.as_fake();
3355
3356    // First message
3357    let message_1_id = UserMessageId::new();
3358    thread
3359        .update(cx, |thread, cx| {
3360            thread.send(message_1_id.clone(), ["First message"], cx)
3361        })
3362        .unwrap();
3363    cx.run_until_parked();
3364
3365    // Before any response, tokens_before_message should return None for first message
3366    thread.read_with(cx, |thread, _| {
3367        assert_eq!(
3368            thread.tokens_before_message(&message_1_id),
3369            None,
3370            "First message should have no tokens before it"
3371        );
3372    });
3373
3374    // Complete first message with usage
3375    fake_model.send_last_completion_stream_text_chunk("Response 1");
3376    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3377        language_model::TokenUsage {
3378            input_tokens: 100,
3379            output_tokens: 50,
3380            cache_creation_input_tokens: 0,
3381            cache_read_input_tokens: 0,
3382        },
3383    ));
3384    fake_model.end_last_completion_stream();
3385    cx.run_until_parked();
3386
3387    // First message still has no tokens before it
3388    thread.read_with(cx, |thread, _| {
3389        assert_eq!(
3390            thread.tokens_before_message(&message_1_id),
3391            None,
3392            "First message should still have no tokens before it after response"
3393        );
3394    });
3395
3396    // Second message
3397    let message_2_id = UserMessageId::new();
3398    thread
3399        .update(cx, |thread, cx| {
3400            thread.send(message_2_id.clone(), ["Second message"], cx)
3401        })
3402        .unwrap();
3403    cx.run_until_parked();
3404
3405    // Second message should have first message's input tokens before it
3406    thread.read_with(cx, |thread, _| {
3407        assert_eq!(
3408            thread.tokens_before_message(&message_2_id),
3409            Some(100),
3410            "Second message should have 100 tokens before it (from first request)"
3411        );
3412    });
3413
3414    // Complete second message
3415    fake_model.send_last_completion_stream_text_chunk("Response 2");
3416    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3417        language_model::TokenUsage {
3418            input_tokens: 250, // Total for this request (includes previous context)
3419            output_tokens: 75,
3420            cache_creation_input_tokens: 0,
3421            cache_read_input_tokens: 0,
3422        },
3423    ));
3424    fake_model.end_last_completion_stream();
3425    cx.run_until_parked();
3426
3427    // Third message
3428    let message_3_id = UserMessageId::new();
3429    thread
3430        .update(cx, |thread, cx| {
3431            thread.send(message_3_id.clone(), ["Third message"], cx)
3432        })
3433        .unwrap();
3434    cx.run_until_parked();
3435
3436    // Third message should have second message's input tokens (250) before it
3437    thread.read_with(cx, |thread, _| {
3438        assert_eq!(
3439            thread.tokens_before_message(&message_3_id),
3440            Some(250),
3441            "Third message should have 250 tokens before it (from second request)"
3442        );
3443        // Second message should still have 100
3444        assert_eq!(
3445            thread.tokens_before_message(&message_2_id),
3446            Some(100),
3447            "Second message should still have 100 tokens before it"
3448        );
3449        // First message still has none
3450        assert_eq!(
3451            thread.tokens_before_message(&message_1_id),
3452            None,
3453            "First message should still have no tokens before it"
3454        );
3455    });
3456}
3457
3458#[gpui::test]
3459async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
3460    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3461    let fake_model = model.as_fake();
3462
3463    // Set up three messages with responses
3464    let message_1_id = UserMessageId::new();
3465    thread
3466        .update(cx, |thread, cx| {
3467            thread.send(message_1_id.clone(), ["Message 1"], cx)
3468        })
3469        .unwrap();
3470    cx.run_until_parked();
3471    fake_model.send_last_completion_stream_text_chunk("Response 1");
3472    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3473        language_model::TokenUsage {
3474            input_tokens: 100,
3475            output_tokens: 50,
3476            cache_creation_input_tokens: 0,
3477            cache_read_input_tokens: 0,
3478        },
3479    ));
3480    fake_model.end_last_completion_stream();
3481    cx.run_until_parked();
3482
3483    let message_2_id = UserMessageId::new();
3484    thread
3485        .update(cx, |thread, cx| {
3486            thread.send(message_2_id.clone(), ["Message 2"], cx)
3487        })
3488        .unwrap();
3489    cx.run_until_parked();
3490    fake_model.send_last_completion_stream_text_chunk("Response 2");
3491    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3492        language_model::TokenUsage {
3493            input_tokens: 250,
3494            output_tokens: 75,
3495            cache_creation_input_tokens: 0,
3496            cache_read_input_tokens: 0,
3497        },
3498    ));
3499    fake_model.end_last_completion_stream();
3500    cx.run_until_parked();
3501
3502    // Verify initial state
3503    thread.read_with(cx, |thread, _| {
3504        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
3505    });
3506
3507    // Truncate at message 2 (removes message 2 and everything after)
3508    thread
3509        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
3510        .unwrap();
3511    cx.run_until_parked();
3512
3513    // After truncation, message_2_id no longer exists, so lookup should return None
3514    thread.read_with(cx, |thread, _| {
3515        assert_eq!(
3516            thread.tokens_before_message(&message_2_id),
3517            None,
3518            "After truncation, message 2 no longer exists"
3519        );
3520        // Message 1 still exists but has no tokens before it
3521        assert_eq!(
3522            thread.tokens_before_message(&message_1_id),
3523            None,
3524            "First message still has no tokens before it"
3525        );
3526    });
3527}
3528
3529#[gpui::test]
3530async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
3531    init_test(cx);
3532
3533    let fs = FakeFs::new(cx.executor());
3534    fs.insert_tree("/root", json!({})).await;
3535    let project = Project::test(fs, ["/root".as_ref()], cx).await;
3536
3537    // Test 1: Deny rule blocks command
3538    {
3539        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
3540        let environment = Rc::new(FakeThreadEnvironment {
3541            handle: handle.clone(),
3542        });
3543
3544        cx.update(|cx| {
3545            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
3546            settings.tool_permissions.tools.insert(
3547                "terminal".into(),
3548                agent_settings::ToolRules {
3549                    default_mode: settings::ToolPermissionMode::Confirm,
3550                    always_allow: vec![],
3551                    always_deny: vec![
3552                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
3553                    ],
3554                    always_confirm: vec![],
3555                    invalid_patterns: vec![],
3556                },
3557            );
3558            agent_settings::AgentSettings::override_global(settings, cx);
3559        });
3560
3561        #[allow(clippy::arc_with_non_send_sync)]
3562        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
3563        let (event_stream, _rx) = crate::ToolCallEventStream::test();
3564
3565        let task = cx.update(|cx| {
3566            tool.run(
3567                crate::TerminalToolInput {
3568                    command: "rm -rf /".to_string(),
3569                    cd: ".".to_string(),
3570                    timeout_ms: None,
3571                },
3572                event_stream,
3573                cx,
3574            )
3575        });
3576
3577        let result = task.await;
3578        assert!(
3579            result.is_err(),
3580            "expected command to be blocked by deny rule"
3581        );
3582        assert!(
3583            result.unwrap_err().to_string().contains("blocked"),
3584            "error should mention the command was blocked"
3585        );
3586    }
3587
3588    // Test 2: Allow rule skips confirmation (and overrides default_mode: Deny)
3589    {
3590        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_with_immediate_exit(cx, 0)));
3591        let environment = Rc::new(FakeThreadEnvironment {
3592            handle: handle.clone(),
3593        });
3594
3595        cx.update(|cx| {
3596            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
3597            settings.always_allow_tool_actions = false;
3598            settings.tool_permissions.tools.insert(
3599                "terminal".into(),
3600                agent_settings::ToolRules {
3601                    default_mode: settings::ToolPermissionMode::Deny,
3602                    always_allow: vec![
3603                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
3604                    ],
3605                    always_deny: vec![],
3606                    always_confirm: vec![],
3607                    invalid_patterns: vec![],
3608                },
3609            );
3610            agent_settings::AgentSettings::override_global(settings, cx);
3611        });
3612
3613        #[allow(clippy::arc_with_non_send_sync)]
3614        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
3615        let (event_stream, mut rx) = crate::ToolCallEventStream::test();
3616
3617        let task = cx.update(|cx| {
3618            tool.run(
3619                crate::TerminalToolInput {
3620                    command: "echo hello".to_string(),
3621                    cd: ".".to_string(),
3622                    timeout_ms: None,
3623                },
3624                event_stream,
3625                cx,
3626            )
3627        });
3628
3629        let update = rx.expect_update_fields().await;
3630        assert!(
3631            update.content.iter().any(|blocks| {
3632                blocks
3633                    .iter()
3634                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
3635            }),
3636            "expected terminal content (allow rule should skip confirmation and override default deny)"
3637        );
3638
3639        let result = task.await;
3640        assert!(
3641            result.is_ok(),
3642            "expected command to succeed without confirmation"
3643        );
3644    }
3645
3646    // Test 3: always_allow_tool_actions=true overrides always_confirm patterns
3647    {
3648        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_with_immediate_exit(cx, 0)));
3649        let environment = Rc::new(FakeThreadEnvironment {
3650            handle: handle.clone(),
3651        });
3652
3653        cx.update(|cx| {
3654            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
3655            settings.always_allow_tool_actions = true;
3656            settings.tool_permissions.tools.insert(
3657                "terminal".into(),
3658                agent_settings::ToolRules {
3659                    default_mode: settings::ToolPermissionMode::Allow,
3660                    always_allow: vec![],
3661                    always_deny: vec![],
3662                    always_confirm: vec![
3663                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
3664                    ],
3665                    invalid_patterns: vec![],
3666                },
3667            );
3668            agent_settings::AgentSettings::override_global(settings, cx);
3669        });
3670
3671        #[allow(clippy::arc_with_non_send_sync)]
3672        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
3673        let (event_stream, _rx) = crate::ToolCallEventStream::test();
3674
3675        let task = cx.update(|cx| {
3676            tool.run(
3677                crate::TerminalToolInput {
3678                    command: "sudo rm file".to_string(),
3679                    cd: ".".to_string(),
3680                    timeout_ms: None,
3681                },
3682                event_stream,
3683                cx,
3684            )
3685        });
3686
3687        // With always_allow_tool_actions=true, confirm patterns are overridden
3688        task.await
3689            .expect("command should be allowed with always_allow_tool_actions=true");
3690    }
3691
3692    // Test 4: always_allow_tool_actions=true overrides default_mode: Deny
3693    {
3694        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_with_immediate_exit(cx, 0)));
3695        let environment = Rc::new(FakeThreadEnvironment {
3696            handle: handle.clone(),
3697        });
3698
3699        cx.update(|cx| {
3700            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
3701            settings.always_allow_tool_actions = true;
3702            settings.tool_permissions.tools.insert(
3703                "terminal".into(),
3704                agent_settings::ToolRules {
3705                    default_mode: settings::ToolPermissionMode::Deny,
3706                    always_allow: vec![],
3707                    always_deny: vec![],
3708                    always_confirm: vec![],
3709                    invalid_patterns: vec![],
3710                },
3711            );
3712            agent_settings::AgentSettings::override_global(settings, cx);
3713        });
3714
3715        #[allow(clippy::arc_with_non_send_sync)]
3716        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
3717        let (event_stream, _rx) = crate::ToolCallEventStream::test();
3718
3719        let task = cx.update(|cx| {
3720            tool.run(
3721                crate::TerminalToolInput {
3722                    command: "echo hello".to_string(),
3723                    cd: ".".to_string(),
3724                    timeout_ms: None,
3725                },
3726                event_stream,
3727                cx,
3728            )
3729        });
3730
3731        // With always_allow_tool_actions=true, even default_mode: Deny is overridden
3732        task.await
3733            .expect("command should be allowed with always_allow_tool_actions=true");
3734    }
3735}
3736
3737#[gpui::test]
3738async fn test_subagent_tool_is_present_when_feature_flag_enabled(cx: &mut TestAppContext) {
3739    init_test(cx);
3740
3741    cx.update(|cx| {
3742        cx.update_flags(true, vec!["subagents".to_string()]);
3743    });
3744
3745    let fs = FakeFs::new(cx.executor());
3746    fs.insert_tree(path!("/test"), json!({})).await;
3747    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3748    let project_context = cx.new(|_cx| ProjectContext::default());
3749    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
3750    let context_server_registry =
3751        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
3752    let model = Arc::new(FakeLanguageModel::default());
3753
3754    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
3755    let environment = Rc::new(FakeThreadEnvironment { handle });
3756
3757    let thread = cx.new(|cx| {
3758        let mut thread = Thread::new(
3759            project.clone(),
3760            project_context,
3761            context_server_registry,
3762            Templates::new(),
3763            Some(model),
3764            cx,
3765        );
3766        thread.add_default_tools(environment, cx);
3767        thread
3768    });
3769
3770    thread.read_with(cx, |thread, _| {
3771        assert!(
3772            thread.has_registered_tool("subagent"),
3773            "subagent tool should be present when feature flag is enabled"
3774        );
3775    });
3776}
3777
3778#[gpui::test]
3779async fn test_subagent_thread_inherits_parent_model(cx: &mut TestAppContext) {
3780    init_test(cx);
3781
3782    cx.update(|cx| {
3783        cx.update_flags(true, vec!["subagents".to_string()]);
3784    });
3785
3786    let fs = FakeFs::new(cx.executor());
3787    fs.insert_tree(path!("/test"), json!({})).await;
3788    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3789    let project_context = cx.new(|_cx| ProjectContext::default());
3790    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
3791    let context_server_registry =
3792        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
3793    let model = Arc::new(FakeLanguageModel::default());
3794
3795    let subagent_context = SubagentContext {
3796        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
3797        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
3798        depth: 1,
3799        summary_prompt: "Summarize".to_string(),
3800        context_low_prompt: "Context low".to_string(),
3801    };
3802
3803    let subagent = cx.new(|cx| {
3804        Thread::new_subagent(
3805            project.clone(),
3806            project_context,
3807            context_server_registry,
3808            Templates::new(),
3809            model.clone(),
3810            subagent_context,
3811            std::collections::BTreeMap::new(),
3812            cx,
3813        )
3814    });
3815
3816    subagent.read_with(cx, |thread, _| {
3817        assert!(thread.is_subagent());
3818        assert_eq!(thread.depth(), 1);
3819        assert!(thread.model().is_some());
3820    });
3821}
3822
3823#[gpui::test]
3824async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
3825    init_test(cx);
3826
3827    cx.update(|cx| {
3828        cx.update_flags(true, vec!["subagents".to_string()]);
3829    });
3830
3831    let fs = FakeFs::new(cx.executor());
3832    fs.insert_tree(path!("/test"), json!({})).await;
3833    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3834    let project_context = cx.new(|_cx| ProjectContext::default());
3835    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
3836    let context_server_registry =
3837        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
3838    let model = Arc::new(FakeLanguageModel::default());
3839
3840    let subagent_context = SubagentContext {
3841        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
3842        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
3843        depth: MAX_SUBAGENT_DEPTH,
3844        summary_prompt: "Summarize".to_string(),
3845        context_low_prompt: "Context low".to_string(),
3846    };
3847
3848    let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
3849    let environment = Rc::new(FakeThreadEnvironment { handle });
3850
3851    let deep_subagent = cx.new(|cx| {
3852        let mut thread = Thread::new_subagent(
3853            project.clone(),
3854            project_context,
3855            context_server_registry,
3856            Templates::new(),
3857            model.clone(),
3858            subagent_context,
3859            std::collections::BTreeMap::new(),
3860            cx,
3861        );
3862        thread.add_default_tools(environment, cx);
3863        thread
3864    });
3865
3866    deep_subagent.read_with(cx, |thread, _| {
3867        assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
3868        assert!(
3869            !thread.has_registered_tool("subagent"),
3870            "subagent tool should not be present at max depth"
3871        );
3872    });
3873}
3874
3875#[gpui::test]
3876async fn test_subagent_receives_task_prompt(cx: &mut TestAppContext) {
3877    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3878    let fake_model = model.as_fake();
3879
3880    cx.update(|cx| {
3881        cx.update_flags(true, vec!["subagents".to_string()]);
3882    });
3883
3884    let subagent_context = SubagentContext {
3885        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
3886        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
3887        depth: 1,
3888        summary_prompt: "Summarize your work".to_string(),
3889        context_low_prompt: "Context low, wrap up".to_string(),
3890    };
3891
3892    let project = thread.read_with(cx, |t, _| t.project.clone());
3893    let project_context = cx.new(|_cx| ProjectContext::default());
3894    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
3895    let context_server_registry =
3896        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
3897
3898    let subagent = cx.new(|cx| {
3899        Thread::new_subagent(
3900            project.clone(),
3901            project_context,
3902            context_server_registry,
3903            Templates::new(),
3904            model.clone(),
3905            subagent_context,
3906            std::collections::BTreeMap::new(),
3907            cx,
3908        )
3909    });
3910
3911    let task_prompt = "Find all TODO comments in the codebase";
3912    subagent
3913        .update(cx, |thread, cx| thread.submit_user_message(task_prompt, cx))
3914        .unwrap();
3915    cx.run_until_parked();
3916
3917    let pending = fake_model.pending_completions();
3918    assert_eq!(pending.len(), 1, "should have one pending completion");
3919
3920    let messages = &pending[0].messages;
3921    let user_messages: Vec<_> = messages
3922        .iter()
3923        .filter(|m| m.role == language_model::Role::User)
3924        .collect();
3925    assert_eq!(user_messages.len(), 1, "should have one user message");
3926
3927    let content = &user_messages[0].content[0];
3928    assert!(
3929        content.to_str().unwrap().contains("TODO"),
3930        "task prompt should be in user message"
3931    );
3932}
3933
3934#[gpui::test]
3935async fn test_subagent_returns_summary_on_completion(cx: &mut TestAppContext) {
3936    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3937    let fake_model = model.as_fake();
3938
3939    cx.update(|cx| {
3940        cx.update_flags(true, vec!["subagents".to_string()]);
3941    });
3942
3943    let subagent_context = SubagentContext {
3944        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
3945        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
3946        depth: 1,
3947        summary_prompt: "Please summarize what you found".to_string(),
3948        context_low_prompt: "Context low, wrap up".to_string(),
3949    };
3950
3951    let project = thread.read_with(cx, |t, _| t.project.clone());
3952    let project_context = cx.new(|_cx| ProjectContext::default());
3953    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
3954    let context_server_registry =
3955        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
3956
3957    let subagent = cx.new(|cx| {
3958        Thread::new_subagent(
3959            project.clone(),
3960            project_context,
3961            context_server_registry,
3962            Templates::new(),
3963            model.clone(),
3964            subagent_context,
3965            std::collections::BTreeMap::new(),
3966            cx,
3967        )
3968    });
3969
3970    subagent
3971        .update(cx, |thread, cx| {
3972            thread.submit_user_message("Do some work", cx)
3973        })
3974        .unwrap();
3975    cx.run_until_parked();
3976
3977    fake_model.send_last_completion_stream_text_chunk("I did the work");
3978    fake_model
3979        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
3980    fake_model.end_last_completion_stream();
3981    cx.run_until_parked();
3982
3983    subagent
3984        .update(cx, |thread, cx| thread.request_final_summary(cx))
3985        .unwrap();
3986    cx.run_until_parked();
3987
3988    let pending = fake_model.pending_completions();
3989    assert!(
3990        !pending.is_empty(),
3991        "should have pending completion for summary"
3992    );
3993
3994    let messages = &pending.last().unwrap().messages;
3995    let user_messages: Vec<_> = messages
3996        .iter()
3997        .filter(|m| m.role == language_model::Role::User)
3998        .collect();
3999
4000    let last_user = user_messages.last().unwrap();
4001    assert!(
4002        last_user.content[0].to_str().unwrap().contains("summarize"),
4003        "summary prompt should be sent"
4004    );
4005}
4006
4007#[gpui::test]
4008async fn test_allowed_tools_restricts_subagent_capabilities(cx: &mut TestAppContext) {
4009    init_test(cx);
4010
4011    cx.update(|cx| {
4012        cx.update_flags(true, vec!["subagents".to_string()]);
4013    });
4014
4015    let fs = FakeFs::new(cx.executor());
4016    fs.insert_tree(path!("/test"), json!({})).await;
4017    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4018    let project_context = cx.new(|_cx| ProjectContext::default());
4019    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4020    let context_server_registry =
4021        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4022    let model = Arc::new(FakeLanguageModel::default());
4023
4024    let subagent_context = SubagentContext {
4025        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4026        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
4027        depth: 1,
4028        summary_prompt: "Summarize".to_string(),
4029        context_low_prompt: "Context low".to_string(),
4030    };
4031
4032    let subagent = cx.new(|cx| {
4033        let mut thread = Thread::new_subagent(
4034            project.clone(),
4035            project_context,
4036            context_server_registry,
4037            Templates::new(),
4038            model.clone(),
4039            subagent_context,
4040            std::collections::BTreeMap::new(),
4041            cx,
4042        );
4043        thread.add_tool(EchoTool);
4044        thread.add_tool(DelayTool);
4045        thread.add_tool(WordListTool);
4046        thread
4047    });
4048
4049    subagent.read_with(cx, |thread, _| {
4050        assert!(thread.has_registered_tool("echo"));
4051        assert!(thread.has_registered_tool("delay"));
4052        assert!(thread.has_registered_tool("word_list"));
4053    });
4054
4055    let allowed: collections::HashSet<gpui::SharedString> =
4056        vec!["echo".into()].into_iter().collect();
4057
4058    subagent.update(cx, |thread, _cx| {
4059        thread.restrict_tools(&allowed);
4060    });
4061
4062    subagent.read_with(cx, |thread, _| {
4063        assert!(
4064            thread.has_registered_tool("echo"),
4065            "echo should still be available"
4066        );
4067        assert!(
4068            !thread.has_registered_tool("delay"),
4069            "delay should be removed"
4070        );
4071        assert!(
4072            !thread.has_registered_tool("word_list"),
4073            "word_list should be removed"
4074        );
4075    });
4076}
4077
4078#[gpui::test]
4079async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
4080    init_test(cx);
4081
4082    cx.update(|cx| {
4083        cx.update_flags(true, vec!["subagents".to_string()]);
4084    });
4085
4086    let fs = FakeFs::new(cx.executor());
4087    fs.insert_tree(path!("/test"), json!({})).await;
4088    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4089    let project_context = cx.new(|_cx| ProjectContext::default());
4090    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4091    let context_server_registry =
4092        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4093    let model = Arc::new(FakeLanguageModel::default());
4094
4095    let parent = cx.new(|cx| {
4096        Thread::new(
4097            project.clone(),
4098            project_context.clone(),
4099            context_server_registry.clone(),
4100            Templates::new(),
4101            Some(model.clone()),
4102            cx,
4103        )
4104    });
4105
4106    let subagent_context = SubagentContext {
4107        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4108        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
4109        depth: 1,
4110        summary_prompt: "Summarize".to_string(),
4111        context_low_prompt: "Context low".to_string(),
4112    };
4113
4114    let subagent = cx.new(|cx| {
4115        Thread::new_subagent(
4116            project.clone(),
4117            project_context.clone(),
4118            context_server_registry.clone(),
4119            Templates::new(),
4120            model.clone(),
4121            subagent_context,
4122            std::collections::BTreeMap::new(),
4123            cx,
4124        )
4125    });
4126
4127    parent.update(cx, |thread, _cx| {
4128        thread.register_running_subagent(subagent.downgrade());
4129    });
4130
4131    subagent
4132        .update(cx, |thread, cx| thread.submit_user_message("Do work", cx))
4133        .unwrap();
4134    cx.run_until_parked();
4135
4136    subagent.read_with(cx, |thread, _| {
4137        assert!(!thread.is_turn_complete(), "subagent should be running");
4138    });
4139
4140    parent.update(cx, |thread, cx| {
4141        thread.cancel(cx).detach();
4142    });
4143
4144    subagent.read_with(cx, |thread, _| {
4145        assert!(
4146            thread.is_turn_complete(),
4147            "subagent should be cancelled when parent cancels"
4148        );
4149    });
4150}
4151
4152#[gpui::test]
4153async fn test_subagent_tool_cancellation(cx: &mut TestAppContext) {
4154    // This test verifies that the subagent tool properly handles user cancellation
4155    // via `event_stream.cancelled_by_user()` and stops all running subagents.
4156    init_test(cx);
4157    always_allow_tools(cx);
4158
4159    cx.update(|cx| {
4160        cx.update_flags(true, vec!["subagents".to_string()]);
4161    });
4162
4163    let fs = FakeFs::new(cx.executor());
4164    fs.insert_tree(path!("/test"), json!({})).await;
4165    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4166    let project_context = cx.new(|_cx| ProjectContext::default());
4167    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4168    let context_server_registry =
4169        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4170    let model = Arc::new(FakeLanguageModel::default());
4171
4172    let parent = cx.new(|cx| {
4173        Thread::new(
4174            project.clone(),
4175            project_context.clone(),
4176            context_server_registry.clone(),
4177            Templates::new(),
4178            Some(model.clone()),
4179            cx,
4180        )
4181    });
4182
4183    let parent_tools: std::collections::BTreeMap<gpui::SharedString, Arc<dyn crate::AnyAgentTool>> =
4184        std::collections::BTreeMap::new();
4185
4186    #[allow(clippy::arc_with_non_send_sync)]
4187    let tool = Arc::new(SubagentTool::new(
4188        parent.downgrade(),
4189        project.clone(),
4190        project_context,
4191        context_server_registry,
4192        Templates::new(),
4193        0,
4194        parent_tools,
4195    ));
4196
4197    let (event_stream, _rx, mut cancellation_tx) =
4198        crate::ToolCallEventStream::test_with_cancellation();
4199
4200    // Start the subagent tool
4201    let task = cx.update(|cx| {
4202        tool.run(
4203            SubagentToolInput {
4204                subagents: vec![crate::SubagentConfig {
4205                    label: "Long running task".to_string(),
4206                    task_prompt: "Do a very long task that takes forever".to_string(),
4207                    summary_prompt: "Summarize".to_string(),
4208                    context_low_prompt: "Context low".to_string(),
4209                    timeout_ms: None,
4210                    allowed_tools: None,
4211                }],
4212            },
4213            event_stream.clone(),
4214            cx,
4215        )
4216    });
4217
4218    cx.run_until_parked();
4219
4220    // Signal cancellation via the event stream
4221    crate::ToolCallEventStream::signal_cancellation_with_sender(&mut cancellation_tx);
4222
4223    // The task should complete promptly with a cancellation error
4224    let timeout = cx.background_executor.timer(Duration::from_secs(5));
4225    let result = futures::select! {
4226        result = task.fuse() => result,
4227        _ = timeout.fuse() => {
4228            panic!("subagent tool did not respond to cancellation within timeout");
4229        }
4230    };
4231
4232    // Verify we got a cancellation error
4233    let err = result.unwrap_err();
4234    assert!(
4235        err.to_string().contains("cancelled by user"),
4236        "expected cancellation error, got: {}",
4237        err
4238    );
4239}
4240
4241#[gpui::test]
4242async fn test_subagent_model_error_returned_as_tool_error(cx: &mut TestAppContext) {
4243    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4244    let fake_model = model.as_fake();
4245
4246    cx.update(|cx| {
4247        cx.update_flags(true, vec!["subagents".to_string()]);
4248    });
4249
4250    let subagent_context = SubagentContext {
4251        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4252        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
4253        depth: 1,
4254        summary_prompt: "Summarize".to_string(),
4255        context_low_prompt: "Context low".to_string(),
4256    };
4257
4258    let project = thread.read_with(cx, |t, _| t.project.clone());
4259    let project_context = cx.new(|_cx| ProjectContext::default());
4260    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4261    let context_server_registry =
4262        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4263
4264    let subagent = cx.new(|cx| {
4265        Thread::new_subagent(
4266            project.clone(),
4267            project_context,
4268            context_server_registry,
4269            Templates::new(),
4270            model.clone(),
4271            subagent_context,
4272            std::collections::BTreeMap::new(),
4273            cx,
4274        )
4275    });
4276
4277    subagent
4278        .update(cx, |thread, cx| thread.submit_user_message("Do work", cx))
4279        .unwrap();
4280    cx.run_until_parked();
4281
4282    subagent.read_with(cx, |thread, _| {
4283        assert!(!thread.is_turn_complete(), "turn should be in progress");
4284    });
4285
4286    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::NoApiKey {
4287        provider: LanguageModelProviderName::from("Fake".to_string()),
4288    });
4289    fake_model.end_last_completion_stream();
4290    cx.run_until_parked();
4291
4292    subagent.read_with(cx, |thread, _| {
4293        assert!(
4294            thread.is_turn_complete(),
4295            "turn should be complete after non-retryable error"
4296        );
4297    });
4298}
4299
4300#[gpui::test]
4301async fn test_subagent_timeout_triggers_early_summary(cx: &mut TestAppContext) {
4302    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4303    let fake_model = model.as_fake();
4304
4305    cx.update(|cx| {
4306        cx.update_flags(true, vec!["subagents".to_string()]);
4307    });
4308
4309    let subagent_context = SubagentContext {
4310        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4311        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
4312        depth: 1,
4313        summary_prompt: "Summarize your work".to_string(),
4314        context_low_prompt: "Context low, stop and summarize".to_string(),
4315    };
4316
4317    let project = thread.read_with(cx, |t, _| t.project.clone());
4318    let project_context = cx.new(|_cx| ProjectContext::default());
4319    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4320    let context_server_registry =
4321        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4322
4323    let subagent = cx.new(|cx| {
4324        Thread::new_subagent(
4325            project.clone(),
4326            project_context.clone(),
4327            context_server_registry.clone(),
4328            Templates::new(),
4329            model.clone(),
4330            subagent_context.clone(),
4331            std::collections::BTreeMap::new(),
4332            cx,
4333        )
4334    });
4335
4336    subagent.update(cx, |thread, _| {
4337        thread.add_tool(EchoTool);
4338    });
4339
4340    subagent
4341        .update(cx, |thread, cx| {
4342            thread.submit_user_message("Do some work", cx)
4343        })
4344        .unwrap();
4345    cx.run_until_parked();
4346
4347    fake_model.send_last_completion_stream_text_chunk("Working on it...");
4348    fake_model
4349        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
4350    fake_model.end_last_completion_stream();
4351    cx.run_until_parked();
4352
4353    let interrupt_result = subagent.update(cx, |thread, cx| thread.interrupt_for_summary(cx));
4354    assert!(
4355        interrupt_result.is_ok(),
4356        "interrupt_for_summary should succeed"
4357    );
4358
4359    cx.run_until_parked();
4360
4361    let pending = fake_model.pending_completions();
4362    assert!(
4363        !pending.is_empty(),
4364        "should have pending completion for interrupted summary"
4365    );
4366
4367    let messages = &pending.last().unwrap().messages;
4368    let user_messages: Vec<_> = messages
4369        .iter()
4370        .filter(|m| m.role == language_model::Role::User)
4371        .collect();
4372
4373    let last_user = user_messages.last().unwrap();
4374    let content_str = last_user.content[0].to_str().unwrap();
4375    assert!(
4376        content_str.contains("Context low") || content_str.contains("stop and summarize"),
4377        "context_low_prompt should be sent when interrupting: got {:?}",
4378        content_str
4379    );
4380}
4381
4382#[gpui::test]
4383async fn test_context_low_check_returns_true_when_usage_high(cx: &mut TestAppContext) {
4384    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4385    let fake_model = model.as_fake();
4386
4387    cx.update(|cx| {
4388        cx.update_flags(true, vec!["subagents".to_string()]);
4389    });
4390
4391    let subagent_context = SubagentContext {
4392        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4393        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
4394        depth: 1,
4395        summary_prompt: "Summarize".to_string(),
4396        context_low_prompt: "Context low".to_string(),
4397    };
4398
4399    let project = thread.read_with(cx, |t, _| t.project.clone());
4400    let project_context = cx.new(|_cx| ProjectContext::default());
4401    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4402    let context_server_registry =
4403        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4404
4405    let subagent = cx.new(|cx| {
4406        Thread::new_subagent(
4407            project.clone(),
4408            project_context,
4409            context_server_registry,
4410            Templates::new(),
4411            model.clone(),
4412            subagent_context,
4413            std::collections::BTreeMap::new(),
4414            cx,
4415        )
4416    });
4417
4418    subagent
4419        .update(cx, |thread, cx| thread.submit_user_message("Do work", cx))
4420        .unwrap();
4421    cx.run_until_parked();
4422
4423    let max_tokens = model.max_token_count();
4424    let high_usage = language_model::TokenUsage {
4425        input_tokens: (max_tokens as f64 * 0.80) as u64,
4426        output_tokens: 0,
4427        cache_creation_input_tokens: 0,
4428        cache_read_input_tokens: 0,
4429    };
4430
4431    fake_model
4432        .send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(high_usage));
4433    fake_model.send_last_completion_stream_text_chunk("Working...");
4434    fake_model
4435        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
4436    fake_model.end_last_completion_stream();
4437    cx.run_until_parked();
4438
4439    let usage = subagent.read_with(cx, |thread, _| thread.latest_token_usage());
4440    assert!(usage.is_some(), "should have token usage after completion");
4441
4442    let usage = usage.unwrap();
4443    let remaining_ratio = 1.0 - (usage.used_tokens as f32 / usage.max_tokens as f32);
4444    assert!(
4445        remaining_ratio <= 0.25,
4446        "remaining ratio should be at or below 25% (got {}%), indicating context is low",
4447        remaining_ratio * 100.0
4448    );
4449}
4450
4451#[gpui::test]
4452async fn test_allowed_tools_rejects_unknown_tool(cx: &mut TestAppContext) {
4453    init_test(cx);
4454
4455    cx.update(|cx| {
4456        cx.update_flags(true, vec!["subagents".to_string()]);
4457    });
4458
4459    let fs = FakeFs::new(cx.executor());
4460    fs.insert_tree(path!("/test"), json!({})).await;
4461    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4462    let project_context = cx.new(|_cx| ProjectContext::default());
4463    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4464    let context_server_registry =
4465        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4466    let model = Arc::new(FakeLanguageModel::default());
4467
4468    let parent = cx.new(|cx| {
4469        let mut thread = Thread::new(
4470            project.clone(),
4471            project_context.clone(),
4472            context_server_registry.clone(),
4473            Templates::new(),
4474            Some(model.clone()),
4475            cx,
4476        );
4477        thread.add_tool(EchoTool);
4478        thread
4479    });
4480
4481    let mut parent_tools: std::collections::BTreeMap<
4482        gpui::SharedString,
4483        Arc<dyn crate::AnyAgentTool>,
4484    > = std::collections::BTreeMap::new();
4485    parent_tools.insert("echo".into(), EchoTool.erase());
4486
4487    #[allow(clippy::arc_with_non_send_sync)]
4488    let tool = Arc::new(SubagentTool::new(
4489        parent.downgrade(),
4490        project,
4491        project_context,
4492        context_server_registry,
4493        Templates::new(),
4494        0,
4495        parent_tools,
4496    ));
4497
4498    let subagent_configs = vec![crate::SubagentConfig {
4499        label: "Test".to_string(),
4500        task_prompt: "Do something".to_string(),
4501        summary_prompt: "Summarize".to_string(),
4502        context_low_prompt: "Context low".to_string(),
4503        timeout_ms: None,
4504        allowed_tools: Some(vec!["nonexistent_tool".to_string()]),
4505    }];
4506    let result = tool.validate_subagents(&subagent_configs);
4507    assert!(result.is_err(), "should reject unknown tool");
4508    let err_msg = result.unwrap_err().to_string();
4509    assert!(
4510        err_msg.contains("nonexistent_tool"),
4511        "error should mention the invalid tool name: {}",
4512        err_msg
4513    );
4514    assert!(
4515        err_msg.contains("do not exist"),
4516        "error should explain the tool does not exist: {}",
4517        err_msg
4518    );
4519}
4520
4521#[gpui::test]
4522async fn test_subagent_empty_response_handled(cx: &mut TestAppContext) {
4523    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4524    let fake_model = model.as_fake();
4525
4526    cx.update(|cx| {
4527        cx.update_flags(true, vec!["subagents".to_string()]);
4528    });
4529
4530    let subagent_context = SubagentContext {
4531        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4532        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
4533        depth: 1,
4534        summary_prompt: "Summarize".to_string(),
4535        context_low_prompt: "Context low".to_string(),
4536    };
4537
4538    let project = thread.read_with(cx, |t, _| t.project.clone());
4539    let project_context = cx.new(|_cx| ProjectContext::default());
4540    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4541    let context_server_registry =
4542        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4543
4544    let subagent = cx.new(|cx| {
4545        Thread::new_subagent(
4546            project.clone(),
4547            project_context,
4548            context_server_registry,
4549            Templates::new(),
4550            model.clone(),
4551            subagent_context,
4552            std::collections::BTreeMap::new(),
4553            cx,
4554        )
4555    });
4556
4557    subagent
4558        .update(cx, |thread, cx| thread.submit_user_message("Do work", cx))
4559        .unwrap();
4560    cx.run_until_parked();
4561
4562    fake_model
4563        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
4564    fake_model.end_last_completion_stream();
4565    cx.run_until_parked();
4566
4567    subagent.read_with(cx, |thread, _| {
4568        assert!(
4569            thread.is_turn_complete(),
4570            "turn should complete even with empty response"
4571        );
4572    });
4573}
4574
4575#[gpui::test]
4576async fn test_nested_subagent_at_depth_2_succeeds(cx: &mut TestAppContext) {
4577    init_test(cx);
4578
4579    cx.update(|cx| {
4580        cx.update_flags(true, vec!["subagents".to_string()]);
4581    });
4582
4583    let fs = FakeFs::new(cx.executor());
4584    fs.insert_tree(path!("/test"), json!({})).await;
4585    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4586    let project_context = cx.new(|_cx| ProjectContext::default());
4587    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4588    let context_server_registry =
4589        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4590    let model = Arc::new(FakeLanguageModel::default());
4591
4592    let depth_1_context = SubagentContext {
4593        parent_thread_id: agent_client_protocol::SessionId::new("root-id"),
4594        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-1"),
4595        depth: 1,
4596        summary_prompt: "Summarize".to_string(),
4597        context_low_prompt: "Context low".to_string(),
4598    };
4599
4600    let depth_1_subagent = cx.new(|cx| {
4601        Thread::new_subagent(
4602            project.clone(),
4603            project_context.clone(),
4604            context_server_registry.clone(),
4605            Templates::new(),
4606            model.clone(),
4607            depth_1_context,
4608            std::collections::BTreeMap::new(),
4609            cx,
4610        )
4611    });
4612
4613    depth_1_subagent.read_with(cx, |thread, _| {
4614        assert_eq!(thread.depth(), 1);
4615        assert!(thread.is_subagent());
4616    });
4617
4618    let depth_2_context = SubagentContext {
4619        parent_thread_id: agent_client_protocol::SessionId::new("depth-1-id"),
4620        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-2"),
4621        depth: 2,
4622        summary_prompt: "Summarize depth 2".to_string(),
4623        context_low_prompt: "Context low depth 2".to_string(),
4624    };
4625
4626    let depth_2_subagent = cx.new(|cx| {
4627        Thread::new_subagent(
4628            project.clone(),
4629            project_context.clone(),
4630            context_server_registry.clone(),
4631            Templates::new(),
4632            model.clone(),
4633            depth_2_context,
4634            std::collections::BTreeMap::new(),
4635            cx,
4636        )
4637    });
4638
4639    depth_2_subagent.read_with(cx, |thread, _| {
4640        assert_eq!(thread.depth(), 2);
4641        assert!(thread.is_subagent());
4642    });
4643
4644    depth_2_subagent
4645        .update(cx, |thread, cx| {
4646            thread.submit_user_message("Nested task", cx)
4647        })
4648        .unwrap();
4649    cx.run_until_parked();
4650
4651    let pending = model.as_fake().pending_completions();
4652    assert!(
4653        !pending.is_empty(),
4654        "depth-2 subagent should be able to submit messages"
4655    );
4656}
4657
4658#[gpui::test]
4659async fn test_subagent_uses_tool_and_returns_result(cx: &mut TestAppContext) {
4660    init_test(cx);
4661    always_allow_tools(cx);
4662
4663    cx.update(|cx| {
4664        cx.update_flags(true, vec!["subagents".to_string()]);
4665    });
4666
4667    let fs = FakeFs::new(cx.executor());
4668    fs.insert_tree(path!("/test"), json!({})).await;
4669    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4670    let project_context = cx.new(|_cx| ProjectContext::default());
4671    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4672    let context_server_registry =
4673        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4674    let model = Arc::new(FakeLanguageModel::default());
4675    let fake_model = model.as_fake();
4676
4677    let subagent_context = SubagentContext {
4678        parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4679        tool_use_id: language_model::LanguageModelToolUseId::from("tool-use-id"),
4680        depth: 1,
4681        summary_prompt: "Summarize what you did".to_string(),
4682        context_low_prompt: "Context low".to_string(),
4683    };
4684
4685    let subagent = cx.new(|cx| {
4686        let mut thread = Thread::new_subagent(
4687            project.clone(),
4688            project_context,
4689            context_server_registry,
4690            Templates::new(),
4691            model.clone(),
4692            subagent_context,
4693            std::collections::BTreeMap::new(),
4694            cx,
4695        );
4696        thread.add_tool(EchoTool);
4697        thread
4698    });
4699
4700    subagent.read_with(cx, |thread, _| {
4701        assert!(
4702            thread.has_registered_tool("echo"),
4703            "subagent should have echo tool"
4704        );
4705    });
4706
4707    subagent
4708        .update(cx, |thread, cx| {
4709            thread.submit_user_message("Use the echo tool to echo 'hello world'", cx)
4710        })
4711        .unwrap();
4712    cx.run_until_parked();
4713
4714    let tool_use = LanguageModelToolUse {
4715        id: "tool_call_1".into(),
4716        name: EchoTool::name().into(),
4717        raw_input: json!({"text": "hello world"}).to_string(),
4718        input: json!({"text": "hello world"}),
4719        is_input_complete: true,
4720        thought_signature: None,
4721    };
4722    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use));
4723    fake_model.end_last_completion_stream();
4724    cx.run_until_parked();
4725
4726    let pending = fake_model.pending_completions();
4727    assert!(
4728        !pending.is_empty(),
4729        "should have pending completion after tool use"
4730    );
4731
4732    let last_completion = pending.last().unwrap();
4733    let has_tool_result = last_completion.messages.iter().any(|m| {
4734        m.content
4735            .iter()
4736            .any(|c| matches!(c, MessageContent::ToolResult(_)))
4737    });
4738    assert!(
4739        has_tool_result,
4740        "tool result should be in the messages sent back to the model"
4741    );
4742}
4743
4744#[gpui::test]
4745async fn test_max_parallel_subagents_enforced(cx: &mut TestAppContext) {
4746    init_test(cx);
4747
4748    cx.update(|cx| {
4749        cx.update_flags(true, vec!["subagents".to_string()]);
4750    });
4751
4752    let fs = FakeFs::new(cx.executor());
4753    fs.insert_tree(path!("/test"), json!({})).await;
4754    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4755    let project_context = cx.new(|_cx| ProjectContext::default());
4756    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4757    let context_server_registry =
4758        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4759    let model = Arc::new(FakeLanguageModel::default());
4760
4761    let parent = cx.new(|cx| {
4762        Thread::new(
4763            project.clone(),
4764            project_context.clone(),
4765            context_server_registry.clone(),
4766            Templates::new(),
4767            Some(model.clone()),
4768            cx,
4769        )
4770    });
4771
4772    let mut subagents = Vec::new();
4773    for i in 0..MAX_PARALLEL_SUBAGENTS {
4774        let subagent_context = SubagentContext {
4775            parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
4776            tool_use_id: language_model::LanguageModelToolUseId::from(format!("tool-use-{}", i)),
4777            depth: 1,
4778            summary_prompt: "Summarize".to_string(),
4779            context_low_prompt: "Context low".to_string(),
4780        };
4781
4782        let subagent = cx.new(|cx| {
4783            Thread::new_subagent(
4784                project.clone(),
4785                project_context.clone(),
4786                context_server_registry.clone(),
4787                Templates::new(),
4788                model.clone(),
4789                subagent_context,
4790                std::collections::BTreeMap::new(),
4791                cx,
4792            )
4793        });
4794
4795        parent.update(cx, |thread, _cx| {
4796            thread.register_running_subagent(subagent.downgrade());
4797        });
4798        subagents.push(subagent);
4799    }
4800
4801    parent.read_with(cx, |thread, _| {
4802        assert_eq!(
4803            thread.running_subagent_count(),
4804            MAX_PARALLEL_SUBAGENTS,
4805            "should have MAX_PARALLEL_SUBAGENTS registered"
4806        );
4807    });
4808
4809    let parent_tools: std::collections::BTreeMap<gpui::SharedString, Arc<dyn crate::AnyAgentTool>> =
4810        std::collections::BTreeMap::new();
4811
4812    #[allow(clippy::arc_with_non_send_sync)]
4813    let tool = Arc::new(SubagentTool::new(
4814        parent.downgrade(),
4815        project.clone(),
4816        project_context,
4817        context_server_registry,
4818        Templates::new(),
4819        0,
4820        parent_tools,
4821    ));
4822
4823    let (event_stream, _rx) = crate::ToolCallEventStream::test();
4824
4825    let result = cx.update(|cx| {
4826        tool.run(
4827            SubagentToolInput {
4828                subagents: vec![crate::SubagentConfig {
4829                    label: "Test".to_string(),
4830                    task_prompt: "Do something".to_string(),
4831                    summary_prompt: "Summarize".to_string(),
4832                    context_low_prompt: "Context low".to_string(),
4833                    timeout_ms: None,
4834                    allowed_tools: None,
4835                }],
4836            },
4837            event_stream,
4838            cx,
4839        )
4840    });
4841
4842    let err = result.await.unwrap_err();
4843    assert!(
4844        err.to_string().contains("Maximum parallel subagents"),
4845        "should reject when max parallel subagents reached: {}",
4846        err
4847    );
4848
4849    drop(subagents);
4850}
4851
4852#[gpui::test]
4853async fn test_subagent_tool_end_to_end(cx: &mut TestAppContext) {
4854    init_test(cx);
4855    always_allow_tools(cx);
4856
4857    cx.update(|cx| {
4858        cx.update_flags(true, vec!["subagents".to_string()]);
4859    });
4860
4861    let fs = FakeFs::new(cx.executor());
4862    fs.insert_tree(path!("/test"), json!({})).await;
4863    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4864    let project_context = cx.new(|_cx| ProjectContext::default());
4865    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4866    let context_server_registry =
4867        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4868    let model = Arc::new(FakeLanguageModel::default());
4869    let fake_model = model.as_fake();
4870
4871    let parent = cx.new(|cx| {
4872        let mut thread = Thread::new(
4873            project.clone(),
4874            project_context.clone(),
4875            context_server_registry.clone(),
4876            Templates::new(),
4877            Some(model.clone()),
4878            cx,
4879        );
4880        thread.add_tool(EchoTool);
4881        thread
4882    });
4883
4884    let mut parent_tools: std::collections::BTreeMap<
4885        gpui::SharedString,
4886        Arc<dyn crate::AnyAgentTool>,
4887    > = std::collections::BTreeMap::new();
4888    parent_tools.insert("echo".into(), EchoTool.erase());
4889
4890    #[allow(clippy::arc_with_non_send_sync)]
4891    let tool = Arc::new(SubagentTool::new(
4892        parent.downgrade(),
4893        project.clone(),
4894        project_context,
4895        context_server_registry,
4896        Templates::new(),
4897        0,
4898        parent_tools,
4899    ));
4900
4901    let (event_stream, _rx) = crate::ToolCallEventStream::test();
4902
4903    let task = cx.update(|cx| {
4904        tool.run(
4905            SubagentToolInput {
4906                subagents: vec![crate::SubagentConfig {
4907                    label: "Research task".to_string(),
4908                    task_prompt: "Find all TODOs in the codebase".to_string(),
4909                    summary_prompt: "Summarize what you found".to_string(),
4910                    context_low_prompt: "Context low, wrap up".to_string(),
4911                    timeout_ms: None,
4912                    allowed_tools: None,
4913                }],
4914            },
4915            event_stream,
4916            cx,
4917        )
4918    });
4919
4920    cx.run_until_parked();
4921
4922    let pending = fake_model.pending_completions();
4923    assert!(
4924        !pending.is_empty(),
4925        "subagent should have started and sent a completion request"
4926    );
4927
4928    let first_completion = &pending[0];
4929    let has_task_prompt = first_completion.messages.iter().any(|m| {
4930        m.role == language_model::Role::User
4931            && m.content
4932                .iter()
4933                .any(|c| c.to_str().map(|s| s.contains("TODO")).unwrap_or(false))
4934    });
4935    assert!(has_task_prompt, "task prompt should be sent to subagent");
4936
4937    fake_model.send_last_completion_stream_text_chunk("I found 5 TODOs in the codebase.");
4938    fake_model
4939        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
4940    fake_model.end_last_completion_stream();
4941    cx.run_until_parked();
4942
4943    let pending = fake_model.pending_completions();
4944    assert!(
4945        !pending.is_empty(),
4946        "should have pending completion for summary request"
4947    );
4948
4949    let last_completion = pending.last().unwrap();
4950    let has_summary_prompt = last_completion.messages.iter().any(|m| {
4951        m.role == language_model::Role::User
4952            && m.content.iter().any(|c| {
4953                c.to_str()
4954                    .map(|s| s.contains("Summarize") || s.contains("summarize"))
4955                    .unwrap_or(false)
4956            })
4957    });
4958    assert!(
4959        has_summary_prompt,
4960        "summary prompt should be sent after task completion"
4961    );
4962
4963    fake_model.send_last_completion_stream_text_chunk("Summary: Found 5 TODOs across 3 files.");
4964    fake_model
4965        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
4966    fake_model.end_last_completion_stream();
4967    cx.run_until_parked();
4968
4969    let result = task.await;
4970    assert!(result.is_ok(), "subagent tool should complete successfully");
4971
4972    let summary = result.unwrap();
4973    assert!(
4974        summary.contains("Summary") || summary.contains("TODO") || summary.contains("5"),
4975        "summary should contain subagent's response: {}",
4976        summary
4977    );
4978}
4979
4980#[gpui::test]
4981async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
4982    init_test(cx);
4983
4984    let fs = FakeFs::new(cx.executor());
4985    fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
4986        .await;
4987    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
4988
4989    cx.update(|cx| {
4990        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4991        settings.tool_permissions.tools.insert(
4992            "edit_file".into(),
4993            agent_settings::ToolRules {
4994                default_mode: settings::ToolPermissionMode::Allow,
4995                always_allow: vec![],
4996                always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
4997                always_confirm: vec![],
4998                invalid_patterns: vec![],
4999            },
5000        );
5001        agent_settings::AgentSettings::override_global(settings, cx);
5002    });
5003
5004    let context_server_registry =
5005        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5006    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5007    let templates = crate::Templates::new();
5008    let thread = cx.new(|cx| {
5009        crate::Thread::new(
5010            project.clone(),
5011            cx.new(|_cx| prompt_store::ProjectContext::default()),
5012            context_server_registry,
5013            templates.clone(),
5014            None,
5015            cx,
5016        )
5017    });
5018
5019    #[allow(clippy::arc_with_non_send_sync)]
5020    let tool = Arc::new(crate::EditFileTool::new(
5021        project.clone(),
5022        thread.downgrade(),
5023        language_registry,
5024        templates,
5025    ));
5026    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5027
5028    let task = cx.update(|cx| {
5029        tool.run(
5030            crate::EditFileToolInput {
5031                display_description: "Edit sensitive file".to_string(),
5032                path: "root/sensitive_config.txt".into(),
5033                mode: crate::EditFileMode::Edit,
5034            },
5035            event_stream,
5036            cx,
5037        )
5038    });
5039
5040    let result = task.await;
5041    assert!(result.is_err(), "expected edit to be blocked");
5042    assert!(
5043        result.unwrap_err().to_string().contains("blocked"),
5044        "error should mention the edit was blocked"
5045    );
5046}
5047
5048#[gpui::test]
5049async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5050    init_test(cx);
5051
5052    let fs = FakeFs::new(cx.executor());
5053    fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5054        .await;
5055    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5056
5057    cx.update(|cx| {
5058        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5059        settings.tool_permissions.tools.insert(
5060            "delete_path".into(),
5061            agent_settings::ToolRules {
5062                default_mode: settings::ToolPermissionMode::Allow,
5063                always_allow: vec![],
5064                always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5065                always_confirm: vec![],
5066                invalid_patterns: vec![],
5067            },
5068        );
5069        agent_settings::AgentSettings::override_global(settings, cx);
5070    });
5071
5072    let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5073
5074    #[allow(clippy::arc_with_non_send_sync)]
5075    let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5076    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5077
5078    let task = cx.update(|cx| {
5079        tool.run(
5080            crate::DeletePathToolInput {
5081                path: "root/important_data.txt".to_string(),
5082            },
5083            event_stream,
5084            cx,
5085        )
5086    });
5087
5088    let result = task.await;
5089    assert!(result.is_err(), "expected deletion to be blocked");
5090    assert!(
5091        result.unwrap_err().to_string().contains("blocked"),
5092        "error should mention the deletion was blocked"
5093    );
5094}
5095
5096#[gpui::test]
5097async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5098    init_test(cx);
5099
5100    let fs = FakeFs::new(cx.executor());
5101    fs.insert_tree(
5102        "/root",
5103        json!({
5104            "safe.txt": "content",
5105            "protected": {}
5106        }),
5107    )
5108    .await;
5109    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5110
5111    cx.update(|cx| {
5112        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5113        settings.tool_permissions.tools.insert(
5114            "move_path".into(),
5115            agent_settings::ToolRules {
5116                default_mode: settings::ToolPermissionMode::Allow,
5117                always_allow: vec![],
5118                always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5119                always_confirm: vec![],
5120                invalid_patterns: vec![],
5121            },
5122        );
5123        agent_settings::AgentSettings::override_global(settings, cx);
5124    });
5125
5126    #[allow(clippy::arc_with_non_send_sync)]
5127    let tool = Arc::new(crate::MovePathTool::new(project));
5128    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5129
5130    let task = cx.update(|cx| {
5131        tool.run(
5132            crate::MovePathToolInput {
5133                source_path: "root/safe.txt".to_string(),
5134                destination_path: "root/protected/safe.txt".to_string(),
5135            },
5136            event_stream,
5137            cx,
5138        )
5139    });
5140
5141    let result = task.await;
5142    assert!(
5143        result.is_err(),
5144        "expected move to be blocked due to destination path"
5145    );
5146    assert!(
5147        result.unwrap_err().to_string().contains("blocked"),
5148        "error should mention the move was blocked"
5149    );
5150}
5151
5152#[gpui::test]
5153async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5154    init_test(cx);
5155
5156    let fs = FakeFs::new(cx.executor());
5157    fs.insert_tree(
5158        "/root",
5159        json!({
5160            "secret.txt": "secret content",
5161            "public": {}
5162        }),
5163    )
5164    .await;
5165    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5166
5167    cx.update(|cx| {
5168        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5169        settings.tool_permissions.tools.insert(
5170            "move_path".into(),
5171            agent_settings::ToolRules {
5172                default_mode: settings::ToolPermissionMode::Allow,
5173                always_allow: vec![],
5174                always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5175                always_confirm: vec![],
5176                invalid_patterns: vec![],
5177            },
5178        );
5179        agent_settings::AgentSettings::override_global(settings, cx);
5180    });
5181
5182    #[allow(clippy::arc_with_non_send_sync)]
5183    let tool = Arc::new(crate::MovePathTool::new(project));
5184    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5185
5186    let task = cx.update(|cx| {
5187        tool.run(
5188            crate::MovePathToolInput {
5189                source_path: "root/secret.txt".to_string(),
5190                destination_path: "root/public/not_secret.txt".to_string(),
5191            },
5192            event_stream,
5193            cx,
5194        )
5195    });
5196
5197    let result = task.await;
5198    assert!(
5199        result.is_err(),
5200        "expected move to be blocked due to source path"
5201    );
5202    assert!(
5203        result.unwrap_err().to_string().contains("blocked"),
5204        "error should mention the move was blocked"
5205    );
5206}
5207
5208#[gpui::test]
5209async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5210    init_test(cx);
5211
5212    let fs = FakeFs::new(cx.executor());
5213    fs.insert_tree(
5214        "/root",
5215        json!({
5216            "confidential.txt": "confidential data",
5217            "dest": {}
5218        }),
5219    )
5220    .await;
5221    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5222
5223    cx.update(|cx| {
5224        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5225        settings.tool_permissions.tools.insert(
5226            "copy_path".into(),
5227            agent_settings::ToolRules {
5228                default_mode: settings::ToolPermissionMode::Allow,
5229                always_allow: vec![],
5230                always_deny: vec![
5231                    agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5232                ],
5233                always_confirm: vec![],
5234                invalid_patterns: vec![],
5235            },
5236        );
5237        agent_settings::AgentSettings::override_global(settings, cx);
5238    });
5239
5240    #[allow(clippy::arc_with_non_send_sync)]
5241    let tool = Arc::new(crate::CopyPathTool::new(project));
5242    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5243
5244    let task = cx.update(|cx| {
5245        tool.run(
5246            crate::CopyPathToolInput {
5247                source_path: "root/confidential.txt".to_string(),
5248                destination_path: "root/dest/copy.txt".to_string(),
5249            },
5250            event_stream,
5251            cx,
5252        )
5253    });
5254
5255    let result = task.await;
5256    assert!(result.is_err(), "expected copy to be blocked");
5257    assert!(
5258        result.unwrap_err().to_string().contains("blocked"),
5259        "error should mention the copy was blocked"
5260    );
5261}
5262
5263#[gpui::test]
5264async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
5265    init_test(cx);
5266
5267    let fs = FakeFs::new(cx.executor());
5268    fs.insert_tree(
5269        "/root",
5270        json!({
5271            "normal.txt": "normal content",
5272            "readonly": {
5273                "config.txt": "readonly content"
5274            }
5275        }),
5276    )
5277    .await;
5278    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5279
5280    cx.update(|cx| {
5281        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5282        settings.tool_permissions.tools.insert(
5283            "save_file".into(),
5284            agent_settings::ToolRules {
5285                default_mode: settings::ToolPermissionMode::Allow,
5286                always_allow: vec![],
5287                always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
5288                always_confirm: vec![],
5289                invalid_patterns: vec![],
5290            },
5291        );
5292        agent_settings::AgentSettings::override_global(settings, cx);
5293    });
5294
5295    #[allow(clippy::arc_with_non_send_sync)]
5296    let tool = Arc::new(crate::SaveFileTool::new(project));
5297    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5298
5299    let task = cx.update(|cx| {
5300        tool.run(
5301            crate::SaveFileToolInput {
5302                paths: vec![
5303                    std::path::PathBuf::from("root/normal.txt"),
5304                    std::path::PathBuf::from("root/readonly/config.txt"),
5305                ],
5306            },
5307            event_stream,
5308            cx,
5309        )
5310    });
5311
5312    let result = task.await;
5313    assert!(
5314        result.is_err(),
5315        "expected save to be blocked due to denied path"
5316    );
5317    assert!(
5318        result.unwrap_err().to_string().contains("blocked"),
5319        "error should mention the save was blocked"
5320    );
5321}
5322
5323#[gpui::test]
5324async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
5325    init_test(cx);
5326
5327    let fs = FakeFs::new(cx.executor());
5328    fs.insert_tree("/root", json!({"config.secret": "secret config"}))
5329        .await;
5330    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5331
5332    cx.update(|cx| {
5333        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5334        settings.always_allow_tool_actions = false;
5335        settings.tool_permissions.tools.insert(
5336            "save_file".into(),
5337            agent_settings::ToolRules {
5338                default_mode: settings::ToolPermissionMode::Allow,
5339                always_allow: vec![],
5340                always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
5341                always_confirm: vec![],
5342                invalid_patterns: vec![],
5343            },
5344        );
5345        agent_settings::AgentSettings::override_global(settings, cx);
5346    });
5347
5348    #[allow(clippy::arc_with_non_send_sync)]
5349    let tool = Arc::new(crate::SaveFileTool::new(project));
5350    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5351
5352    let task = cx.update(|cx| {
5353        tool.run(
5354            crate::SaveFileToolInput {
5355                paths: vec![std::path::PathBuf::from("root/config.secret")],
5356            },
5357            event_stream,
5358            cx,
5359        )
5360    });
5361
5362    let result = task.await;
5363    assert!(result.is_err(), "expected save to be blocked");
5364    assert!(
5365        result.unwrap_err().to_string().contains("blocked"),
5366        "error should mention the save was blocked"
5367    );
5368}
5369
5370#[gpui::test]
5371async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
5372    init_test(cx);
5373
5374    cx.update(|cx| {
5375        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5376        settings.tool_permissions.tools.insert(
5377            "web_search".into(),
5378            agent_settings::ToolRules {
5379                default_mode: settings::ToolPermissionMode::Allow,
5380                always_allow: vec![],
5381                always_deny: vec![
5382                    agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
5383                ],
5384                always_confirm: vec![],
5385                invalid_patterns: vec![],
5386            },
5387        );
5388        agent_settings::AgentSettings::override_global(settings, cx);
5389    });
5390
5391    #[allow(clippy::arc_with_non_send_sync)]
5392    let tool = Arc::new(crate::WebSearchTool);
5393    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5394
5395    let input: crate::WebSearchToolInput =
5396        serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
5397
5398    let task = cx.update(|cx| tool.run(input, event_stream, cx));
5399
5400    let result = task.await;
5401    assert!(result.is_err(), "expected search to be blocked");
5402    assert!(
5403        result.unwrap_err().to_string().contains("blocked"),
5404        "error should mention the search was blocked"
5405    );
5406}
5407
5408#[gpui::test]
5409async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
5410    init_test(cx);
5411
5412    let fs = FakeFs::new(cx.executor());
5413    fs.insert_tree("/root", json!({"README.md": "# Hello"}))
5414        .await;
5415    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5416
5417    cx.update(|cx| {
5418        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5419        settings.always_allow_tool_actions = false;
5420        settings.tool_permissions.tools.insert(
5421            "edit_file".into(),
5422            agent_settings::ToolRules {
5423                default_mode: settings::ToolPermissionMode::Confirm,
5424                always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
5425                always_deny: vec![],
5426                always_confirm: vec![],
5427                invalid_patterns: vec![],
5428            },
5429        );
5430        agent_settings::AgentSettings::override_global(settings, cx);
5431    });
5432
5433    let context_server_registry =
5434        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5435    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5436    let templates = crate::Templates::new();
5437    let thread = cx.new(|cx| {
5438        crate::Thread::new(
5439            project.clone(),
5440            cx.new(|_cx| prompt_store::ProjectContext::default()),
5441            context_server_registry,
5442            templates.clone(),
5443            None,
5444            cx,
5445        )
5446    });
5447
5448    #[allow(clippy::arc_with_non_send_sync)]
5449    let tool = Arc::new(crate::EditFileTool::new(
5450        project,
5451        thread.downgrade(),
5452        language_registry,
5453        templates,
5454    ));
5455    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
5456
5457    let _task = cx.update(|cx| {
5458        tool.run(
5459            crate::EditFileToolInput {
5460                display_description: "Edit README".to_string(),
5461                path: "root/README.md".into(),
5462                mode: crate::EditFileMode::Edit,
5463            },
5464            event_stream,
5465            cx,
5466        )
5467    });
5468
5469    cx.run_until_parked();
5470
5471    let event = rx.try_next();
5472    assert!(
5473        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
5474        "expected no authorization request for allowed .md file"
5475    );
5476}
5477
5478#[gpui::test]
5479async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
5480    init_test(cx);
5481
5482    cx.update(|cx| {
5483        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5484        settings.tool_permissions.tools.insert(
5485            "fetch".into(),
5486            agent_settings::ToolRules {
5487                default_mode: settings::ToolPermissionMode::Allow,
5488                always_allow: vec![],
5489                always_deny: vec![
5490                    agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
5491                ],
5492                always_confirm: vec![],
5493                invalid_patterns: vec![],
5494            },
5495        );
5496        agent_settings::AgentSettings::override_global(settings, cx);
5497    });
5498
5499    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
5500
5501    #[allow(clippy::arc_with_non_send_sync)]
5502    let tool = Arc::new(crate::FetchTool::new(http_client));
5503    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5504
5505    let input: crate::FetchToolInput =
5506        serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
5507
5508    let task = cx.update(|cx| tool.run(input, event_stream, cx));
5509
5510    let result = task.await;
5511    assert!(result.is_err(), "expected fetch to be blocked");
5512    assert!(
5513        result.unwrap_err().to_string().contains("blocked"),
5514        "error should mention the fetch was blocked"
5515    );
5516}
5517
5518#[gpui::test]
5519async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
5520    init_test(cx);
5521
5522    cx.update(|cx| {
5523        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5524        settings.always_allow_tool_actions = false;
5525        settings.tool_permissions.tools.insert(
5526            "fetch".into(),
5527            agent_settings::ToolRules {
5528                default_mode: settings::ToolPermissionMode::Confirm,
5529                always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
5530                always_deny: vec![],
5531                always_confirm: vec![],
5532                invalid_patterns: vec![],
5533            },
5534        );
5535        agent_settings::AgentSettings::override_global(settings, cx);
5536    });
5537
5538    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
5539
5540    #[allow(clippy::arc_with_non_send_sync)]
5541    let tool = Arc::new(crate::FetchTool::new(http_client));
5542    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
5543
5544    let input: crate::FetchToolInput =
5545        serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
5546
5547    let _task = cx.update(|cx| tool.run(input, event_stream, cx));
5548
5549    cx.run_until_parked();
5550
5551    let event = rx.try_next();
5552    assert!(
5553        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
5554        "expected no authorization request for allowed docs.rs URL"
5555    );
5556}
5557
5558#[gpui::test]
5559async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
5560    init_test(cx);
5561    always_allow_tools(cx);
5562
5563    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
5564    let fake_model = model.as_fake();
5565
5566    // Add a tool so we can simulate tool calls
5567    thread.update(cx, |thread, _cx| {
5568        thread.add_tool(EchoTool);
5569    });
5570
5571    // Start a turn by sending a message
5572    let mut events = thread
5573        .update(cx, |thread, cx| {
5574            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
5575        })
5576        .unwrap();
5577    cx.run_until_parked();
5578
5579    // Simulate the model making a tool call
5580    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5581        LanguageModelToolUse {
5582            id: "tool_1".into(),
5583            name: "echo".into(),
5584            raw_input: r#"{"text": "hello"}"#.into(),
5585            input: json!({"text": "hello"}),
5586            is_input_complete: true,
5587            thought_signature: None,
5588        },
5589    ));
5590    fake_model
5591        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
5592
5593    // Queue a message before ending the stream
5594    thread.update(cx, |thread, _cx| {
5595        thread.queue_message(
5596            vec![acp::ContentBlock::Text(acp::TextContent::new(
5597                "This is my queued message".to_string(),
5598            ))],
5599            vec![],
5600        );
5601    });
5602
5603    // Now end the stream - tool will run, and the boundary check should see the queue
5604    fake_model.end_last_completion_stream();
5605
5606    // Collect all events until the turn stops
5607    let all_events = collect_events_until_stop(&mut events, cx).await;
5608
5609    // Verify we received the tool call event
5610    let tool_call_ids: Vec<_> = all_events
5611        .iter()
5612        .filter_map(|e| match e {
5613            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
5614            _ => None,
5615        })
5616        .collect();
5617    assert_eq!(
5618        tool_call_ids,
5619        vec!["tool_1"],
5620        "Should have received a tool call event for our echo tool"
5621    );
5622
5623    // The turn should have stopped with EndTurn
5624    let stop_reasons = stop_events(all_events);
5625    assert_eq!(
5626        stop_reasons,
5627        vec![acp::StopReason::EndTurn],
5628        "Turn should have ended after tool completion due to queued message"
5629    );
5630
5631    // Verify the queued message is still there
5632    thread.update(cx, |thread, _cx| {
5633        let queued = thread.queued_messages();
5634        assert_eq!(queued.len(), 1, "Should still have one queued message");
5635        assert!(matches!(
5636            &queued[0].content[0],
5637            acp::ContentBlock::Text(t) if t.text == "This is my queued message"
5638        ));
5639    });
5640
5641    // Thread should be idle now
5642    thread.update(cx, |thread, _cx| {
5643        assert!(
5644            thread.is_turn_complete(),
5645            "Thread should not be running after turn ends"
5646        );
5647    });
5648}