1use super::*;
2use acp_thread::{
3 AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
4 UserMessageId,
5};
6use agent_client_protocol::{self as acp};
7use agent_settings::AgentProfileId;
8use anyhow::Result;
9use client::{Client, UserStore};
10use cloud_llm_client::CompletionIntent;
11use collections::IndexMap;
12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
13use feature_flags::FeatureFlagAppExt as _;
14use fs::{FakeFs, Fs};
15use futures::{
16 FutureExt as _, StreamExt,
17 channel::{
18 mpsc::{self, UnboundedReceiver},
19 oneshot,
20 },
21 future::{Fuse, Shared},
22};
23use gpui::{
24 App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
25 http_client::FakeHttpClient,
26};
27use indoc::indoc;
28use language_model::{
29 LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
30 LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
31 LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
32 LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
33 fake_provider::FakeLanguageModel,
34};
35use pretty_assertions::assert_eq;
36use project::{
37 Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
38};
39use prompt_store::ProjectContext;
40use reqwest_client::ReqwestClient;
41use schemars::JsonSchema;
42use serde::{Deserialize, Serialize};
43use serde_json::json;
44use settings::{Settings, SettingsStore};
45use std::{
46 path::Path,
47 pin::Pin,
48 rc::Rc,
49 sync::{
50 Arc,
51 atomic::{AtomicBool, Ordering},
52 },
53 time::Duration,
54};
55use util::path;
56
57mod edit_file_thread_test;
58mod test_tools;
59use test_tools::*;
60
61fn init_test(cx: &mut TestAppContext) {
62 cx.update(|cx| {
63 let settings_store = SettingsStore::test(cx);
64 cx.set_global(settings_store);
65 });
66}
67
/// Test double for a terminal spawned by the agent's terminal tool.
struct FakeTerminalHandle {
    // Flipped to true once `kill` is called; inspected via `was_killed`.
    killed: Arc<AtomicBool>,
    // Whether the user stopped the terminal; read by `was_stopped_by_user`.
    stopped_by_user: Arc<AtomicBool>,
    // One-shot sender consumed by `signal_exit` to resolve `wait_for_exit`.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    // Shared task that resolves with the terminal's exit status.
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    // Canned response returned from `current_output`.
    output: acp::TerminalOutputResponse,
    // Fixed id returned from `id`.
    id: acp::TerminalId,
}
76
77impl FakeTerminalHandle {
78 fn new_never_exits(cx: &mut App) -> Self {
79 let killed = Arc::new(AtomicBool::new(false));
80 let stopped_by_user = Arc::new(AtomicBool::new(false));
81
82 let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
83
84 let wait_for_exit = cx
85 .spawn(async move |_cx| {
86 // Wait for the exit signal (sent when kill() is called)
87 let _ = exit_receiver.await;
88 acp::TerminalExitStatus::new()
89 })
90 .shared();
91
92 Self {
93 killed,
94 stopped_by_user,
95 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
96 wait_for_exit,
97 output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
98 id: acp::TerminalId::new("fake_terminal".to_string()),
99 }
100 }
101
102 fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
103 let killed = Arc::new(AtomicBool::new(false));
104 let stopped_by_user = Arc::new(AtomicBool::new(false));
105 let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
106
107 let wait_for_exit = cx
108 .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
109 .shared();
110
111 Self {
112 killed,
113 stopped_by_user,
114 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
115 wait_for_exit,
116 output: acp::TerminalOutputResponse::new("command output".to_string(), false),
117 id: acp::TerminalId::new("fake_terminal".to_string()),
118 }
119 }
120
121 fn was_killed(&self) -> bool {
122 self.killed.load(Ordering::SeqCst)
123 }
124
125 fn set_stopped_by_user(&self, stopped: bool) {
126 self.stopped_by_user.store(stopped, Ordering::SeqCst);
127 }
128
129 fn signal_exit(&self) {
130 if let Some(sender) = self.exit_sender.borrow_mut().take() {
131 let _ = sender.send(());
132 }
133 }
134}
135
// Trait implementation forwarding to the fake's canned state.
impl crate::TerminalHandle for FakeTerminalHandle {
    // Returns the fixed id assigned at construction.
    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
        Ok(self.id.clone())
    }

    // Returns the canned output regardless of what "command" ran.
    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
        Ok(self.output.clone())
    }

    // Clones the shared exit task; resolution timing depends on which
    // constructor built this handle.
    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
        Ok(self.wait_for_exit.clone())
    }

    // Records the kill and resolves `wait_for_exit` so awaiting callers unblock.
    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
        self.killed.store(true, Ordering::SeqCst);
        self.signal_exit();
        Ok(())
    }

    // Reports the flag set via `set_stopped_by_user`.
    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
        Ok(self.stopped_by_user.load(Ordering::SeqCst))
    }
}
159
/// Test double for a subagent session.
struct FakeSubagentHandle {
    // Session id returned from `id`.
    session_id: acp::SessionId,
    // Shared task whose output is returned from every `send` call.
    send_task: Shared<Task<String>>,
}
164
impl SubagentHandle for FakeSubagentHandle {
    fn id(&self) -> acp::SessionId {
        self.session_id.clone()
    }

    // Not needed by any current test; panics if exercised.
    fn num_entries(&self, _cx: &App) -> usize {
        unimplemented!()
    }

    // Ignores the message and resolves with the canned `send_task` output.
    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
        let task = self.send_task.clone();
        cx.background_spawn(async move { Ok(task.await) })
    }
}
179
/// Thread environment test double holding at most one pre-built terminal
/// and/or subagent handle; `create_*` panics if the matching handle is absent.
#[derive(Default)]
struct FakeThreadEnvironment {
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
}
185
186impl FakeThreadEnvironment {
187 pub fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
188 Self {
189 terminal_handle: Some(terminal_handle.into()),
190 ..self
191 }
192 }
193}
194
impl crate::ThreadEnvironment for FakeThreadEnvironment {
    // Ignores command/cwd/limit and hands back the single pre-installed
    // terminal handle; panics if `with_terminal` was never called.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        _cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        let handle = self
            .terminal_handle
            .clone()
            .expect("Terminal handle not available on FakeThreadEnvironment");
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Hands back the pre-installed subagent handle; panics if absent.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        Ok(self
            .subagent_handle
            .clone()
            .expect("Subagent handle not available on FakeThreadEnvironment")
            as Rc<dyn SubagentHandle>)
    }
}
218
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    // Every handle created so far, in creation order; RefCell because
    // `create_terminal` takes `&self`.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
223
224impl MultiTerminalEnvironment {
225 fn new() -> Self {
226 Self {
227 handles: std::cell::RefCell::new(Vec::new()),
228 }
229 }
230
231 fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
232 self.handles.borrow().clone()
233 }
234}
235
impl crate::ThreadEnvironment for MultiTerminalEnvironment {
    // Creates a fresh never-exiting fake terminal per call and records it so
    // tests can inspect all handles via `handles()`.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
        self.handles.borrow_mut().push(handle.clone());
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Subagents are not exercised by the multi-terminal tests.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        unimplemented!()
    }
}
253
254fn always_allow_tools(cx: &mut TestAppContext) {
255 cx.update(|cx| {
256 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
257 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
258 agent_settings::AgentSettings::override_global(settings, cx);
259 });
260}
261
/// Smoke test: a user message streamed back as plain text ends the turn and
/// lands as an assistant message in the thread.
#[gpui::test]
async fn test_echo(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Drive the fake model: one text chunk, then an explicit end-of-turn stop.
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            "Hello\n"
        )
    });
    // The only stop event emitted should be the end-of-turn.
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
294
/// Verifies that when a terminal tool invocation has a timeout, an
/// unresponsive command is killed and its partial output is still reported.
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    // A terminal whose exit only resolves when kill() is called.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // 5ms timeout against a command that never exits on its own.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // Poll the tool task cooperatively (run_until_parked + short timers) until
    // it completes or the 500ms wall-clock deadline passes.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
360
/// Counterpart to the timeout test: without a timeout the terminal must NOT
/// be killed even after time passes. Currently `#[ignore]`d.
#[gpui::test]
#[ignore]
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // No timeout_ms: the tool should wait indefinitely; the task is kept
    // alive (not awaited) so we can observe the handle state.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give the tool ample time; if a spurious timeout existed it would have
    // fired by now.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
410
/// Verifies that a `Thinking` completion event is rendered into the message
/// markdown as a `<think>…</think>` block preceding the answer text.
#[gpui::test]
async fn test_thinking(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                [indoc! {"
                    Testing:

                    Generate a thinking step where you just think the word 'Think',
                    and have your final answer be 'Hello'
                "}],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();
    // Stream a thinking step, then the answer, then end-of-turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "Think".to_string(),
        signature: None,
    });
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            indoc! {"
                <think>Think</think>
                Hello
            "}
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
459
/// Verifies the system prompt reflects project context (shell name) and,
/// because a tool is registered, includes the diagnostics-fixing section.
#[gpui::test]
async fn test_system_prompt(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        project_context,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Inject a recognizable shell name so we can find it in the prompt.
    project_context.update(cx, |project_context, _cx| {
        project_context.shell = "test-shell".into()
    });
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The first request message must be the system prompt.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        system_prompt.contains("test-shell"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
504
/// Counterpart to `test_system_prompt`: with no tools registered, the system
/// prompt must omit the tool-use and diagnostics sections.
#[gpui::test]
async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        !system_prompt.contains("## Tool Use"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        !system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
540
/// Verifies the cache-marker policy on outgoing requests: only the final
/// message of each request carries `cache: true` (the latest user message,
/// or the latest tool result when a tool ran); all earlier history is
/// sent uncached.
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // messages[0] is the system prompt; compare everything after it.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The follow-up request (carrying the tool result) should cache only
    // that final tool-result message.
    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
686
/// End-to-end test (real Sonnet 4 model; only runs with the `e2e` feature):
/// exercises tool calls that finish before and after streaming stops.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool calls that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    // The delay tool's output ("Ding") should have been echoed back into the
    // assistant's final message.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
746
/// End-to-end test (real model; `e2e` feature only): verifies tool-use input
/// is observable in a partially-streamed state before it completes.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, input may still be incomplete; a
                        // missing "g" key marks a partial stream.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
798
/// Exercises the tool-permission flow: allow-once, deny, always-allow (which
/// persists so later calls skip authorization), and the resulting tool
/// results sent back to the model.
#[gpui::test]
async fn test_tool_authorization(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Two tool uses in one turn, each needing its own authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_2".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;

    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
    tool_call_auth_1
        .response
        .send(acp::PermissionOptionId::new("allow"))
        .unwrap();
    cx.run_until_parked();

    // Reject the second - send "deny" option_id directly since Deny is now a button
    tool_call_auth_2
        .response
        .send(acp::PermissionOptionId::new("deny"))
        .unwrap();
    cx.run_until_parked();

    // The next request should carry one successful and one error tool result.
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }),
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: true,
                content: "Permission to run tool denied by user".into(),
                output: Some("Permission to run tool denied by user".into())
            })
        ]
    );

    // Simulate yet another tool call.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_3".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Respond by always allowing tools - send transformed option_id
    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
    tool_call_auth_3
        .response
        .send(acp::PermissionOptionId::new(
            "always_allow:tool_requiring_permission",
        ))
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );

    // Simulate a final tool call, ensuring we don't trigger authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_4".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: "tool_id_4".into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );
}
937
/// Verifies that a model-invented tool name produces a visible tool call
/// that transitions to `Failed` rather than crashing the turn.
#[gpui::test]
async fn test_tool_hallucination(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // No tool named "nonexistent_tool" is registered on the thread.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: "nonexistent_tool".into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(tool_call.title, "nonexistent_tool");
    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
}
967
968async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
969 let event = events
970 .next()
971 .await
972 .expect("no tool call authorization event received")
973 .unwrap();
974 match event {
975 ThreadEvent::ToolCall(tool_call) => tool_call,
976 event => {
977 panic!("Unexpected event {event:?}");
978 }
979 }
980}
981
982async fn expect_tool_call_update_fields(
983 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
984) -> acp::ToolCallUpdate {
985 let event = events
986 .next()
987 .await
988 .expect("no tool call authorization event received")
989 .unwrap();
990 match event {
991 ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
992 event => {
993 panic!("Unexpected event {event:?}");
994 }
995 }
996}
997
998async fn next_tool_call_authorization(
999 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1000) -> ToolCallAuthorization {
1001 loop {
1002 let event = events
1003 .next()
1004 .await
1005 .expect("no tool call authorization event received")
1006 .unwrap();
1007 if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1008 let permission_kinds = tool_call_authorization
1009 .options
1010 .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1011 .map(|option| option.kind);
1012 let allow_once = tool_call_authorization
1013 .options
1014 .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1015 .map(|option| option.kind);
1016
1017 assert_eq!(
1018 permission_kinds,
1019 Some(acp::PermissionOptionKind::AllowAlways)
1020 );
1021 assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1022 return tool_call_authorization;
1023 }
1024 }
1025}
1026
/// A multi-word terminal command should yield three dropdown choices,
/// including a `cargo build`-pattern option.
#[test]
fn test_permission_options_terminal_with_pattern() {
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };
    assert_eq!(choices.len(), 3);

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `cargo build` commands"));
    assert!(labels.contains(&"Only this time"));
}
1048
/// A command whose second token is a flag (`ls -la`) should pattern on the
/// first token only.
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };
    assert_eq!(choices.len(), 3);

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `ls` commands"));
    assert!(labels.contains(&"Only this time"));
}
1068
/// A single-word command (`whoami`) should still produce a per-command
/// pattern option.
#[test]
fn test_permission_options_terminal_single_word_command() {
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };
    assert_eq!(choices.len(), 3);

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `whoami` commands"));
    assert!(labels.contains(&"Only this time"));
}
1088
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Editing `src/main.rs` should offer both a tool-wide option and a
    // directory-scoped (`src/`) option.
    let built = ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
        .build_permission_options();

    let PermissionOptions::Dropdown(choices) = built else {
        panic!("Expected dropdown permission options");
    };

    let has_label =
        |wanted: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == wanted);
    assert!(has_label("Always for edit file"));
    assert!(has_label("Always for `src/`"));
}
1106
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetching a URL should produce a domain-scoped option (`docs.rs`).
    let built =
        ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
            .build_permission_options();

    let choices = match built {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert!(
        choices
            .iter()
            .any(|choice| choice.allow.name.as_ref() == "Always for fetch")
    );
    assert!(
        choices
            .iter()
            .any(|choice| choice.allow.name.as_ref() == "Always for `docs.rs`")
    );
}
1124
#[test]
fn test_permission_options_without_pattern() {
    // A command with no recognizable command-name pattern (`./deploy.sh`)
    // gets only the tool-wide "always" and the one-time option.
    let built = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(entries) = built else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(entries.len(), 2);
    let labels = entries
        .iter()
        .map(|entry| entry.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));
    // No per-command pattern option should have been generated.
    assert!(labels.iter().all(|label| !label.contains("commands")));
}
1146
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Symlink targets outside the project only get one-time allow/deny,
    // presented as flat options rather than a dropdown.
    let built =
        ToolPermissionContext::symlink_target(EditFileTool::NAME, vec!["/outside/file.txt".into()])
            .build_permission_options();

    let PermissionOptions::Flat(options) = built else {
        panic!("Expected flat permission options for symlink target authorization");
    };

    assert_eq!(options.len(), 2);
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1167
#[test]
fn test_permission_option_ids_for_terminal() {
    // Option ids encode decision scope and tool (and optionally a pattern),
    // on both the allow side and the deny side of each dropdown choice.
    let built = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = built else {
        panic!("Expected dropdown permission options");
    };

    let allow_ids = choices
        .iter()
        .map(|choice| choice.allow.option_id.0.to_string())
        .collect::<Vec<String>>();
    let deny_ids = choices
        .iter()
        .map(|choice| choice.deny.option_id.0.to_string())
        .collect::<Vec<String>>();

    assert!(allow_ids.iter().any(|id| id.as_str() == "always_allow:terminal"));
    assert!(allow_ids.iter().any(|id| id.as_str() == "allow"));
    assert!(
        allow_ids
            .iter()
            .any(|id| id.starts_with("always_allow_pattern:terminal\n")),
        "Missing allow pattern option"
    );

    assert!(deny_ids.iter().any(|id| id.as_str() == "always_deny:terminal"));
    assert!(deny_ids.iter().any(|id| id.as_str() == "deny"));
    assert!(
        deny_ids
            .iter()
            .any(|id| id.starts_with("always_deny_pattern:terminal\n")),
        "Missing deny pattern option"
    );
}
1207
1208#[gpui::test]
1209#[cfg_attr(not(feature = "e2e"), ignore)]
1210async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
1211 let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1212
1213 // Test concurrent tool calls with different delay times
1214 let events = thread
1215 .update(cx, |thread, cx| {
1216 thread.add_tool(DelayTool);
1217 thread.send(
1218 UserMessageId::new(),
1219 [
1220 "Call the delay tool twice in the same message.",
1221 "Once with 100ms. Once with 300ms.",
1222 "When both timers are complete, describe the outputs.",
1223 ],
1224 cx,
1225 )
1226 })
1227 .unwrap()
1228 .collect()
1229 .await;
1230
1231 let stop_reasons = stop_events(events);
1232 assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);
1233
1234 thread.update(cx, |thread, _cx| {
1235 let last_message = thread.last_received_or_pending_message().unwrap();
1236 let agent_message = last_message.as_agent_message().unwrap();
1237 let text = agent_message
1238 .content
1239 .iter()
1240 .filter_map(|content| {
1241 if let AgentMessageContent::Text(text) = content {
1242 Some(text.as_str())
1243 } else {
1244 None
1245 }
1246 })
1247 .collect::<String>();
1248
1249 assert!(text.contains("Ding"));
1250 });
1251}
1252
// Verifies that switching agent profiles changes which tools are offered to
// the model: profile "test-1" enables echo + delay, "test-2" only infinite.
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register all three tools on the thread; the active profile decides
    // which subset is actually sent to the model.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1332
// End-to-end check that MCP (context server) tools are exposed to the model,
// invoked over the context server protocol, and that a name collision with a
// native tool is resolved by prefixing the MCP tool with its server name.
#[gpui::test]
async fn test_mcp_tools(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "tool_permissions": { "default": "allow" },
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Fake MCP server exposing a single tool named "echo"; at this point no
    // native tool with that name is registered, so no collision yet.
    let mut mcp_tool_calls = setup_context_server(
        "test_server",
        vec![context_server::types::Tool {
            name: "echo".into(),
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    let events = thread.update(cx, |thread, cx| {
        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
    });
    cx.run_until_parked();

    // Simulate the model calling the MCP tool.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: json!({"text": "test"}).to_string(),
            input: json!({"text": "test"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The fake MCP server receives the call and responds.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: "test".into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // NOTE(review): this re-asserts the `completion` captured *before* the
    // tool result was sent; presumably the intent was to inspect the new
    // pending completion's tools — confirm.
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_text_chunk("Done!");
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Send again after adding the echo tool, ensuring the name collision is resolved.
    let events = thread.update(cx, |thread, cx| {
        thread.add_tool(EchoTool);
        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
    });
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    // The MCP tool is now exposed with its server-name prefix.
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["echo", "test_server_echo"]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_2".into(),
            name: "test_server_echo".into(),
            raw_input: json!({"text": "mcp"}).to_string(),
            input: json!({"text": "mcp"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_3".into(),
            name: "echo".into(),
            raw_input: json!({"text": "native"}).to_string(),
            input: json!({"text": "native"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Only the MCP-prefixed call reaches the fake server; "echo" is handled
    // by the native tool.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // Ensure the tool results were inserted with the correct names.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages.last().unwrap().content,
        vec![
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_3".into(),
                tool_name: "echo".into(),
                is_error: false,
                content: "native".into(),
                output: Some("native".into()),
            },),
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_2".into(),
                tool_name: "test_server_echo".into(),
                is_error: false,
                content: "mcp".into(),
                output: Some("mcp".into()),
            },),
        ]
    );
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;
}
1498
// Regression test: a saved MCP tool result must still be emitted during
// thread replay even when the originating MCP server is no longer connected
// (simulating an app restart before the server reconnects).
#[gpui::test]
async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Setup settings to allow MCP tools
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "always_allow_tool_actions": true,
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {}
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Setup a context server with a tool
    let mut mcp_tool_calls = setup_context_server(
        "github_server",
        vec![context_server::types::Tool {
            name: "issue_read".into(),
            description: Some("Read a GitHub issue".into()),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "issue_url": { "type": "string" }
                }
            }),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    // Send a message and have the model call the MCP tool
    let events = thread.update(cx, |thread, cx| {
        thread
            .send(UserMessageId::new(), ["Read issue #47404"], cx)
            .unwrap()
    });
    cx.run_until_parked();

    // Verify the MCP tool is available to the model
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["issue_read"],
        "MCP tool should be available"
    );

    // Simulate the model calling the MCP tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "issue_read".into(),
            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
                .to_string(),
            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The MCP server receives the tool call and responds with content
    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "issue_read");
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: expected_tool_output.into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // After tool completes, the model continues with a new completion request
    // that includes the tool results. We need to respond to this.
    let _completion = fake_model.pending_completions().pop().unwrap();
    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Verify the tool result is stored in the thread by checking the markdown output.
    // The tool result is in the first assistant message (not the last one, which is
    // the model's response after the tool completed).
    thread.update(cx, |thread, _cx| {
        let markdown = thread.to_markdown();
        assert!(
            markdown.contains("**Tool Result**: issue_read"),
            "Thread should contain tool result header"
        );
        assert!(
            markdown.contains(expected_tool_output),
            "Thread should contain tool output: {}",
            expected_tool_output
        );
    });

    // Simulate app restart: disconnect the MCP server.
    // After restart, the MCP server won't be connected yet when the thread is replayed.
    context_server_store.update(cx, |store, cx| {
        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
    });
    cx.run_until_parked();

    // Replay the thread (this is what happens when loading a saved thread)
    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));

    let mut found_tool_call = None;
    let mut found_tool_call_update_with_output = None;

    // Scan the replay stream for the original tool call and for an update
    // carrying the persisted raw_output.
    while let Some(event) = replay_events.next().await {
        let event = event.unwrap();
        match &event {
            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
                found_tool_call = Some(tc.clone());
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
                if update.tool_call_id.to_string() == "tool_1" =>
            {
                if update.fields.raw_output.is_some() {
                    found_tool_call_update_with_output = Some(update.clone());
                }
            }
            _ => {}
        }
    }

    // The tool call should be found
    assert!(
        found_tool_call.is_some(),
        "Tool call should be emitted during replay"
    );

    assert!(
        found_tool_call_update_with_output.is_some(),
        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
    );

    let update = found_tool_call_update_with_output.unwrap();
    assert_eq!(
        update.fields.raw_output,
        Some(expected_tool_output.into()),
        "raw_output should contain the saved tool result"
    );

    // Also verify the status is correct (completed, not failed)
    assert_eq!(
        update.fields.status,
        Some(acp::ToolCallStatus::Completed),
        "Tool call status should reflect the original completion status"
    );
}
1680
// Checks MCP tool name disambiguation: colliding names get a server-derived
// prefix, server names with spaces are snake_cased, and names at or over
// MAX_TOOL_NAME_LENGTH appear shortened in the final list sent to the model.
#[gpui::test]
async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up a profile with all tools enabled
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx);
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
        thread.add_tool(WordListTool);
        thread.add_tool(ToolRequiringPermission);
        thread.add_tool(InfiniteTool);
    });

    // Set up multiple context servers with some overlapping tool names
    let _server1_calls = setup_context_server(
        "xxx",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_1".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    let _server2_calls = setup_context_server(
        "yyy",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Also conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_2".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            // Near-limit names that also collide across servers yyy and zzz,
            // forcing a prefix that would push them over the length limit.
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );
    let _server3_calls = setup_context_server(
        "zzz",
        vec![
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            // Over-limit name with no collision.
            context_server::types::Tool {
                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server with spaces in name - tests snake_case conversion for API compatibility
    let _server4_calls = setup_context_server(
        "Azure DevOps",
        vec![context_server::types::Tool {
            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Go"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    // Expected final tool list: collisions carry server prefixes (truncated
    // to fit, e.g. "y_"/"z_"), and all entries fit MAX_TOOL_NAME_LENGTH.
    assert_eq!(
        tool_names_for_completion(&completion),
        vec![
            "azure_dev_ops_echo",
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
            "delay",
            "echo",
            "infinite",
            "tool_requiring_permission",
            "unique_tool_1",
            "unique_tool_2",
            "word_list",
            "xxx_echo",
            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "yyy_echo",
            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        ]
    );
}
1864
// E2E: cancelling mid-turn closes the event stream with a Cancelled stop
// reason (even while a tool is still running), and the thread can still
// accept a new message afterwards.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            ThreadEvent::ToolCall(tool_call) => {
                // Tool calls must arrive in the order the prompt requested.
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
1950
// Cancelling a running terminal tool must kill the terminal AND preserve the
// partial output in the tool result (not just a generic "canceled" message).
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A fake terminal whose command never exits, so cancellation is the only
    // way the tool call can finish.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2047
#[gpui::test]
async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
    // This test verifies that tools which properly handle cancellation via
    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
    // to cancellation and report that they were cancelled.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // `was_cancelled` is a flag the tool flips when it observes cancellation.
    let (tool, was_cancelled) = CancellationAwareTool::new();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(tool);
            thread.send(
                UserMessageId::new(),
                ["call the cancellation aware tool"],
                cx,
            )
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the cancellation-aware tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "cancellation_aware_1".into(),
            name: "cancellation_aware".into(),
            raw_input: r#"{}"#.into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    cx.run_until_parked();

    // Wait for the tool call to be reported
    let mut tool_started = false;
    // NOTE(review): num_cpus * 100 looks like an arbitrary retry budget
    // rather than anything CPU-related — confirm the intent.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        // Drain any events that are already ready without blocking.
        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
                if tool_call.title == "Cancellation Aware Tool" {
                    tool_started = true;
                    break;
                }
            }
        }

        if tool_started {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(tool_started, "expected cancellation aware tool to start");

    // Cancel the thread and wait for it to complete
    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));

    // The cancel task should complete promptly because the tool handles cancellation
    let timeout = cx.background_executor.timer(Duration::from_secs(5));
    futures::select! {
        _ = cancel_task.fuse() => {}
        _ = timeout.fuse() => {
            panic!("cancel task timed out - tool did not respond to cancellation");
        }
    }

    // Verify the tool detected cancellation via its flag
    assert!(
        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
        "tool should have detected cancellation via event_stream.cancelled_by_user()"
    );

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2142
2143/// Helper to verify thread can recover after cancellation by sending a simple message.
2144async fn verify_thread_recovery(
2145 thread: &Entity<Thread>,
2146 fake_model: &FakeLanguageModel,
2147 cx: &mut TestAppContext,
2148) {
2149 let events = thread
2150 .update(cx, |thread, cx| {
2151 thread.send(
2152 UserMessageId::new(),
2153 ["Testing: reply with 'Hello' then stop."],
2154 cx,
2155 )
2156 })
2157 .unwrap();
2158 cx.run_until_parked();
2159 fake_model.send_last_completion_stream_text_chunk("Hello");
2160 fake_model
2161 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2162 fake_model.end_last_completion_stream();
2163
2164 let events = events.collect::<Vec<_>>().await;
2165 thread.update(cx, |thread, _cx| {
2166 let message = thread.last_received_or_pending_message().unwrap();
2167 let agent_message = message.as_agent_message().unwrap();
2168 assert_eq!(
2169 agent_message.content,
2170 vec![AgentMessageContent::Text("Hello".to_string())]
2171 );
2172 });
2173 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2174}
2175
2176/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2177async fn wait_for_terminal_tool_started(
2178 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2179 cx: &mut TestAppContext,
2180) {
2181 let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2182 for _ in 0..deadline {
2183 cx.run_until_parked();
2184
2185 while let Some(Some(event)) = events.next().now_or_never() {
2186 if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2187 update,
2188 ))) = &event
2189 {
2190 if update.fields.content.as_ref().is_some_and(|content| {
2191 content
2192 .iter()
2193 .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2194 }) {
2195 return;
2196 }
2197 }
2198 }
2199
2200 cx.background_executor
2201 .timer(Duration::from_millis(10))
2202 .await;
2203 }
2204 panic!("terminal tool did not start within the expected time");
2205}
2206
2207/// Collects events until a Stop event is received, driving the executor to completion.
2208async fn collect_events_until_stop(
2209 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2210 cx: &mut TestAppContext,
2211) -> Vec<Result<ThreadEvent>> {
2212 let mut collected = Vec::new();
2213 let deadline = cx.executor().num_cpus() * 200;
2214
2215 for _ in 0..deadline {
2216 cx.executor().advance_clock(Duration::from_millis(10));
2217 cx.run_until_parked();
2218
2219 while let Some(Some(event)) = events.next().now_or_never() {
2220 let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2221 collected.push(event);
2222 if is_stop {
2223 return collected;
2224 }
2225 }
2226 }
2227 panic!(
2228 "did not receive Stop event within the expected time; collected {} events",
2229 collected.len()
2230 );
2231}
2232
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    // Tests that truncating the thread while a terminal tool is mid-execution
    // kills the underlying terminal and leaves the thread empty and usable.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal handle that never exits on its own, so the tool stays
    // "running" until something kills it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate back to it later.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2299
#[gpui::test]
async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
    // Tests that cancellation properly kills all running terminal tools when multiple are active.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Environment that tracks every terminal handle it hands out, so we can
    // verify each one was killed afterwards.
    let environment = Rc::new(MultiTerminalEnvironment::new());

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment.clone(),
            ));
            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling two terminal tools
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_2".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 2000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for both terminal tools to start by counting terminal content updates
    let mut terminals_started = 0;
    let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
    for _ in 0..deadline {
        cx.run_until_parked();

        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            ))) = &event
            {
                if update.fields.content.as_ref().is_some_and(|content| {
                    content
                        .iter()
                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
                }) {
                    terminals_started += 1;
                    if terminals_started >= 2 {
                        break;
                    }
                }
            }
        }
        if terminals_started >= 2 {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(
        terminals_started >= 2,
        "expected 2 terminal tools to start, got {terminals_started}"
    );

    // Cancel the thread while both terminals are running.
    // Detach the cancel task; completion is observed via the event stream below.
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify both terminal handles were killed
    let handles = environment.handles();
    assert_eq!(
        handles.len(),
        2,
        "expected 2 terminal handles to be created"
    );
    assert!(
        handles[0].was_killed(),
        "expected first terminal handle to be killed on cancellation"
    );
    assert!(
        handles[1].was_killed(),
        "expected second terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );
}
2408
#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal handle that never exits on its own; only the simulated stop
    // button press below ends it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    // Mark the handle as killed, then wake anything awaiting the terminal's exit.
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and its corresponding result.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2503
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal handle that never exits on its own, forcing the timeout path.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the 100ms timeout requested in the tool input above
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and its corresponding result.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2602
2603#[gpui::test]
2604async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2605 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2606 let fake_model = model.as_fake();
2607
2608 let events_1 = thread
2609 .update(cx, |thread, cx| {
2610 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2611 })
2612 .unwrap();
2613 cx.run_until_parked();
2614 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2615 cx.run_until_parked();
2616
2617 let events_2 = thread
2618 .update(cx, |thread, cx| {
2619 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2620 })
2621 .unwrap();
2622 cx.run_until_parked();
2623 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2624 fake_model
2625 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2626 fake_model.end_last_completion_stream();
2627
2628 let events_1 = events_1.collect::<Vec<_>>().await;
2629 assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2630 let events_2 = events_2.collect::<Vec<_>>().await;
2631 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2632}
2633
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    // (Distinct model id so its requests are tracked separately from model_a's.)
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // The old turn ends Cancelled; the new turn ends cleanly.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2711
2712#[gpui::test]
2713async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2714 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2715 let fake_model = model.as_fake();
2716
2717 let events_1 = thread
2718 .update(cx, |thread, cx| {
2719 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2720 })
2721 .unwrap();
2722 cx.run_until_parked();
2723 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2724 fake_model
2725 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2726 fake_model.end_last_completion_stream();
2727 let events_1 = events_1.collect::<Vec<_>>().await;
2728
2729 let events_2 = thread
2730 .update(cx, |thread, cx| {
2731 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2732 })
2733 .unwrap();
2734 cx.run_until_parked();
2735 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2736 fake_model
2737 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2738 fake_model.end_last_completion_stream();
2739 let events_2 = events_2.collect::<Vec<_>>().await;
2740
2741 assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2742 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2743}
2744
#[gpui::test]
async fn test_refusal(cx: &mut TestAppContext) {
    // Tests that a Refusal stop reason rolls the thread back to before the
    // last user message (leaving it empty here, since there was only one).
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // The user message is recorded before any model response arrives.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    cx.run_until_parked();
    // The streamed assistant text is visible before the refusal arrives.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
    });

    // If the model refuses to continue, the thread should remove all the messages after the last user message.
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
    let events = events.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
    // The refusal clears the thread back to empty.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
    });
}
2793
#[gpui::test]
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    // Tests that truncating at the first (and only) message clears the whole
    // thread — content and token usage — and that the thread remains usable.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Keep the message id so we can truncate back to it later.
    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Before any model response, there is no token usage yet.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        assert_eq!(thread.latest_token_usage(), None);
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        // used_tokens reflects input + output from the usage update above.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncating at the first message clears both content and token usage.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // The new turn's usage replaces the cleared one entirely.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
2915
#[gpui::test]
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    // Tests that truncating at the second message restores the thread to its
    // exact state after the first exchange, including token usage.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Captures the expected state after the first exchange so the same
    // assertions can be re-run after truncation.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Keep the second message's id so we can truncate back to it.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncating at the second message restores the first-exchange state.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3030
#[gpui::test]
async fn test_title_generation(cx: &mut TestAppContext) {
    // Tests that a title is generated (via the summarization model) after the
    // first exchange only, and not regenerated on subsequent messages.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Separate fake model dedicated to summarization/title generation.
    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // Title generation is asynchronous; the placeholder remains until the
    // summary model responds.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "New Thread"));

    // Ensure the summary model has been invoked to generate a title.
    // Only the first line of the streamed summary becomes the title
    // ("Hello world\nGoodnight Moon" -> "Hello world").
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // The summary model received no new request, and the title is unchanged.
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));
}
3076
#[gpui::test]
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    // Tests that tool uses still awaiting permission are omitted when building
    // a completion request, while completed tool uses are included with their
    // results.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // This tool use stays pending (its permission is never granted here).
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // This tool use completes, producing the "test" result asserted below.
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    // messages[0] is skipped here — presumably the system/context message
    // (TODO confirm); the assertion covers the conversation proper. Note the
    // permission tool use is absent entirely, while the echo tool appears with
    // both its use and its result.
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3156
/// End-to-end exercise of [`NativeAgentConnection`]: creating a session,
/// listing/selecting models, prompting, cancelling, and verifying that a
/// closed session rejects further prompts with "Session not found".
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(user_store.clone(), client.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = Path::new("/test");
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Prompt the agent and stream back a fake response chunk.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting a closed session must fail with a session-lookup error.
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3279
/// Verifies the sequence of ACP tool-call events emitted while a tool's input
/// streams in: an initial `ToolCall`, an update with the final raw input,
/// an `InProgress` status update, and finally a `Completed` update carrying
/// the tool's output.
#[gpui::test]
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Initial tool call, created from the partial input.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Update carrying the completed input.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    // The tool starts running...
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    // ...and completes with the echoed output.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3358
/// A turn that completes successfully must not emit any `Retry` events.
#[gpui::test]
async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();

    // Drain events until the turn stops, collecting any retries along the way.
    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 0);
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey!
            "}
        )
    });
}
3401
/// When the provider reports it is overloaded mid-stream, the thread should
/// retry once after the advertised `retry_after` delay and resume the
/// assistant message, emitting exactly one `Retry` event.
#[gpui::test]
async fn test_send_retry_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Stream part of a response, then fail with a retryable error.
    fake_model.send_last_completion_stream_text_chunk("Hey,");
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the 3s retry_after so the retry request is issued.
    cx.executor().advance_clock(Duration::from_secs(3));
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("there!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 1);
    assert!(matches!(
        retry_events[0],
        acp_thread::RetryStatus { attempt: 1, .. }
    ));
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey,

                [resume]

                ## Assistant

                there!
            "}
        )
    });
}
3465
/// If the stream errors after a tool call was issued, the tool still runs to
/// completion and its result is included in the retried request, so the model
/// never sees a dangling tool use without a matching result.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    // Emit a tool call, then fail the stream with a retryable error.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the retry delay and inspect the retried request:
    // it must contain the tool use AND its result.
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retried turn and confirm the final agent message.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3545
/// After `MAX_RETRY_ATTEMPTS` consecutive retryable failures, the next
/// failure surfaces as an error on the event stream instead of another retry.
#[gpui::test]
async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Fail one more time than the retry budget allows.
    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
        fake_model.send_last_completion_stream_error(
            LanguageModelCompletionError::ServerOverloaded {
                provider: LanguageModelProviderName::new("Anthropic"),
                retry_after: Some(Duration::from_secs(3)),
            },
        );
        fake_model.end_last_completion_stream();
        cx.executor().advance_clock(Duration::from_secs(3));
        cx.run_until_parked();
    }

    let mut errors = Vec::new();
    let mut retry_events = Vec::new();
    while let Some(event) = events.next().await {
        match event {
            Ok(ThreadEvent::Retry(retry_status)) => {
                retry_events.push(retry_status);
            }
            Ok(ThreadEvent::Stop(..)) => break,
            Err(error) => errors.push(error),
            _ => {}
        }
    }

    // Exactly MAX_RETRY_ATTEMPTS retries, numbered 1..=MAX.
    assert_eq!(
        retry_events.len(),
        crate::thread::MAX_RETRY_ATTEMPTS as usize
    );
    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
        assert_eq!(retry_events[i].attempt, i + 1);
    }
    // The final failure is reported as a single stream error.
    assert_eq!(errors.len(), 1);
    let error = errors[0]
        .downcast_ref::<LanguageModelCompletionError>()
        .unwrap();
    assert!(matches!(
        error,
        LanguageModelCompletionError::ServerOverloaded { .. }
    ));
}
3599
/// Regression test: a streaming tool whose input never reaches
/// `is_input_complete = true` (because the LLM stream errors first) must
/// terminate with an error result rather than deadlocking the turn.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3706
3707/// Filters out the stop events for asserting against in tests
3708fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3709 result_events
3710 .into_iter()
3711 .filter_map(|event| match event.unwrap() {
3712 ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3713 _ => None,
3714 })
3715 .collect()
3716}
3717
/// Handles returned by [`setup`] that tests use to drive a [`Thread`].
struct ThreadTest {
    // The model the thread was constructed with; `as_fake()` is used by
    // tests running against `TestModel::Fake`.
    model: Arc<dyn LanguageModel>,
    thread: Entity<Thread>,
    project_context: Entity<ProjectContext>,
    context_server_store: Entity<ContextServerStore>,
    // Fake filesystem backing both the project and the watched settings file.
    fs: Arc<FakeFs>,
}
3725
/// Which language model a test thread should be backed by.
enum TestModel {
    // Real provider model, resolved through the registry by id in `setup`.
    Sonnet4,
    // In-process `FakeLanguageModel` whose streams tests drive manually.
    Fake,
}
3730
impl TestModel {
    /// Returns the registry id used to look up a real model in `setup`.
    ///
    /// # Panics
    /// Panics for [`TestModel::Fake`], which is constructed directly and is
    /// never looked up by id.
    fn id(&self) -> LanguageModelId {
        match self {
            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
            TestModel::Fake => unreachable!(),
        }
    }
}
3739
/// Builds the full test fixture for thread tests: a fake filesystem seeded
/// with a settings file enabling every test tool, an initialized settings
/// system (plus real model providers for [`TestModel::Sonnet4`]), a test
/// project, and a [`Thread`] wired to the requested model.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    cx.executor().allow_parking();

    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    // Write a settings file whose test profile enables all tools used in
    // these tests; `watch_settings` below keeps the global store in sync.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        match model {
            TestModel::Fake => {}
            TestModel::Sonnet4 => {
                // Real models need an HTTP client and the production
                // client/provider stack.
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(user_store.clone(), client.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Fake models are constructed directly; real models are resolved from
    // the registry and their provider authenticated first.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
3844
/// Runs once before tests (via `ctor`), enabling `env_logger` output only
/// when the caller opted in by setting `RUST_LOG`.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
3852
3853fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
3854 let fs = fs.clone();
3855 cx.spawn({
3856 async move |cx| {
3857 let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
3858 cx.background_executor(),
3859 fs,
3860 paths::settings_file().clone(),
3861 );
3862 let _watcher_task = watcher_task;
3863
3864 while let Some(new_settings_content) = new_settings_content_rx.next().await {
3865 cx.update(|cx| {
3866 SettingsStore::update_global(cx, |settings, cx| {
3867 settings.set_user_settings(&new_settings_content, cx)
3868 })
3869 })
3870 .ok();
3871 }
3872 }
3873 })
3874 .detach();
3875}
3876
3877fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
3878 completion
3879 .tools
3880 .iter()
3881 .map(|tool| tool.name.clone())
3882 .collect()
3883}
3884
/// Registers and starts a fake MCP context server named `name` exposing
/// `tools`, and returns a channel of incoming tool-call requests.
///
/// Each received item is the `CallToolParams` plus a oneshot sender the test
/// must fire with the desired `CallToolResponse`.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in project settings so the store knows about it.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    // Fake transport answering the MCP handshake and tool listing, and
    // forwarding tool calls to the returned channel.
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                // Hand the call to the test and block until it responds.
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    cx.run_until_parked();
    mcp_tool_calls_rx
}
3964
/// Verifies `tokens_before_message`: the first user message has no tokens
/// before it, and each subsequent message reports the `input_tokens` of the
/// request that preceded it.
#[gpui::test]
async fn test_tokens_before_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First message
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["First message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Before any response, tokens_before_message should return None for first message
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should have no tokens before it"
        );
    });

    // Complete first message with usage
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First message still has no tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it after response"
        );
    });

    // Second message
    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Second message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Second message should have first message's input tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should have 100 tokens before it (from first request)"
        );
    });

    // Complete second message
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250, // Total for this request (includes previous context)
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Third message
    let message_3_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_3_id.clone(), ["Third message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Third message should have second message's input tokens (250) before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_3_id),
            Some(250),
            "Third message should have 250 tokens before it (from second request)"
        );
        // Second message should still have 100
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should still have 100 tokens before it"
        );
        // First message still has none
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it"
        );
    });
}
4071
/// Verifies that truncating the thread at a message removes that message's
/// token bookkeeping: looking it up afterwards returns `None`.
#[gpui::test]
async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up three messages with responses
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250,
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Verify initial state
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
    });

    // Truncate at message 2 (removes message 2 and everything after)
    thread
        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
        .unwrap();
    cx.run_until_parked();

    // After truncation, message_2_id no longer exists, so lookup should return None
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            None,
            "After truncation, message 2 no longer exists"
        );
        // Message 1 still exists but has no tokens before it
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message still has no tokens before it"
        );
    });
}
4142
/// Exercises the terminal tool's permission-rule matrix: deny patterns,
/// allow patterns, confirm patterns, and the interaction between the global
/// default mode and a tool-specific default. Each scenario runs in its own
/// scope with freshly overridden `AgentSettings`.
#[gpui::test]
async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({})).await;
    let project = Project::test(fs, ["/root".as_ref()], cx).await;

    // Test 1: Deny rule blocks command
    {
        // Terminal never exits; irrelevant here because the deny rule should
        // reject the command before a terminal is ever created.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Confirm),
                    always_allow: vec![],
                    always_deny: vec![
                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
                    ],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        // "rm -rf /" matches the always_deny pattern above.
        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "rm -rf /".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by deny rule"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("blocked"),
            "error should mention the command was blocked"
        );
    }

    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
    {
        // Terminal exits immediately with status 0 so the tool run can finish.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![
                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
                    ],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        // "echo hello" matches the always_allow pattern above.
        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // The first update with terminal content proves the command started
        // executing without an authorization round-trip.
        let update = rx.expect_update_fields().await;
        assert!(
            update.content.iter().any(|blocks| {
                blocks
                    .iter()
                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
            }),
            "expected terminal content (allow rule should skip confirmation and override default deny)"
        );

        let result = task.await;
        assert!(
            result.is_ok(),
            "expected command to succeed without confirmation"
        );
    }

    // Test 3: global default: allow does NOT override always_confirm patterns
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Allow),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![
                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
                    ],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        // "sudo rm file" matches the always_confirm pattern above.
        let _task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "sudo rm file".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // With global default: allow, confirm patterns are still respected
        // The expect_authorization() call will panic if no authorization is requested,
        // which validates that the confirm pattern still triggers confirmation
        let _auth = rx.expect_authorization().await;

        drop(_task);
    }

    // Test 4: tool-specific default: deny is respected even with global default: allow
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        // "echo hello" matches no pattern, so the tool-specific default applies.
        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // tool-specific default: deny is respected even with global default: allow
        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by tool-specific deny default"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("disabled"),
            "error should mention the tool is disabled, got: {err_msg}"
        );
    }
}
4360
4361#[gpui::test]
4362async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
4363 init_test(cx);
4364 cx.update(|cx| {
4365 LanguageModelRegistry::test(cx);
4366 });
4367 cx.update(|cx| {
4368 cx.update_flags(true, vec!["subagents".to_string()]);
4369 });
4370
4371 let fs = FakeFs::new(cx.executor());
4372 fs.insert_tree(
4373 "/",
4374 json!({
4375 "a": {
4376 "b.md": "Lorem"
4377 }
4378 }),
4379 )
4380 .await;
4381 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4382 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4383 let agent = cx.update(|cx| {
4384 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4385 });
4386 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4387
4388 let acp_thread = cx
4389 .update(|cx| {
4390 connection
4391 .clone()
4392 .new_session(project.clone(), Path::new(""), cx)
4393 })
4394 .await
4395 .unwrap();
4396 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4397 let thread = agent.read_with(cx, |agent, _| {
4398 agent.sessions.get(&session_id).unwrap().thread.clone()
4399 });
4400 let model = Arc::new(FakeLanguageModel::default());
4401
4402 // Ensure empty threads are not saved, even if they get mutated.
4403 thread.update(cx, |thread, cx| {
4404 thread.set_model(model.clone(), cx);
4405 });
4406 cx.run_until_parked();
4407
4408 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4409 cx.run_until_parked();
4410 model.send_last_completion_stream_text_chunk("spawning subagent");
4411 let subagent_tool_input = SpawnAgentToolInput {
4412 label: "label".to_string(),
4413 message: "subagent task prompt".to_string(),
4414 session_id: None,
4415 };
4416 let subagent_tool_use = LanguageModelToolUse {
4417 id: "subagent_1".into(),
4418 name: SpawnAgentTool::NAME.into(),
4419 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4420 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4421 is_input_complete: true,
4422 thought_signature: None,
4423 };
4424 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4425 subagent_tool_use,
4426 ));
4427 model.end_last_completion_stream();
4428
4429 cx.run_until_parked();
4430
4431 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4432 thread
4433 .running_subagent_ids(cx)
4434 .get(0)
4435 .expect("subagent thread should be running")
4436 .clone()
4437 });
4438
4439 let subagent_thread = agent.read_with(cx, |agent, _cx| {
4440 agent
4441 .sessions
4442 .get(&subagent_session_id)
4443 .expect("subagent session should exist")
4444 .acp_thread
4445 .clone()
4446 });
4447
4448 model.send_last_completion_stream_text_chunk("subagent task response");
4449 model.end_last_completion_stream();
4450
4451 cx.run_until_parked();
4452
4453 assert_eq!(
4454 subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4455 indoc! {"
4456 ## User
4457
4458 subagent task prompt
4459
4460 ## Assistant
4461
4462 subagent task response
4463
4464 "}
4465 );
4466
4467 model.send_last_completion_stream_text_chunk("Response");
4468 model.end_last_completion_stream();
4469
4470 send.await.unwrap();
4471
4472 assert_eq!(
4473 acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4474 indoc! {r#"
4475 ## User
4476
4477 Prompt
4478
4479 ## Assistant
4480
4481 spawning subagent
4482
4483 **Tool Call: label**
4484 Status: Completed
4485
4486 subagent task response
4487
4488 ## Assistant
4489
4490 Response
4491
4492 "#},
4493 );
4494}
4495
4496#[gpui::test]
4497async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
4498 init_test(cx);
4499 cx.update(|cx| {
4500 LanguageModelRegistry::test(cx);
4501 });
4502 cx.update(|cx| {
4503 cx.update_flags(true, vec!["subagents".to_string()]);
4504 });
4505
4506 let fs = FakeFs::new(cx.executor());
4507 fs.insert_tree(
4508 "/",
4509 json!({
4510 "a": {
4511 "b.md": "Lorem"
4512 }
4513 }),
4514 )
4515 .await;
4516 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4517 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4518 let agent = cx.update(|cx| {
4519 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4520 });
4521 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4522
4523 let acp_thread = cx
4524 .update(|cx| {
4525 connection
4526 .clone()
4527 .new_session(project.clone(), Path::new(""), cx)
4528 })
4529 .await
4530 .unwrap();
4531 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4532 let thread = agent.read_with(cx, |agent, _| {
4533 agent.sessions.get(&session_id).unwrap().thread.clone()
4534 });
4535 let model = Arc::new(FakeLanguageModel::default());
4536
4537 // Ensure empty threads are not saved, even if they get mutated.
4538 thread.update(cx, |thread, cx| {
4539 thread.set_model(model.clone(), cx);
4540 });
4541 cx.run_until_parked();
4542
4543 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4544 cx.run_until_parked();
4545 model.send_last_completion_stream_text_chunk("spawning subagent");
4546 let subagent_tool_input = SpawnAgentToolInput {
4547 label: "label".to_string(),
4548 message: "subagent task prompt".to_string(),
4549 session_id: None,
4550 };
4551 let subagent_tool_use = LanguageModelToolUse {
4552 id: "subagent_1".into(),
4553 name: SpawnAgentTool::NAME.into(),
4554 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4555 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4556 is_input_complete: true,
4557 thought_signature: None,
4558 };
4559 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4560 subagent_tool_use,
4561 ));
4562 model.end_last_completion_stream();
4563
4564 cx.run_until_parked();
4565
4566 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4567 thread
4568 .running_subagent_ids(cx)
4569 .get(0)
4570 .expect("subagent thread should be running")
4571 .clone()
4572 });
4573
4574 let subagent_thread = agent.read_with(cx, |agent, _cx| {
4575 agent
4576 .sessions
4577 .get(&subagent_session_id)
4578 .expect("subagent session should exist")
4579 .acp_thread
4580 .clone()
4581 });
4582
4583 model.send_last_completion_stream_text_chunk("subagent task response 1");
4584 model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
4585 text: "thinking more about the subagent task".into(),
4586 signature: None,
4587 });
4588 model.send_last_completion_stream_text_chunk("subagent task response 2");
4589 model.end_last_completion_stream();
4590
4591 cx.run_until_parked();
4592
4593 assert_eq!(
4594 subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4595 indoc! {"
4596 ## User
4597
4598 subagent task prompt
4599
4600 ## Assistant
4601
4602 subagent task response 1
4603
4604 <thinking>
4605 thinking more about the subagent task
4606 </thinking>
4607
4608 subagent task response 2
4609
4610 "}
4611 );
4612
4613 model.send_last_completion_stream_text_chunk("Response");
4614 model.end_last_completion_stream();
4615
4616 send.await.unwrap();
4617
4618 assert_eq!(
4619 acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4620 indoc! {r#"
4621 ## User
4622
4623 Prompt
4624
4625 ## Assistant
4626
4627 spawning subagent
4628
4629 **Tool Call: label**
4630 Status: Completed
4631
4632 subagent task response 1
4633
4634 subagent task response 2
4635
4636 ## Assistant
4637
4638 Response
4639
4640 "#},
4641 );
4642}
4643
4644#[gpui::test]
4645async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
4646 init_test(cx);
4647 cx.update(|cx| {
4648 LanguageModelRegistry::test(cx);
4649 });
4650 cx.update(|cx| {
4651 cx.update_flags(true, vec!["subagents".to_string()]);
4652 });
4653
4654 let fs = FakeFs::new(cx.executor());
4655 fs.insert_tree(
4656 "/",
4657 json!({
4658 "a": {
4659 "b.md": "Lorem"
4660 }
4661 }),
4662 )
4663 .await;
4664 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4665 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4666 let agent = cx.update(|cx| {
4667 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4668 });
4669 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4670
4671 let acp_thread = cx
4672 .update(|cx| {
4673 connection
4674 .clone()
4675 .new_session(project.clone(), Path::new(""), cx)
4676 })
4677 .await
4678 .unwrap();
4679 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4680 let thread = agent.read_with(cx, |agent, _| {
4681 agent.sessions.get(&session_id).unwrap().thread.clone()
4682 });
4683 let model = Arc::new(FakeLanguageModel::default());
4684
4685 // Ensure empty threads are not saved, even if they get mutated.
4686 thread.update(cx, |thread, cx| {
4687 thread.set_model(model.clone(), cx);
4688 });
4689 cx.run_until_parked();
4690
4691 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4692 cx.run_until_parked();
4693 model.send_last_completion_stream_text_chunk("spawning subagent");
4694 let subagent_tool_input = SpawnAgentToolInput {
4695 label: "label".to_string(),
4696 message: "subagent task prompt".to_string(),
4697 session_id: None,
4698 };
4699 let subagent_tool_use = LanguageModelToolUse {
4700 id: "subagent_1".into(),
4701 name: SpawnAgentTool::NAME.into(),
4702 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4703 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4704 is_input_complete: true,
4705 thought_signature: None,
4706 };
4707 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4708 subagent_tool_use,
4709 ));
4710 model.end_last_completion_stream();
4711
4712 cx.run_until_parked();
4713
4714 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4715 thread
4716 .running_subagent_ids(cx)
4717 .get(0)
4718 .expect("subagent thread should be running")
4719 .clone()
4720 });
4721 let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4722 agent
4723 .sessions
4724 .get(&subagent_session_id)
4725 .expect("subagent session should exist")
4726 .acp_thread
4727 .clone()
4728 });
4729
4730 // model.send_last_completion_stream_text_chunk("subagent task response");
4731 // model.end_last_completion_stream();
4732
4733 // cx.run_until_parked();
4734
4735 acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4736
4737 cx.run_until_parked();
4738
4739 send.await.unwrap();
4740
4741 acp_thread.read_with(cx, |thread, cx| {
4742 assert_eq!(thread.status(), ThreadStatus::Idle);
4743 assert_eq!(
4744 thread.to_markdown(cx),
4745 indoc! {"
4746 ## User
4747
4748 Prompt
4749
4750 ## Assistant
4751
4752 spawning subagent
4753
4754 **Tool Call: label**
4755 Status: Canceled
4756
4757 "}
4758 );
4759 });
4760 subagent_acp_thread.read_with(cx, |thread, cx| {
4761 assert_eq!(thread.status(), ThreadStatus::Idle);
4762 assert_eq!(
4763 thread.to_markdown(cx),
4764 indoc! {"
4765 ## User
4766
4767 subagent task prompt
4768
4769 "}
4770 );
4771 });
4772}
4773
4774#[gpui::test]
4775async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
4776 init_test(cx);
4777 cx.update(|cx| {
4778 LanguageModelRegistry::test(cx);
4779 });
4780 cx.update(|cx| {
4781 cx.update_flags(true, vec!["subagents".to_string()]);
4782 });
4783
4784 let fs = FakeFs::new(cx.executor());
4785 fs.insert_tree(
4786 "/",
4787 json!({
4788 "a": {
4789 "b.md": "Lorem"
4790 }
4791 }),
4792 )
4793 .await;
4794 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4795 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4796 let agent = cx.update(|cx| {
4797 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4798 });
4799 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4800
4801 let acp_thread = cx
4802 .update(|cx| {
4803 connection
4804 .clone()
4805 .new_session(project.clone(), Path::new(""), cx)
4806 })
4807 .await
4808 .unwrap();
4809 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4810 let thread = agent.read_with(cx, |agent, _| {
4811 agent.sessions.get(&session_id).unwrap().thread.clone()
4812 });
4813 let model = Arc::new(FakeLanguageModel::default());
4814
4815 thread.update(cx, |thread, cx| {
4816 thread.set_model(model.clone(), cx);
4817 });
4818 cx.run_until_parked();
4819
4820 // === First turn: create subagent ===
4821 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
4822 cx.run_until_parked();
4823 model.send_last_completion_stream_text_chunk("spawning subagent");
4824 let subagent_tool_input = SpawnAgentToolInput {
4825 label: "initial task".to_string(),
4826 message: "do the first task".to_string(),
4827 session_id: None,
4828 };
4829 let subagent_tool_use = LanguageModelToolUse {
4830 id: "subagent_1".into(),
4831 name: SpawnAgentTool::NAME.into(),
4832 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4833 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4834 is_input_complete: true,
4835 thought_signature: None,
4836 };
4837 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4838 subagent_tool_use,
4839 ));
4840 model.end_last_completion_stream();
4841
4842 cx.run_until_parked();
4843
4844 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4845 thread
4846 .running_subagent_ids(cx)
4847 .get(0)
4848 .expect("subagent thread should be running")
4849 .clone()
4850 });
4851
4852 let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4853 agent
4854 .sessions
4855 .get(&subagent_session_id)
4856 .expect("subagent session should exist")
4857 .acp_thread
4858 .clone()
4859 });
4860
4861 // Subagent responds
4862 model.send_last_completion_stream_text_chunk("first task response");
4863 model.end_last_completion_stream();
4864
4865 cx.run_until_parked();
4866
4867 // Parent model responds to complete first turn
4868 model.send_last_completion_stream_text_chunk("First response");
4869 model.end_last_completion_stream();
4870
4871 send.await.unwrap();
4872
4873 // Verify subagent is no longer running
4874 thread.read_with(cx, |thread, cx| {
4875 assert!(
4876 thread.running_subagent_ids(cx).is_empty(),
4877 "subagent should not be running after completion"
4878 );
4879 });
4880
4881 // === Second turn: resume subagent with session_id ===
4882 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
4883 cx.run_until_parked();
4884 model.send_last_completion_stream_text_chunk("resuming subagent");
4885 let resume_tool_input = SpawnAgentToolInput {
4886 label: "follow-up task".to_string(),
4887 message: "do the follow-up task".to_string(),
4888 session_id: Some(subagent_session_id.clone()),
4889 };
4890 let resume_tool_use = LanguageModelToolUse {
4891 id: "subagent_2".into(),
4892 name: SpawnAgentTool::NAME.into(),
4893 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
4894 input: serde_json::to_value(&resume_tool_input).unwrap(),
4895 is_input_complete: true,
4896 thought_signature: None,
4897 };
4898 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
4899 model.end_last_completion_stream();
4900
4901 cx.run_until_parked();
4902
4903 // Subagent should be running again with the same session
4904 thread.read_with(cx, |thread, cx| {
4905 let running = thread.running_subagent_ids(cx);
4906 assert_eq!(running.len(), 1, "subagent should be running");
4907 assert_eq!(running[0], subagent_session_id, "should be same session");
4908 });
4909
4910 // Subagent responds to follow-up
4911 model.send_last_completion_stream_text_chunk("follow-up task response");
4912 model.end_last_completion_stream();
4913
4914 cx.run_until_parked();
4915
4916 // Parent model responds to complete second turn
4917 model.send_last_completion_stream_text_chunk("Second response");
4918 model.end_last_completion_stream();
4919
4920 send2.await.unwrap();
4921
4922 // Verify subagent is no longer running
4923 thread.read_with(cx, |thread, cx| {
4924 assert!(
4925 thread.running_subagent_ids(cx).is_empty(),
4926 "subagent should not be running after resume completion"
4927 );
4928 });
4929
4930 // Verify the subagent's acp thread has both conversation turns
4931 assert_eq!(
4932 subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4933 indoc! {"
4934 ## User
4935
4936 do the first task
4937
4938 ## Assistant
4939
4940 first task response
4941
4942 ## User
4943
4944 do the follow-up task
4945
4946 ## Assistant
4947
4948 follow-up task response
4949
4950 "}
4951 );
4952}
4953
4954#[gpui::test]
4955async fn test_subagent_tool_is_present_when_feature_flag_enabled(cx: &mut TestAppContext) {
4956 init_test(cx);
4957
4958 cx.update(|cx| {
4959 cx.update_flags(true, vec!["subagents".to_string()]);
4960 });
4961
4962 let fs = FakeFs::new(cx.executor());
4963 fs.insert_tree(path!("/test"), json!({})).await;
4964 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4965 let project_context = cx.new(|_cx| ProjectContext::default());
4966 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4967 let context_server_registry =
4968 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4969 let model = Arc::new(FakeLanguageModel::default());
4970
4971 let environment = Rc::new(cx.update(|cx| {
4972 FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
4973 }));
4974
4975 let thread = cx.new(|cx| {
4976 let mut thread = Thread::new(
4977 project.clone(),
4978 project_context,
4979 context_server_registry,
4980 Templates::new(),
4981 Some(model),
4982 cx,
4983 );
4984 thread.add_default_tools(environment, cx);
4985 thread
4986 });
4987
4988 thread.read_with(cx, |thread, _| {
4989 assert!(
4990 thread.has_registered_tool(SpawnAgentTool::NAME),
4991 "subagent tool should be present when feature flag is enabled"
4992 );
4993 });
4994}
4995
4996#[gpui::test]
4997async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
4998 init_test(cx);
4999
5000 cx.update(|cx| {
5001 cx.update_flags(true, vec!["subagents".to_string()]);
5002 });
5003
5004 let fs = FakeFs::new(cx.executor());
5005 fs.insert_tree(path!("/test"), json!({})).await;
5006 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5007 let project_context = cx.new(|_cx| ProjectContext::default());
5008 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5009 let context_server_registry =
5010 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5011 let model = Arc::new(FakeLanguageModel::default());
5012
5013 let parent_thread = cx.new(|cx| {
5014 Thread::new(
5015 project.clone(),
5016 project_context,
5017 context_server_registry,
5018 Templates::new(),
5019 Some(model.clone()),
5020 cx,
5021 )
5022 });
5023
5024 let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5025 subagent_thread.read_with(cx, |subagent_thread, cx| {
5026 assert!(subagent_thread.is_subagent());
5027 assert_eq!(subagent_thread.depth(), 1);
5028 assert_eq!(
5029 subagent_thread.model().map(|model| model.id()),
5030 Some(model.id())
5031 );
5032 assert_eq!(
5033 subagent_thread.parent_thread_id(),
5034 Some(parent_thread.read(cx).id().clone())
5035 );
5036 });
5037}
5038
5039#[gpui::test]
5040async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5041 init_test(cx);
5042
5043 cx.update(|cx| {
5044 cx.update_flags(true, vec!["subagents".to_string()]);
5045 });
5046
5047 let fs = FakeFs::new(cx.executor());
5048 fs.insert_tree(path!("/test"), json!({})).await;
5049 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5050 let project_context = cx.new(|_cx| ProjectContext::default());
5051 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5052 let context_server_registry =
5053 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5054 let model = Arc::new(FakeLanguageModel::default());
5055 let environment = Rc::new(cx.update(|cx| {
5056 FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5057 }));
5058
5059 let deep_parent_thread = cx.new(|cx| {
5060 let mut thread = Thread::new(
5061 project.clone(),
5062 project_context,
5063 context_server_registry,
5064 Templates::new(),
5065 Some(model.clone()),
5066 cx,
5067 );
5068 thread.set_subagent_context(SubagentContext {
5069 parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5070 depth: MAX_SUBAGENT_DEPTH - 1,
5071 });
5072 thread
5073 });
5074 let deep_subagent_thread = cx.new(|cx| {
5075 let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5076 thread.add_default_tools(environment, cx);
5077 thread
5078 });
5079
5080 deep_subagent_thread.read_with(cx, |thread, _| {
5081 assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5082 assert!(
5083 !thread.has_registered_tool(SpawnAgentTool::NAME),
5084 "subagent tool should not be present at max depth"
5085 );
5086 });
5087}
5088
5089#[gpui::test]
5090async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5091 init_test(cx);
5092
5093 cx.update(|cx| {
5094 cx.update_flags(true, vec!["subagents".to_string()]);
5095 });
5096
5097 let fs = FakeFs::new(cx.executor());
5098 fs.insert_tree(path!("/test"), json!({})).await;
5099 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5100 let project_context = cx.new(|_cx| ProjectContext::default());
5101 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5102 let context_server_registry =
5103 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5104 let model = Arc::new(FakeLanguageModel::default());
5105
5106 let parent = cx.new(|cx| {
5107 Thread::new(
5108 project.clone(),
5109 project_context.clone(),
5110 context_server_registry.clone(),
5111 Templates::new(),
5112 Some(model.clone()),
5113 cx,
5114 )
5115 });
5116
5117 let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5118
5119 parent.update(cx, |thread, _cx| {
5120 thread.register_running_subagent(subagent.downgrade());
5121 });
5122
5123 subagent
5124 .update(cx, |thread, cx| {
5125 thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5126 })
5127 .unwrap();
5128 cx.run_until_parked();
5129
5130 subagent.read_with(cx, |thread, _| {
5131 assert!(!thread.is_turn_complete(), "subagent should be running");
5132 });
5133
5134 parent.update(cx, |thread, cx| {
5135 thread.cancel(cx).detach();
5136 });
5137
5138 subagent.read_with(cx, |thread, _| {
5139 assert!(
5140 thread.is_turn_complete(),
5141 "subagent should be cancelled when parent cancels"
5142 );
5143 });
5144}
5145
5146#[gpui::test]
5147async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5148 init_test(cx);
5149 cx.update(|cx| {
5150 LanguageModelRegistry::test(cx);
5151 });
5152 cx.update(|cx| {
5153 cx.update_flags(true, vec!["subagents".to_string()]);
5154 });
5155
5156 let fs = FakeFs::new(cx.executor());
5157 fs.insert_tree(
5158 "/",
5159 json!({
5160 "a": {
5161 "b.md": "Lorem"
5162 }
5163 }),
5164 )
5165 .await;
5166 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5167 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5168 let agent = cx.update(|cx| {
5169 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5170 });
5171 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5172
5173 let acp_thread = cx
5174 .update(|cx| {
5175 connection
5176 .clone()
5177 .new_session(project.clone(), Path::new(""), cx)
5178 })
5179 .await
5180 .unwrap();
5181 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5182 let thread = agent.read_with(cx, |agent, _| {
5183 agent.sessions.get(&session_id).unwrap().thread.clone()
5184 });
5185 let model = Arc::new(FakeLanguageModel::default());
5186
5187 thread.update(cx, |thread, cx| {
5188 thread.set_model(model.clone(), cx);
5189 });
5190 cx.run_until_parked();
5191
5192 // Start the parent turn
5193 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5194 cx.run_until_parked();
5195 model.send_last_completion_stream_text_chunk("spawning subagent");
5196 let subagent_tool_input = SpawnAgentToolInput {
5197 label: "label".to_string(),
5198 message: "subagent task prompt".to_string(),
5199 session_id: None,
5200 };
5201 let subagent_tool_use = LanguageModelToolUse {
5202 id: "subagent_1".into(),
5203 name: SpawnAgentTool::NAME.into(),
5204 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5205 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5206 is_input_complete: true,
5207 thought_signature: None,
5208 };
5209 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5210 subagent_tool_use,
5211 ));
5212 model.end_last_completion_stream();
5213
5214 cx.run_until_parked();
5215
5216 // Verify subagent is running
5217 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5218 thread
5219 .running_subagent_ids(cx)
5220 .get(0)
5221 .expect("subagent thread should be running")
5222 .clone()
5223 });
5224
5225 // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5226 model.send_last_completion_stream_text_chunk("partial work");
5227 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5228 TokenUsage {
5229 input_tokens: 850_000,
5230 output_tokens: 0,
5231 cache_creation_input_tokens: 0,
5232 cache_read_input_tokens: 0,
5233 },
5234 ));
5235
5236 cx.run_until_parked();
5237
5238 // The subagent should no longer be running
5239 thread.read_with(cx, |thread, cx| {
5240 assert!(
5241 thread.running_subagent_ids(cx).is_empty(),
5242 "subagent should be stopped after context window warning"
5243 );
5244 });
5245
5246 // The parent model should get a new completion request to respond to the tool error
5247 model.send_last_completion_stream_text_chunk("Response after warning");
5248 model.end_last_completion_stream();
5249
5250 send.await.unwrap();
5251
5252 // Verify the parent thread shows the warning error in the tool call
5253 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5254 assert!(
5255 markdown.contains("nearing the end of its context window"),
5256 "tool output should contain context window warning message, got:\n{markdown}"
5257 );
5258 assert!(
5259 markdown.contains("Status: Failed"),
5260 "tool call should have Failed status, got:\n{markdown}"
5261 );
5262
5263 // Verify the subagent session still exists (can be resumed)
5264 agent.read_with(cx, |agent, _cx| {
5265 assert!(
5266 agent.sessions.contains_key(&subagent_session_id),
5267 "subagent session should still exist for potential resume"
5268 );
5269 });
5270}
5271
5272#[gpui::test]
5273async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5274 init_test(cx);
5275 cx.update(|cx| {
5276 LanguageModelRegistry::test(cx);
5277 });
5278 cx.update(|cx| {
5279 cx.update_flags(true, vec!["subagents".to_string()]);
5280 });
5281
5282 let fs = FakeFs::new(cx.executor());
5283 fs.insert_tree(
5284 "/",
5285 json!({
5286 "a": {
5287 "b.md": "Lorem"
5288 }
5289 }),
5290 )
5291 .await;
5292 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5293 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5294 let agent = cx.update(|cx| {
5295 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5296 });
5297 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5298
5299 let acp_thread = cx
5300 .update(|cx| {
5301 connection
5302 .clone()
5303 .new_session(project.clone(), Path::new(""), cx)
5304 })
5305 .await
5306 .unwrap();
5307 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5308 let thread = agent.read_with(cx, |agent, _| {
5309 agent.sessions.get(&session_id).unwrap().thread.clone()
5310 });
5311 let model = Arc::new(FakeLanguageModel::default());
5312
5313 thread.update(cx, |thread, cx| {
5314 thread.set_model(model.clone(), cx);
5315 });
5316 cx.run_until_parked();
5317
5318 // === First turn: create subagent, trigger context window warning ===
5319 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5320 cx.run_until_parked();
5321 model.send_last_completion_stream_text_chunk("spawning subagent");
5322 let subagent_tool_input = SpawnAgentToolInput {
5323 label: "initial task".to_string(),
5324 message: "do the first task".to_string(),
5325 session_id: None,
5326 };
5327 let subagent_tool_use = LanguageModelToolUse {
5328 id: "subagent_1".into(),
5329 name: SpawnAgentTool::NAME.into(),
5330 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5331 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5332 is_input_complete: true,
5333 thought_signature: None,
5334 };
5335 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5336 subagent_tool_use,
5337 ));
5338 model.end_last_completion_stream();
5339
5340 cx.run_until_parked();
5341
5342 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5343 thread
5344 .running_subagent_ids(cx)
5345 .get(0)
5346 .expect("subagent thread should be running")
5347 .clone()
5348 });
5349
5350 // Subagent sends a usage update that crosses the warning threshold.
5351 // This triggers Normal→Warning, stopping the subagent.
5352 model.send_last_completion_stream_text_chunk("partial work");
5353 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5354 TokenUsage {
5355 input_tokens: 850_000,
5356 output_tokens: 0,
5357 cache_creation_input_tokens: 0,
5358 cache_read_input_tokens: 0,
5359 },
5360 ));
5361
5362 cx.run_until_parked();
5363
5364 // Verify the first turn was stopped with a context window warning
5365 thread.read_with(cx, |thread, cx| {
5366 assert!(
5367 thread.running_subagent_ids(cx).is_empty(),
5368 "subagent should be stopped after context window warning"
5369 );
5370 });
5371
5372 // Parent model responds to complete first turn
5373 model.send_last_completion_stream_text_chunk("First response");
5374 model.end_last_completion_stream();
5375
5376 send.await.unwrap();
5377
5378 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5379 assert!(
5380 markdown.contains("nearing the end of its context window"),
5381 "first turn should have context window warning, got:\n{markdown}"
5382 );
5383
5384 // === Second turn: resume the same subagent (now at Warning level) ===
5385 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5386 cx.run_until_parked();
5387 model.send_last_completion_stream_text_chunk("resuming subagent");
5388 let resume_tool_input = SpawnAgentToolInput {
5389 label: "follow-up task".to_string(),
5390 message: "do the follow-up task".to_string(),
5391 session_id: Some(subagent_session_id.clone()),
5392 };
5393 let resume_tool_use = LanguageModelToolUse {
5394 id: "subagent_2".into(),
5395 name: SpawnAgentTool::NAME.into(),
5396 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5397 input: serde_json::to_value(&resume_tool_input).unwrap(),
5398 is_input_complete: true,
5399 thought_signature: None,
5400 };
5401 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5402 model.end_last_completion_stream();
5403
5404 cx.run_until_parked();
5405
5406 // Subagent responds with tokens still at warning level (no worse).
5407 // Since ratio_before_prompt was already Warning, this should NOT
5408 // trigger the context window warning again.
5409 model.send_last_completion_stream_text_chunk("follow-up task response");
5410 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5411 TokenUsage {
5412 input_tokens: 870_000,
5413 output_tokens: 0,
5414 cache_creation_input_tokens: 0,
5415 cache_read_input_tokens: 0,
5416 },
5417 ));
5418 model.end_last_completion_stream();
5419
5420 cx.run_until_parked();
5421
5422 // Parent model responds to complete second turn
5423 model.send_last_completion_stream_text_chunk("Second response");
5424 model.end_last_completion_stream();
5425
5426 send2.await.unwrap();
5427
5428 // The resumed subagent should have completed normally since the ratio
5429 // didn't transition (it was Warning before and stayed at Warning)
5430 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5431 assert!(
5432 markdown.contains("follow-up task response"),
5433 "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5434 );
5435 // The second tool call should NOT have a context window warning
5436 let second_tool_pos = markdown
5437 .find("follow-up task")
5438 .expect("should find follow-up tool call");
5439 let after_second_tool = &markdown[second_tool_pos..];
5440 assert!(
5441 !after_second_tool.contains("nearing the end of its context window"),
5442 "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5443 );
5444}
5445
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    // Verifies that a non-retryable model error inside a subagent stops the
    // subagent and surfaces as a failed tool call on the parent thread.
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is gated behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), Path::new(""), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    // Resolve the agent-side thread backing the ACP session we just created.
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    // The same fake model serves the parent thread and the subagent; the
    // "last completion stream" alternates between them as turns progress.
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // Parent model calls the spawn-agent tool with no session id, creating a
    // brand-new subagent.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5552
5553#[gpui::test]
5554async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5555 init_test(cx);
5556
5557 let fs = FakeFs::new(cx.executor());
5558 fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5559 .await;
5560 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5561
5562 cx.update(|cx| {
5563 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5564 settings.tool_permissions.tools.insert(
5565 EditFileTool::NAME.into(),
5566 agent_settings::ToolRules {
5567 default: Some(settings::ToolPermissionMode::Allow),
5568 always_allow: vec![],
5569 always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5570 always_confirm: vec![],
5571 invalid_patterns: vec![],
5572 },
5573 );
5574 agent_settings::AgentSettings::override_global(settings, cx);
5575 });
5576
5577 let context_server_registry =
5578 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5579 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5580 let templates = crate::Templates::new();
5581 let thread = cx.new(|cx| {
5582 crate::Thread::new(
5583 project.clone(),
5584 cx.new(|_cx| prompt_store::ProjectContext::default()),
5585 context_server_registry,
5586 templates.clone(),
5587 None,
5588 cx,
5589 )
5590 });
5591
5592 #[allow(clippy::arc_with_non_send_sync)]
5593 let tool = Arc::new(crate::EditFileTool::new(
5594 project.clone(),
5595 thread.downgrade(),
5596 language_registry,
5597 templates,
5598 ));
5599 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5600
5601 let task = cx.update(|cx| {
5602 tool.run(
5603 ToolInput::resolved(crate::EditFileToolInput {
5604 display_description: "Edit sensitive file".to_string(),
5605 path: "root/sensitive_config.txt".into(),
5606 mode: crate::EditFileMode::Edit,
5607 }),
5608 event_stream,
5609 cx,
5610 )
5611 });
5612
5613 let result = task.await;
5614 assert!(result.is_err(), "expected edit to be blocked");
5615 assert!(
5616 result.unwrap_err().to_string().contains("blocked"),
5617 "error should mention the edit was blocked"
5618 );
5619}
5620
5621#[gpui::test]
5622async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5623 init_test(cx);
5624
5625 let fs = FakeFs::new(cx.executor());
5626 fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5627 .await;
5628 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5629
5630 cx.update(|cx| {
5631 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5632 settings.tool_permissions.tools.insert(
5633 DeletePathTool::NAME.into(),
5634 agent_settings::ToolRules {
5635 default: Some(settings::ToolPermissionMode::Allow),
5636 always_allow: vec![],
5637 always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5638 always_confirm: vec![],
5639 invalid_patterns: vec![],
5640 },
5641 );
5642 agent_settings::AgentSettings::override_global(settings, cx);
5643 });
5644
5645 let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5646
5647 #[allow(clippy::arc_with_non_send_sync)]
5648 let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5649 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5650
5651 let task = cx.update(|cx| {
5652 tool.run(
5653 ToolInput::resolved(crate::DeletePathToolInput {
5654 path: "root/important_data.txt".to_string(),
5655 }),
5656 event_stream,
5657 cx,
5658 )
5659 });
5660
5661 let result = task.await;
5662 assert!(result.is_err(), "expected deletion to be blocked");
5663 assert!(
5664 result.unwrap_err().contains("blocked"),
5665 "error should mention the deletion was blocked"
5666 );
5667}
5668
5669#[gpui::test]
5670async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5671 init_test(cx);
5672
5673 let fs = FakeFs::new(cx.executor());
5674 fs.insert_tree(
5675 "/root",
5676 json!({
5677 "safe.txt": "content",
5678 "protected": {}
5679 }),
5680 )
5681 .await;
5682 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5683
5684 cx.update(|cx| {
5685 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5686 settings.tool_permissions.tools.insert(
5687 MovePathTool::NAME.into(),
5688 agent_settings::ToolRules {
5689 default: Some(settings::ToolPermissionMode::Allow),
5690 always_allow: vec![],
5691 always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5692 always_confirm: vec![],
5693 invalid_patterns: vec![],
5694 },
5695 );
5696 agent_settings::AgentSettings::override_global(settings, cx);
5697 });
5698
5699 #[allow(clippy::arc_with_non_send_sync)]
5700 let tool = Arc::new(crate::MovePathTool::new(project));
5701 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5702
5703 let task = cx.update(|cx| {
5704 tool.run(
5705 ToolInput::resolved(crate::MovePathToolInput {
5706 source_path: "root/safe.txt".to_string(),
5707 destination_path: "root/protected/safe.txt".to_string(),
5708 }),
5709 event_stream,
5710 cx,
5711 )
5712 });
5713
5714 let result = task.await;
5715 assert!(
5716 result.is_err(),
5717 "expected move to be blocked due to destination path"
5718 );
5719 assert!(
5720 result.unwrap_err().contains("blocked"),
5721 "error should mention the move was blocked"
5722 );
5723}
5724
5725#[gpui::test]
5726async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5727 init_test(cx);
5728
5729 let fs = FakeFs::new(cx.executor());
5730 fs.insert_tree(
5731 "/root",
5732 json!({
5733 "secret.txt": "secret content",
5734 "public": {}
5735 }),
5736 )
5737 .await;
5738 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5739
5740 cx.update(|cx| {
5741 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5742 settings.tool_permissions.tools.insert(
5743 MovePathTool::NAME.into(),
5744 agent_settings::ToolRules {
5745 default: Some(settings::ToolPermissionMode::Allow),
5746 always_allow: vec![],
5747 always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5748 always_confirm: vec![],
5749 invalid_patterns: vec![],
5750 },
5751 );
5752 agent_settings::AgentSettings::override_global(settings, cx);
5753 });
5754
5755 #[allow(clippy::arc_with_non_send_sync)]
5756 let tool = Arc::new(crate::MovePathTool::new(project));
5757 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5758
5759 let task = cx.update(|cx| {
5760 tool.run(
5761 ToolInput::resolved(crate::MovePathToolInput {
5762 source_path: "root/secret.txt".to_string(),
5763 destination_path: "root/public/not_secret.txt".to_string(),
5764 }),
5765 event_stream,
5766 cx,
5767 )
5768 });
5769
5770 let result = task.await;
5771 assert!(
5772 result.is_err(),
5773 "expected move to be blocked due to source path"
5774 );
5775 assert!(
5776 result.unwrap_err().contains("blocked"),
5777 "error should mention the move was blocked"
5778 );
5779}
5780
5781#[gpui::test]
5782async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5783 init_test(cx);
5784
5785 let fs = FakeFs::new(cx.executor());
5786 fs.insert_tree(
5787 "/root",
5788 json!({
5789 "confidential.txt": "confidential data",
5790 "dest": {}
5791 }),
5792 )
5793 .await;
5794 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5795
5796 cx.update(|cx| {
5797 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5798 settings.tool_permissions.tools.insert(
5799 CopyPathTool::NAME.into(),
5800 agent_settings::ToolRules {
5801 default: Some(settings::ToolPermissionMode::Allow),
5802 always_allow: vec![],
5803 always_deny: vec![
5804 agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5805 ],
5806 always_confirm: vec![],
5807 invalid_patterns: vec![],
5808 },
5809 );
5810 agent_settings::AgentSettings::override_global(settings, cx);
5811 });
5812
5813 #[allow(clippy::arc_with_non_send_sync)]
5814 let tool = Arc::new(crate::CopyPathTool::new(project));
5815 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5816
5817 let task = cx.update(|cx| {
5818 tool.run(
5819 ToolInput::resolved(crate::CopyPathToolInput {
5820 source_path: "root/confidential.txt".to_string(),
5821 destination_path: "root/dest/copy.txt".to_string(),
5822 }),
5823 event_stream,
5824 cx,
5825 )
5826 });
5827
5828 let result = task.await;
5829 assert!(result.is_err(), "expected copy to be blocked");
5830 assert!(
5831 result.unwrap_err().contains("blocked"),
5832 "error should mention the copy was blocked"
5833 );
5834}
5835
5836#[gpui::test]
5837async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
5838 init_test(cx);
5839
5840 let fs = FakeFs::new(cx.executor());
5841 fs.insert_tree(
5842 "/root",
5843 json!({
5844 "normal.txt": "normal content",
5845 "readonly": {
5846 "config.txt": "readonly content"
5847 }
5848 }),
5849 )
5850 .await;
5851 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5852
5853 cx.update(|cx| {
5854 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5855 settings.tool_permissions.tools.insert(
5856 SaveFileTool::NAME.into(),
5857 agent_settings::ToolRules {
5858 default: Some(settings::ToolPermissionMode::Allow),
5859 always_allow: vec![],
5860 always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
5861 always_confirm: vec![],
5862 invalid_patterns: vec![],
5863 },
5864 );
5865 agent_settings::AgentSettings::override_global(settings, cx);
5866 });
5867
5868 #[allow(clippy::arc_with_non_send_sync)]
5869 let tool = Arc::new(crate::SaveFileTool::new(project));
5870 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5871
5872 let task = cx.update(|cx| {
5873 tool.run(
5874 ToolInput::resolved(crate::SaveFileToolInput {
5875 paths: vec![
5876 std::path::PathBuf::from("root/normal.txt"),
5877 std::path::PathBuf::from("root/readonly/config.txt"),
5878 ],
5879 }),
5880 event_stream,
5881 cx,
5882 )
5883 });
5884
5885 let result = task.await;
5886 assert!(
5887 result.is_err(),
5888 "expected save to be blocked due to denied path"
5889 );
5890 assert!(
5891 result.unwrap_err().contains("blocked"),
5892 "error should mention the save was blocked"
5893 );
5894}
5895
5896#[gpui::test]
5897async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
5898 init_test(cx);
5899
5900 let fs = FakeFs::new(cx.executor());
5901 fs.insert_tree("/root", json!({"config.secret": "secret config"}))
5902 .await;
5903 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5904
5905 cx.update(|cx| {
5906 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5907 settings.tool_permissions.tools.insert(
5908 SaveFileTool::NAME.into(),
5909 agent_settings::ToolRules {
5910 default: Some(settings::ToolPermissionMode::Allow),
5911 always_allow: vec![],
5912 always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
5913 always_confirm: vec![],
5914 invalid_patterns: vec![],
5915 },
5916 );
5917 agent_settings::AgentSettings::override_global(settings, cx);
5918 });
5919
5920 #[allow(clippy::arc_with_non_send_sync)]
5921 let tool = Arc::new(crate::SaveFileTool::new(project));
5922 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5923
5924 let task = cx.update(|cx| {
5925 tool.run(
5926 ToolInput::resolved(crate::SaveFileToolInput {
5927 paths: vec![std::path::PathBuf::from("root/config.secret")],
5928 }),
5929 event_stream,
5930 cx,
5931 )
5932 });
5933
5934 let result = task.await;
5935 assert!(result.is_err(), "expected save to be blocked");
5936 assert!(
5937 result.unwrap_err().contains("blocked"),
5938 "error should mention the save was blocked"
5939 );
5940}
5941
5942#[gpui::test]
5943async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
5944 init_test(cx);
5945
5946 cx.update(|cx| {
5947 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5948 settings.tool_permissions.tools.insert(
5949 WebSearchTool::NAME.into(),
5950 agent_settings::ToolRules {
5951 default: Some(settings::ToolPermissionMode::Allow),
5952 always_allow: vec![],
5953 always_deny: vec![
5954 agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
5955 ],
5956 always_confirm: vec![],
5957 invalid_patterns: vec![],
5958 },
5959 );
5960 agent_settings::AgentSettings::override_global(settings, cx);
5961 });
5962
5963 #[allow(clippy::arc_with_non_send_sync)]
5964 let tool = Arc::new(crate::WebSearchTool);
5965 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5966
5967 let input: crate::WebSearchToolInput =
5968 serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
5969
5970 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
5971
5972 let result = task.await;
5973 assert!(result.is_err(), "expected search to be blocked");
5974 match result.unwrap_err() {
5975 crate::WebSearchToolOutput::Error { error } => {
5976 assert!(
5977 error.contains("blocked"),
5978 "error should mention the search was blocked"
5979 );
5980 }
5981 other => panic!("expected Error variant, got: {other:?}"),
5982 }
5983}
5984
5985#[gpui::test]
5986async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
5987 init_test(cx);
5988
5989 let fs = FakeFs::new(cx.executor());
5990 fs.insert_tree("/root", json!({"README.md": "# Hello"}))
5991 .await;
5992 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5993
5994 cx.update(|cx| {
5995 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5996 settings.tool_permissions.tools.insert(
5997 EditFileTool::NAME.into(),
5998 agent_settings::ToolRules {
5999 default: Some(settings::ToolPermissionMode::Confirm),
6000 always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6001 always_deny: vec![],
6002 always_confirm: vec![],
6003 invalid_patterns: vec![],
6004 },
6005 );
6006 agent_settings::AgentSettings::override_global(settings, cx);
6007 });
6008
6009 let context_server_registry =
6010 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6011 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6012 let templates = crate::Templates::new();
6013 let thread = cx.new(|cx| {
6014 crate::Thread::new(
6015 project.clone(),
6016 cx.new(|_cx| prompt_store::ProjectContext::default()),
6017 context_server_registry,
6018 templates.clone(),
6019 None,
6020 cx,
6021 )
6022 });
6023
6024 #[allow(clippy::arc_with_non_send_sync)]
6025 let tool = Arc::new(crate::EditFileTool::new(
6026 project,
6027 thread.downgrade(),
6028 language_registry,
6029 templates,
6030 ));
6031 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6032
6033 let _task = cx.update(|cx| {
6034 tool.run(
6035 ToolInput::resolved(crate::EditFileToolInput {
6036 display_description: "Edit README".to_string(),
6037 path: "root/README.md".into(),
6038 mode: crate::EditFileMode::Edit,
6039 }),
6040 event_stream,
6041 cx,
6042 )
6043 });
6044
6045 cx.run_until_parked();
6046
6047 let event = rx.try_next();
6048 assert!(
6049 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6050 "expected no authorization request for allowed .md file"
6051 );
6052}
6053
6054#[gpui::test]
6055async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6056 init_test(cx);
6057
6058 let fs = FakeFs::new(cx.executor());
6059 fs.insert_tree(
6060 "/root",
6061 json!({
6062 ".zed": {
6063 "settings.json": "{}"
6064 },
6065 "README.md": "# Hello"
6066 }),
6067 )
6068 .await;
6069 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6070
6071 cx.update(|cx| {
6072 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6073 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6074 agent_settings::AgentSettings::override_global(settings, cx);
6075 });
6076
6077 let context_server_registry =
6078 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6079 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6080 let templates = crate::Templates::new();
6081 let thread = cx.new(|cx| {
6082 crate::Thread::new(
6083 project.clone(),
6084 cx.new(|_cx| prompt_store::ProjectContext::default()),
6085 context_server_registry,
6086 templates.clone(),
6087 None,
6088 cx,
6089 )
6090 });
6091
6092 #[allow(clippy::arc_with_non_send_sync)]
6093 let tool = Arc::new(crate::EditFileTool::new(
6094 project,
6095 thread.downgrade(),
6096 language_registry,
6097 templates,
6098 ));
6099
6100 // Editing a file inside .zed/ should still prompt even with global default: allow,
6101 // because local settings paths are sensitive and require confirmation regardless.
6102 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6103 let _task = cx.update(|cx| {
6104 tool.run(
6105 ToolInput::resolved(crate::EditFileToolInput {
6106 display_description: "Edit local settings".to_string(),
6107 path: "root/.zed/settings.json".into(),
6108 mode: crate::EditFileMode::Edit,
6109 }),
6110 event_stream,
6111 cx,
6112 )
6113 });
6114
6115 let _update = rx.expect_update_fields().await;
6116 let _auth = rx.expect_authorization().await;
6117}
6118
6119#[gpui::test]
6120async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6121 init_test(cx);
6122
6123 cx.update(|cx| {
6124 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6125 settings.tool_permissions.tools.insert(
6126 FetchTool::NAME.into(),
6127 agent_settings::ToolRules {
6128 default: Some(settings::ToolPermissionMode::Allow),
6129 always_allow: vec![],
6130 always_deny: vec![
6131 agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6132 ],
6133 always_confirm: vec![],
6134 invalid_patterns: vec![],
6135 },
6136 );
6137 agent_settings::AgentSettings::override_global(settings, cx);
6138 });
6139
6140 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6141
6142 #[allow(clippy::arc_with_non_send_sync)]
6143 let tool = Arc::new(crate::FetchTool::new(http_client));
6144 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6145
6146 let input: crate::FetchToolInput =
6147 serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6148
6149 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6150
6151 let result = task.await;
6152 assert!(result.is_err(), "expected fetch to be blocked");
6153 assert!(
6154 result.unwrap_err().contains("blocked"),
6155 "error should mention the fetch was blocked"
6156 );
6157}
6158
6159#[gpui::test]
6160async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6161 init_test(cx);
6162
6163 cx.update(|cx| {
6164 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6165 settings.tool_permissions.tools.insert(
6166 FetchTool::NAME.into(),
6167 agent_settings::ToolRules {
6168 default: Some(settings::ToolPermissionMode::Confirm),
6169 always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6170 always_deny: vec![],
6171 always_confirm: vec![],
6172 invalid_patterns: vec![],
6173 },
6174 );
6175 agent_settings::AgentSettings::override_global(settings, cx);
6176 });
6177
6178 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6179
6180 #[allow(clippy::arc_with_non_send_sync)]
6181 let tool = Arc::new(crate::FetchTool::new(http_client));
6182 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6183
6184 let input: crate::FetchToolInput =
6185 serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6186
6187 let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6188
6189 cx.run_until_parked();
6190
6191 let event = rx.try_next();
6192 assert!(
6193 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6194 "expected no authorization request for allowed docs.rs URL"
6195 );
6196}
6197
#[gpui::test]
async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
    // Verifies that when a message is queued mid-turn, the turn ends at the
    // next tool-call boundary (EndTurn) instead of continuing, and that the
    // queued-message flag survives so the queued message can be sent next.
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Add a tool so we can simulate tool calls
    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
    });

    // Start a turn by sending a message
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate the model making a tool call
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text": "hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));

    // Signal that a message is queued before ending the stream
    thread.update(cx, |thread, _cx| {
        thread.set_has_queued_message(true);
    });

    // Now end the stream - tool will run, and the boundary check should see the queue
    fake_model.end_last_completion_stream();

    // Collect all events until the turn stops
    let all_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we received the tool call event
    let tool_call_ids: Vec<_> = all_events
        .iter()
        .filter_map(|e| match e {
            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
            _ => None,
        })
        .collect();
    assert_eq!(
        tool_call_ids,
        vec!["tool_1"],
        "Should have received a tool call event for our echo tool"
    );

    // The turn should have stopped with EndTurn
    let stop_reasons = stop_events(all_events);
    assert_eq!(
        stop_reasons,
        vec![acp::StopReason::EndTurn],
        "Turn should have ended after tool completion due to queued message"
    );

    // Verify the queued message flag is still set
    // (ending the turn must not consume the flag — the caller drains the queue)
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.has_queued_message(),
            "Should still have queued message flag set"
        );
    });

    // Thread should be idle now
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be running after turn ends"
        );
    });
}
6282
6283#[gpui::test]
6284async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
6285 init_test(cx);
6286 always_allow_tools(cx);
6287
6288 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6289 let fake_model = model.as_fake();
6290
6291 thread.update(cx, |thread, _cx| {
6292 thread.add_tool(StreamingFailingEchoTool {
6293 receive_chunks_until_failure: 1,
6294 });
6295 });
6296
6297 let _events = thread
6298 .update(cx, |thread, cx| {
6299 thread.send(
6300 UserMessageId::new(),
6301 ["Use the streaming_failing_echo tool"],
6302 cx,
6303 )
6304 })
6305 .unwrap();
6306 cx.run_until_parked();
6307
6308 let tool_use = LanguageModelToolUse {
6309 id: "call_1".into(),
6310 name: StreamingFailingEchoTool::NAME.into(),
6311 raw_input: "hello".into(),
6312 input: json!({}),
6313 is_input_complete: false,
6314 thought_signature: None,
6315 };
6316
6317 fake_model
6318 .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
6319
6320 cx.run_until_parked();
6321
6322 let completions = fake_model.pending_completions();
6323 let last_completion = completions.last().unwrap();
6324
6325 assert_eq!(
6326 last_completion.messages[1..],
6327 vec![
6328 LanguageModelRequestMessage {
6329 role: Role::User,
6330 content: vec!["Use the streaming_failing_echo tool".into()],
6331 cache: false,
6332 reasoning_details: None,
6333 },
6334 LanguageModelRequestMessage {
6335 role: Role::Assistant,
6336 content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
6337 cache: false,
6338 reasoning_details: None,
6339 },
6340 LanguageModelRequestMessage {
6341 role: Role::User,
6342 content: vec![language_model::MessageContent::ToolResult(
6343 LanguageModelToolResult {
6344 tool_use_id: tool_use.id.clone(),
6345 tool_name: tool_use.name,
6346 is_error: true,
6347 content: "failed".into(),
6348 output: Some("failed".into()),
6349 }
6350 )],
6351 cache: true,
6352 reasoning_details: None,
6353 },
6354 ]
6355 );
6356}
6357
#[gpui::test]
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Gate that keeps the first (successful) tool call running until we
    // explicitly release it further down.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        // First tool: succeeds, but blocks until the oneshot above fires.
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        // Second tool: fails after receiving its first streamed input chunk.
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Stream the first tool call in two parts: an incomplete chunk...
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    // ...followed by the complete version with the same id, which is what
    // should end up recorded in the assistant message.
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // Second tool call: still incomplete when its tool fails on the first
    // streamed chunk.
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    // Let the failing tool fail while the first tool is still gated; the
    // thread must not issue the follow-up request yet.
    cx.run_until_parked();

    // Release the gated echo tool so the turn can finish.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // The follow-up request must contain BOTH tool results — the streaming
    // failure waited for the in-flight tool to complete. The failing tool's
    // error result appears first, then the gated tool's successful result.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}