1use super::*;
2use acp_thread::{
3 AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
4 UserMessageId,
5};
6use agent_client_protocol::{self as acp};
7use agent_settings::AgentProfileId;
8use anyhow::Result;
9use client::{Client, RefreshLlmTokenListener, UserStore};
10use collections::IndexMap;
11use context_server::{ContextServer, ContextServerCommand, ContextServerId};
12use feature_flags::FeatureFlagAppExt as _;
13use fs::{FakeFs, Fs};
14use futures::{
15 FutureExt as _, StreamExt,
16 channel::{
17 mpsc::{self, UnboundedReceiver},
18 oneshot,
19 },
20 future::{Fuse, Shared},
21};
22use gpui::{
23 App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
24 http_client::FakeHttpClient,
25};
26use indoc::indoc;
27use language_model::{
28 CompletionIntent, LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent,
29 LanguageModelId, LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
30 LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
31 LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
32 fake_provider::FakeLanguageModel,
33};
34use pretty_assertions::assert_eq;
35use project::{
36 Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
37};
38use prompt_store::ProjectContext;
39use reqwest_client::ReqwestClient;
40use schemars::JsonSchema;
41use serde::{Deserialize, Serialize};
42use serde_json::json;
43use settings::{Settings, SettingsStore};
44use std::{
45 path::Path,
46 pin::Pin,
47 rc::Rc,
48 sync::{
49 Arc,
50 atomic::{AtomicBool, AtomicUsize, Ordering},
51 },
52 time::Duration,
53};
54use util::path;
55
56mod edit_file_thread_test;
57mod test_tools;
58use test_tools::*;
59
60pub(crate) fn init_test(cx: &mut TestAppContext) {
61 cx.update(|cx| {
62 let settings_store = SettingsStore::test(cx);
63 cx.set_global(settings_store);
64 });
65}
66
/// Test double for a terminal spawned by the terminal tool.
///
/// Tracks whether `kill()` was invoked and lets tests decide when the fake
/// process "exits" via `signal_exit()`.
pub(crate) struct FakeTerminalHandle {
    // Set to true when `kill()` is called.
    killed: Arc<AtomicBool>,
    // Toggled by tests (via `set_stopped_by_user`) to simulate a user stop.
    stopped_by_user: Arc<AtomicBool>,
    // One-shot sender that resolves `wait_for_exit`; taken on first `signal_exit`.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    // Shared task that completes once the fake terminal exits.
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    // Canned response returned from `current_output()`.
    output: acp::TerminalOutputResponse,
    // Fixed id reported to callers.
    id: acp::TerminalId,
}
75
76impl FakeTerminalHandle {
77 pub(crate) fn new_never_exits(cx: &mut App) -> Self {
78 let killed = Arc::new(AtomicBool::new(false));
79 let stopped_by_user = Arc::new(AtomicBool::new(false));
80
81 let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
82
83 let wait_for_exit = cx
84 .spawn(async move |_cx| {
85 // Wait for the exit signal (sent when kill() is called)
86 let _ = exit_receiver.await;
87 acp::TerminalExitStatus::new()
88 })
89 .shared();
90
91 Self {
92 killed,
93 stopped_by_user,
94 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
95 wait_for_exit,
96 output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
97 id: acp::TerminalId::new("fake_terminal".to_string()),
98 }
99 }
100
101 pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
102 let killed = Arc::new(AtomicBool::new(false));
103 let stopped_by_user = Arc::new(AtomicBool::new(false));
104 let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
105
106 let wait_for_exit = cx
107 .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
108 .shared();
109
110 Self {
111 killed,
112 stopped_by_user,
113 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
114 wait_for_exit,
115 output: acp::TerminalOutputResponse::new("command output".to_string(), false),
116 id: acp::TerminalId::new("fake_terminal".to_string()),
117 }
118 }
119
120 pub(crate) fn was_killed(&self) -> bool {
121 self.killed.load(Ordering::SeqCst)
122 }
123
124 pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
125 self.stopped_by_user.store(stopped, Ordering::SeqCst);
126 }
127
128 pub(crate) fn signal_exit(&self) {
129 if let Some(sender) = self.exit_sender.borrow_mut().take() {
130 let _ = sender.send(());
131 }
132 }
133}
134
impl crate::TerminalHandle for FakeTerminalHandle {
    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
        Ok(self.id.clone())
    }

    /// Returns the canned output configured at construction time.
    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
        Ok(self.output.clone())
    }

    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
        Ok(self.wait_for_exit.clone())
    }

    /// Records the kill, then resolves the exit future so waiters unblock.
    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
        self.killed.store(true, Ordering::SeqCst);
        self.signal_exit();
        Ok(())
    }

    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
        Ok(self.stopped_by_user.load(Ordering::SeqCst))
    }
}
158
/// Test double for a subagent whose `send` resolves with a pre-baked response.
struct FakeSubagentHandle {
    thread_id: acp_thread::ThreadId,
    session_id: acp::SessionId,
    // Shared task whose output is returned from every `send` call.
    send_task: Shared<Task<String>>,
}
164
impl SubagentHandle for FakeSubagentHandle {
    fn id(&self) -> acp_thread::ThreadId {
        self.thread_id
    }

    fn session_id(&self, _cx: &App) -> acp::SessionId {
        self.session_id.clone()
    }

    // Not exercised by the current tests.
    fn num_entries(&self, _cx: &App) -> usize {
        unimplemented!()
    }

    /// Ignores the message and resolves with the pre-configured `send_task` output.
    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
        let task = self.send_task.clone();
        cx.background_spawn(async move { Ok(task.await) })
    }
}
183
/// Minimal `ThreadEnvironment` test double backed by pre-configured handles.
#[derive(Default)]
pub(crate) struct FakeThreadEnvironment {
    // Handle returned from every `create_terminal` call; panics if unset.
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    // Handle returned from every `create_subagent` call; panics if unset.
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
    // Counts how many terminals have been requested.
    terminal_creations: Arc<AtomicUsize>,
}
190
191impl FakeThreadEnvironment {
192 pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
193 Self {
194 terminal_handle: Some(terminal_handle.into()),
195 ..self
196 }
197 }
198
199 pub(crate) fn terminal_creation_count(&self) -> usize {
200 self.terminal_creations.load(Ordering::SeqCst)
201 }
202}
203
204impl crate::ThreadEnvironment for FakeThreadEnvironment {
205 fn create_terminal(
206 &self,
207 _command: String,
208 _cwd: Option<std::path::PathBuf>,
209 _output_byte_limit: Option<u64>,
210 _cx: &mut AsyncApp,
211 ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
212 self.terminal_creations.fetch_add(1, Ordering::SeqCst);
213 let handle = self
214 .terminal_handle
215 .clone()
216 .expect("Terminal handle not available on FakeThreadEnvironment");
217 Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
218 }
219
220 fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
221 Ok(self
222 .subagent_handle
223 .clone()
224 .expect("Subagent handle not available on FakeThreadEnvironment")
225 as Rc<dyn SubagentHandle>)
226 }
227}
228
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    // Every handle handed out by `create_terminal`, in creation order.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
233
234impl MultiTerminalEnvironment {
235 fn new() -> Self {
236 Self {
237 handles: std::cell::RefCell::new(Vec::new()),
238 }
239 }
240
241 fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
242 self.handles.borrow().clone()
243 }
244}
245
impl crate::ThreadEnvironment for MultiTerminalEnvironment {
    /// Creates a fresh never-exiting fake terminal per call and records the
    /// handle so tests can inspect every spawned terminal afterwards.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
        self.handles.borrow_mut().push(handle.clone());
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Subagents are not needed by the multi-terminal tests.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        unimplemented!()
    }
}
263
264fn always_allow_tools(cx: &mut TestAppContext) {
265 cx.update(|cx| {
266 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
267 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
268 agent_settings::AgentSettings::override_global(settings, cx);
269 });
270}
271
#[gpui::test]
async fn test_echo(cx: &mut TestAppContext) {
    // Drive the fake model by hand: send a prompt, stream back "Hello", and
    // verify the assistant message plus the turn's stop reason.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Stream the reply, end the turn, then close the completion stream.
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        // The streamed reply must land as the latest assistant message.
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            "Hello\n"
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
304
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    // Runs the terminal tool with a tiny timeout against a terminal that never
    // exits, and verifies the handle gets killed and partial output is returned.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // 5ms timeout on a command that would "run" forever.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    // The tool should first report the terminal as tool-call content.
    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // Poll the task cooperatively (run_until_parked + short timers) until it
    // completes, bailing out after 500ms of wall-clock time.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
370
#[gpui::test]
#[ignore]
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    // Counterpart to the timeout test: with `timeout_ms: None`, the handle must
    // NOT be killed even after waiting a while. (Currently #[ignore]d.)
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Keep the task alive but never await it; we only observe the handle.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give any (buggy) kill path a chance to fire before asserting.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
420
#[gpui::test]
async fn test_thinking(cx: &mut TestAppContext) {
    // Verifies that a streamed Thinking event is rendered as a <think> block
    // in the message markdown, ahead of the regular text.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                [indoc! {"
                    Testing:

                    Generate a thinking step where you just think the word 'Think',
                    and have your final answer be 'Hello'
                "}],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();
    // Stream a thinking chunk first, then the answer, then end the turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "Think".to_string(),
        signature: None,
    });
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            indoc! {"
                <think>Think</think>
                Hello
            "}
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
469
#[gpui::test]
async fn test_system_prompt(cx: &mut TestAppContext) {
    // With a tool registered, the system prompt must interpolate project
    // context (the shell) and include the diagnostics section.
    let ThreadTest {
        model,
        thread,
        project_context,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Inject a recognizable shell name we can search for in the prompt.
    project_context.update(cx, |project_context, _cx| {
        project_context.shell = "test-shell".into()
    });
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The first message of the request must be the system prompt.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        system_prompt.contains("test-shell"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
514
#[gpui::test]
async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
    // With no tools registered, the tool-related sections must be absent from
    // the system prompt.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        !system_prompt.contains("## Tool Use"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        !system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
550
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    // Verifies the cache-marking policy: only the most recent user message (or
    // tool result) in a request carries `cache: true`; everything earlier is
    // sent uncached.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // messages[0] is the system prompt; compare only the conversation tail.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The follow-up request must mark only the trailing tool-result message
    // as cached; the full prior transcript is uncached.
    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
696
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test (real Sonnet 4 model; only runs with the "e2e" feature).
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool call that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    thread.update(cx, |thread, _cx| {
        // The delay tool outputs "Ding"; the model should echo it back.
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
756
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test: tool-call input should be observable mid-stream, i.e.
    // with incomplete input, before the call finishes streaming.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, the later keys ("g") may not have
                        // streamed in yet — that's the partial state we want.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
808
#[gpui::test]
async fn test_tool_authorization(cx: &mut TestAppContext) {
    // Exercises the permission flow: allow-once, deny-once, allow-always (which
    // must suppress future authorization prompts for the same tool).
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Two simultaneous tool calls, each needing user authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_2".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;

    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
    tool_call_auth_1
        .response
        .send(acp_thread::SelectedPermissionOutcome::new(
            acp::PermissionOptionId::new("allow"),
            acp::PermissionOptionKind::AllowOnce,
        ))
        .unwrap();
    cx.run_until_parked();

    // Reject the second - send "deny" option_id directly since Deny is now a button
    tool_call_auth_2
        .response
        .send(acp_thread::SelectedPermissionOutcome::new(
            acp::PermissionOptionId::new("deny"),
            acp::PermissionOptionKind::RejectOnce,
        ))
        .unwrap();
    cx.run_until_parked();

    // The follow-up request must carry one successful and one error tool result.
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }),
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: true,
                content: "Permission to run tool denied by user".into(),
                output: Some("Permission to run tool denied by user".into())
            })
        ]
    );

    // Simulate yet another tool call.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_3".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Respond by always allowing tools - send transformed option_id
    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
    tool_call_auth_3
        .response
        .send(acp_thread::SelectedPermissionOutcome::new(
            acp::PermissionOptionId::new("always_allow:tool_requiring_permission"),
            acp::PermissionOptionKind::AllowAlways,
        ))
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );

    // Simulate a final tool call, ensuring we don't trigger authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_4".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: "tool_id_4".into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );
}
954
#[gpui::test]
async fn test_tool_hallucination(cx: &mut TestAppContext) {
    // A tool-use event naming a tool that was never registered should surface
    // as a tool call that immediately transitions to Failed.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: "nonexistent_tool".into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(tool_call.title, "nonexistent_tool");
    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
}
984
985async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
986 let event = events
987 .next()
988 .await
989 .expect("no tool call authorization event received")
990 .unwrap();
991 match event {
992 ThreadEvent::ToolCall(tool_call) => tool_call,
993 event => {
994 panic!("Unexpected event {event:?}");
995 }
996 }
997}
998
999async fn expect_tool_call_update_fields(
1000 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1001) -> acp::ToolCallUpdate {
1002 let event = events
1003 .next()
1004 .await
1005 .expect("no tool call authorization event received")
1006 .unwrap();
1007 match event {
1008 ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
1009 event => {
1010 panic!("Unexpected event {event:?}");
1011 }
1012 }
1013}
1014
1015async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1016 let event = events
1017 .next()
1018 .await
1019 .expect("no plan event received")
1020 .unwrap();
1021 match event {
1022 ThreadEvent::Plan(plan) => plan,
1023 event => {
1024 panic!("Unexpected event {event:?}");
1025 }
1026 }
1027}
1028
1029async fn next_tool_call_authorization(
1030 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1031) -> ToolCallAuthorization {
1032 loop {
1033 let event = events
1034 .next()
1035 .await
1036 .expect("no tool call authorization event received")
1037 .unwrap();
1038 if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1039 let permission_kinds = tool_call_authorization
1040 .options
1041 .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1042 .map(|option| option.kind);
1043 let allow_once = tool_call_authorization
1044 .options
1045 .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1046 .map(|option| option.kind);
1047
1048 assert_eq!(
1049 permission_kinds,
1050 Some(acp::PermissionOptionKind::AllowAlways)
1051 );
1052 assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1053 return tool_call_authorization;
1054 }
1055 }
1056}
1057
#[test]
fn test_permission_options_terminal_with_pattern() {
    // A multi-word command should yield a pattern option scoped to the
    // first two tokens ("cargo build").
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    for expected in [
        "Always for terminal",
        "Always for `cargo build` commands",
        "Only this time",
    ] {
        assert!(
            choices
                .iter()
                .any(|choice| choice.allow.name.as_ref() == expected),
            "missing permission label: {expected}"
        );
    }
}
1079
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    // When the second token is a flag (not a subcommand), only the first
    // token should be used for the pattern-scoped choice.
    let permission_options =
        ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
            .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let has_label =
        |label: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == label);
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `ls` commands"));
    assert!(has_label("Only this time"));
}
1099
#[test]
fn test_permission_options_terminal_single_word_command() {
    // A bare single-word command still gets its own pattern-scoped choice.
    let permission_options =
        ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
            .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let has_label =
        |label: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == label);
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `whoami` commands"));
    assert!(has_label("Only this time"));
}
1119
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Edit-file permissions should offer a directory-scoped "always" choice
    // derived from the leading path component.
    let permission_options =
        ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
            .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    let has_label =
        |label: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == label);
    assert!(has_label("Always for edit file"));
    assert!(has_label("Always for `src/`"));
}
1137
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetch permissions should offer a domain-scoped "always" choice derived
    // from the URL host.
    let permission_options =
        ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
            .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    let has_label =
        |label: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == label);
    assert!(has_label("Always for fetch"));
    assert!(has_label("Always for `docs.rs`"));
}
1155
#[test]
fn test_permission_options_without_pattern() {
    // A command with no recognizable pattern (a local script invocation) must
    // not produce a pattern-scoped choice — only tool-wide and one-time.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 2);
    let has_label =
        |label: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == label);
    assert!(has_label("Always for terminal"));
    assert!(has_label("Only this time"));
    // No choice should mention a command pattern.
    assert!(
        choices
            .iter()
            .all(|choice| !choice.allow.name.as_ref().contains("commands"))
    );
}
1177
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Symlink-target authorization must never offer persistent ("always")
    // options: only a flat one-time allow/deny pair.
    let permission_options = ToolPermissionContext::symlink_target(
        EditFileTool::NAME,
        vec!["/outside/file.txt".into()],
    )
    .build_permission_options();

    let PermissionOptions::Flat(options) = permission_options else {
        panic!("Expected flat permission options for symlink target authorization");
    };

    assert_eq!(options.len(), 2);
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1198
#[test]
fn test_permission_option_ids_for_terminal() {
    // Checks the stable option-id scheme: "always_allow:<tool>" /
    // "always_deny:<tool>" for persistent choices, plain "allow" / "deny"
    // for the one-time choice.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    // Expect 3 choices: always-tool, always-pattern, once
    assert_eq!(choices.len(), 3);

    // Helper returning the (allow, deny) option-id pair for a choice.
    let ids = |index: usize| {
        (
            choices[index].allow.option_id.0.as_ref(),
            choices[index].deny.option_id.0.as_ref(),
        )
    };

    // First two choices both use the tool-level option IDs
    assert_eq!(ids(0), ("always_allow:terminal", "always_deny:terminal"));
    assert!(choices[0].sub_patterns.is_empty());

    assert_eq!(ids(1), ("always_allow:terminal", "always_deny:terminal"));
    assert_eq!(choices[1].sub_patterns, vec!["^cargo\\s+build(\\s|$)"]);

    // Third choice is the one-time allow/deny
    assert_eq!(ids(2), ("allow", "deny"));
    assert!(choices[2].sub_patterns.is_empty());
}
1234
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    // A piped command can't be covered by a single prefix pattern, so the
    // builder should emit the richer DropdownWithPatterns variant carrying a
    // pattern per pipeline stage.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo test 2>&1 | tail".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::DropdownWithPatterns {
        choices,
        patterns,
        tool_name,
    } = permission_options
    else {
        panic!("Expected DropdownWithPatterns permission options for pipeline command");
    };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Should have "Always for terminal" and "Only this time" choices
    assert_eq!(choices.len(), 2);
    let has_label =
        |label: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == label);
    assert!(has_label("Always for terminal"));
    assert!(has_label("Only this time"));

    // Should have per-command patterns for "cargo test" and "tail"
    assert_eq!(patterns.len(), 2);
    let has_display_name = |name: &str| patterns.iter().any(|cp| cp.display_name == name);
    assert!(has_display_name("cargo test"));
    assert!(has_display_name("tail"));

    // Verify patterns are valid regex patterns
    let has_regex = |pattern: &str| patterns.iter().any(|cp| cp.pattern == pattern);
    assert!(has_regex("^cargo\\s+test(\\s|$)"));
    assert!(has_regex("^tail\\b"));
}
1274
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    // "&&"-chained commands combined with a pipe should each contribute a
    // distinct pattern; npm subcommands are kept separate.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["npm install && npm test | tail".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::DropdownWithPatterns { patterns, .. } = permission_options else {
        panic!("Expected DropdownWithPatterns for chained pipeline command");
    };

    // With subcommand-aware patterns, "npm install" and "npm test" are distinct
    assert_eq!(patterns.len(), 3);
    for expected in ["npm install", "npm test", "tail"] {
        assert!(patterns.iter().any(|cp| cp.display_name == expected));
    }
}
1294
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
    // E2E test (real Sonnet 4 model, only runs with the "e2e" feature):
    // verifies that two delay-tool calls issued in the same turn both complete
    // and that the model's final message describes their output ("Ding").
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test concurrent tool calls with different delay times
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Call the delay tool twice in the same message.",
                    "Once with 100ms. Once with 300ms.",
                    "When both timers are complete, describe the outputs.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;

    // The turn should end normally, not be cancelled or refused.
    let stop_reasons = stop_events(events);
    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);

    thread.update(cx, |thread, _cx| {
        // Concatenate all text content of the final agent message and check it
        // mentions the delay tool's output.
        let last_message = thread.last_received_or_pending_message().unwrap();
        let agent_message = last_message.as_agent_message().unwrap();
        let text = agent_message
            .content
            .iter()
            .filter_map(|content| {
                if let AgentMessageContent::Text(text) = content {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect::<String>();

        assert!(text.contains("Ding"));
    });
}
1339
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    // Verifies that switching agent profiles changes which tools are exposed
    // to the model: profile "test-1" enables echo + delay, while "test-2"
    // enables only the infinite tool.
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register all three tools on the thread; the profile decides which ones
    // are actually sent to the model.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Inspect the completion request the fake model received to see which
    // tools the profile exposed.
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1419
#[gpui::test]
async fn test_mcp_tools(cx: &mut TestAppContext) {
    // Verifies that MCP (context-server) tools are exposed to the model, that
    // MCP tool calls are routed to the server and their responses returned,
    // and that a name collision between a native tool and an MCP tool is
    // disambiguated by prefixing the MCP tool with its server name
    // ("test_server_echo").
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "tool_permissions": { "default": "allow" },
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Start a fake context server exposing a single "echo" tool that reuses
    // the native EchoTool's input schema.
    let mut mcp_tool_calls = setup_context_server(
        "test_server",
        vec![context_server::types::Tool {
            name: "echo".into(),
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    let events = thread.update(cx, |thread, cx| {
        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
    });
    cx.run_until_parked();

    // Simulate the model calling the MCP tool.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: json!({"text": "test"}).to_string(),
            input: json!({"text": "test"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The fake server receives the call; respond so the turn can continue.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: "test".into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_text_chunk("Done!");
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Send again after adding the echo tool, ensuring the name collision is resolved.
    let events = thread.update(cx, |thread, cx| {
        thread.add_tool(EchoTool);
        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
    });
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["echo", "test_server_echo"]
    );
    // Call both the server-prefixed MCP tool and the native tool in one turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_2".into(),
            name: "test_server_echo".into(),
            raw_input: json!({"text": "mcp"}).to_string(),
            input: json!({"text": "mcp"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_3".into(),
            name: "echo".into(),
            raw_input: json!({"text": "native"}).to_string(),
            input: json!({"text": "native"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The server still sees the unprefixed name "echo" on its side.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // Ensure the tool results were inserted with the correct names.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages.last().unwrap().content,
        vec![
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_3".into(),
                tool_name: "echo".into(),
                is_error: false,
                content: "native".into(),
                output: Some("native".into()),
            },),
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_2".into(),
                tool_name: "test_server_echo".into(),
                is_error: false,
                content: "mcp".into(),
                output: Some("mcp".into()),
            },),
        ]
    );
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;
}
1585
#[gpui::test]
async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
    // Regression test: a saved MCP tool result must still be replayed (with
    // its output and completed status) after the originating context server
    // has been stopped — simulating loading a saved thread after app restart.
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Setup settings to allow MCP tools
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "always_allow_tool_actions": true,
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {}
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Setup a context server with a tool
    let mut mcp_tool_calls = setup_context_server(
        "github_server",
        vec![context_server::types::Tool {
            name: "issue_read".into(),
            description: Some("Read a GitHub issue".into()),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "issue_url": { "type": "string" }
                }
            }),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    // Send a message and have the model call the MCP tool
    let events = thread.update(cx, |thread, cx| {
        thread
            .send(UserMessageId::new(), ["Read issue #47404"], cx)
            .unwrap()
    });
    cx.run_until_parked();

    // Verify the MCP tool is available to the model
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["issue_read"],
        "MCP tool should be available"
    );

    // Simulate the model calling the MCP tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "issue_read".into(),
            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
                .to_string(),
            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The MCP server receives the tool call and responds with content
    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "issue_read");
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: expected_tool_output.into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // After tool completes, the model continues with a new completion request
    // that includes the tool results. We need to respond to this.
    let _completion = fake_model.pending_completions().pop().unwrap();
    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Verify the tool result is stored in the thread by checking the markdown output.
    // The tool result is in the first assistant message (not the last one, which is
    // the model's response after the tool completed).
    thread.update(cx, |thread, _cx| {
        let markdown = thread.to_markdown();
        assert!(
            markdown.contains("**Tool Result**: issue_read"),
            "Thread should contain tool result header"
        );
        assert!(
            markdown.contains(expected_tool_output),
            "Thread should contain tool output: {}",
            expected_tool_output
        );
    });

    // Simulate app restart: disconnect the MCP server.
    // After restart, the MCP server won't be connected yet when the thread is replayed.
    context_server_store.update(cx, |store, cx| {
        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
    });
    cx.run_until_parked();

    // Replay the thread (this is what happens when loading a saved thread)
    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));

    // Scan the replayed stream for the tool call and for an update that
    // carries the saved raw output.
    let mut found_tool_call = None;
    let mut found_tool_call_update_with_output = None;

    while let Some(event) = replay_events.next().await {
        let event = event.unwrap();
        match &event {
            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
                found_tool_call = Some(tc.clone());
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
                if update.tool_call_id.to_string() == "tool_1" =>
            {
                if update.fields.raw_output.is_some() {
                    found_tool_call_update_with_output = Some(update.clone());
                }
            }
            _ => {}
        }
    }

    // The tool call should be found
    assert!(
        found_tool_call.is_some(),
        "Tool call should be emitted during replay"
    );

    assert!(
        found_tool_call_update_with_output.is_some(),
        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
    );

    let update = found_tool_call_update_with_output.unwrap();
    assert_eq!(
        update.fields.raw_output,
        Some(expected_tool_output.into()),
        "raw_output should contain the saved tool result"
    );

    // Also verify the status is correct (completed, not failed)
    assert_eq!(
        update.fields.status,
        Some(acp::ToolCallStatus::Completed),
        "Tool call status should reflect the original completion status"
    );
}
1767
#[gpui::test]
async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
    // Verifies tool-name disambiguation and truncation across several context
    // servers: conflicting names get server prefixes, over-long names are cut
    // to MAX_TOOL_NAME_LENGTH, and server names with spaces are snake_cased.
    // The final assertion pins the exact (sorted) tool list sent to the model.
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up a profile with all tools enabled
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx);
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
        thread.add_tool(WordListTool);
        thread.add_tool(ToolRequiringPermission);
        thread.add_tool(InfiniteTool);
    });

    // Set up multiple context servers with some overlapping tool names
    let _server1_calls = setup_context_server(
        "xxx",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_1".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    let _server2_calls = setup_context_server(
        "yyy",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Also conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_2".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            // Near-limit names that also conflict with server "zzz" below, so
            // prefixing + truncation both come into play.
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );
    let _server3_calls = setup_context_server(
        "zzz",
        vec![
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            // Over-limit name with no conflict: expected to be truncated only.
            context_server::types::Tool {
                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server with spaces in name - tests snake_case conversion for API compatibility
    let _server4_calls = setup_context_server(
        "Azure DevOps",
        vec![context_server::types::Tool {
            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Go"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec![
            "azure_dev_ops_echo",
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
            "delay",
            "echo",
            "infinite",
            "tool_requiring_permission",
            "unique_tool_1",
            "unique_tool_2",
            "word_list",
            "xxx_echo",
            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "yyy_echo",
            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        ]
    );
}
1951
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    // E2E test (real Sonnet 4 model): cancelling a turn while a tool is still
    // running must close the event stream with StopReason::Cancelled, and the
    // thread must accept a new message afterwards.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            ThreadEvent::ToolCall(tool_call) => {
                // Tool calls are expected in the order listed above.
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            // Track completion of the echo tool specifically; the infinite
            // tool never completes on its own.
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
2037
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    // Cancelling a running terminal tool must kill the terminal, report
    // StopReason::Cancelled, and preserve the partial terminal output in the
    // tool result (not just a generic "canceled" message).
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Fake terminal whose process never exits, so cancellation is the only
    // way the tool call can end.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2134
/// A tool that cooperates with cancellation (via
/// `event_stream.cancelled_by_user()`) should let `Thread::cancel` resolve
/// promptly, set its own cancellation flag, and end the turn with `Cancelled`.
#[gpui::test]
async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
    // This test verifies that tools which properly handle cancellation via
    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
    // to cancellation and report that they were cancelled.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // `was_cancelled` is a shared flag the tool sets when it observes cancellation.
    let (tool, was_cancelled) = CancellationAwareTool::new();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(tool);
            thread.send(
                UserMessageId::new(),
                ["call the cancellation aware tool"],
                cx,
            )
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the cancellation-aware tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "cancellation_aware_1".into(),
            name: "cancellation_aware".into(),
            raw_input: r#"{}"#.into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    cx.run_until_parked();

    // Wait for the tool call to be reported
    let mut tool_started = false;
    // Polling budget scaled with available parallelism, mirroring
    // `wait_for_terminal_tool_started`.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        // Drain all immediately available events, looking for the tool call.
        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
                if tool_call.title == "Cancellation Aware Tool" {
                    tool_started = true;
                    break;
                }
            }
        }

        if tool_started {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(tool_started, "expected cancellation aware tool to start");

    // Cancel the thread and wait for it to complete
    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));

    // The cancel task should complete promptly because the tool handles cancellation
    let timeout = cx.background_executor.timer(Duration::from_secs(5));
    futures::select! {
        _ = cancel_task.fuse() => {}
        _ = timeout.fuse() => {
            panic!("cancel task timed out - tool did not respond to cancellation");
        }
    }

    // Verify the tool detected cancellation via its flag
    assert!(
        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
        "tool should have detected cancellation via event_stream.cancelled_by_user()"
    );

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2229
2230/// Helper to verify thread can recover after cancellation by sending a simple message.
2231async fn verify_thread_recovery(
2232 thread: &Entity<Thread>,
2233 fake_model: &FakeLanguageModel,
2234 cx: &mut TestAppContext,
2235) {
2236 let events = thread
2237 .update(cx, |thread, cx| {
2238 thread.send(
2239 UserMessageId::new(),
2240 ["Testing: reply with 'Hello' then stop."],
2241 cx,
2242 )
2243 })
2244 .unwrap();
2245 cx.run_until_parked();
2246 fake_model.send_last_completion_stream_text_chunk("Hello");
2247 fake_model
2248 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2249 fake_model.end_last_completion_stream();
2250
2251 let events = events.collect::<Vec<_>>().await;
2252 thread.update(cx, |thread, _cx| {
2253 let message = thread.last_received_or_pending_message().unwrap();
2254 let agent_message = message.as_agent_message().unwrap();
2255 assert_eq!(
2256 agent_message.content,
2257 vec![AgentMessageContent::Text("Hello".to_string())]
2258 );
2259 });
2260 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2261}
2262
2263/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2264async fn wait_for_terminal_tool_started(
2265 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2266 cx: &mut TestAppContext,
2267) {
2268 let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2269 for _ in 0..deadline {
2270 cx.run_until_parked();
2271
2272 while let Some(Some(event)) = events.next().now_or_never() {
2273 if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2274 update,
2275 ))) = &event
2276 {
2277 if update.fields.content.as_ref().is_some_and(|content| {
2278 content
2279 .iter()
2280 .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2281 }) {
2282 return;
2283 }
2284 }
2285 }
2286
2287 cx.background_executor
2288 .timer(Duration::from_millis(10))
2289 .await;
2290 }
2291 panic!("terminal tool did not start within the expected time");
2292}
2293
2294/// Collects events until a Stop event is received, driving the executor to completion.
2295async fn collect_events_until_stop(
2296 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2297 cx: &mut TestAppContext,
2298) -> Vec<Result<ThreadEvent>> {
2299 let mut collected = Vec::new();
2300 let deadline = cx.executor().num_cpus() * 200;
2301
2302 for _ in 0..deadline {
2303 cx.executor().advance_clock(Duration::from_millis(10));
2304 cx.run_until_parked();
2305
2306 while let Some(Some(event)) = events.next().now_or_never() {
2307 let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2308 collected.push(event);
2309 if is_stop {
2310 return collected;
2311 }
2312 }
2313 }
2314 panic!(
2315 "did not receive Stop event within the expected time; collected {} events",
2316 collected.len()
2317 );
2318}
2319
/// Truncating the thread at its only message while a terminal tool is running
/// must kill the terminal, leave the thread empty, and keep it usable for new
/// messages.
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal that never exits on its own; only a kill can end it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate at this exact message later.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2386
/// Cancelling while two terminal tools are running concurrently must kill
/// both terminals and end the turn with a single `Cancelled` stop event.
#[gpui::test]
async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
    // Tests that cancellation properly kills all running terminal tools when multiple are active.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Environment that creates a fresh terminal handle per tool invocation.
    let environment = Rc::new(MultiTerminalEnvironment::new());

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment.clone(),
            ));
            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling two terminal tools
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_2".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 2000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for both terminal tools to start by counting terminal content updates
    let mut terminals_started = 0;
    // Polling budget scaled with available parallelism, mirroring
    // `wait_for_terminal_tool_started`.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            ))) = &event
            {
                if update.fields.content.as_ref().is_some_and(|content| {
                    content
                        .iter()
                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
                }) {
                    terminals_started += 1;
                    if terminals_started >= 2 {
                        break;
                    }
                }
            }
        }
        if terminals_started >= 2 {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(
        terminals_started >= 2,
        "expected 2 terminal tools to start, got {terminals_started}"
    );

    // Cancel the thread while both terminals are running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify both terminal handles were killed
    let handles = environment.handles();
    assert_eq!(
        handles.len(),
        2,
        "expected 2 terminal handles to be created"
    );
    assert!(
        handles[0].was_killed(),
        "expected first terminal handle to be killed on cancellation"
    );
    assert!(
        handles[1].was_killed(),
        "expected second terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );
}
2495
/// Stopping a terminal from its own card (not via thread cancellation) must
/// let the turn finish normally (`EndTurn`) while the tool result reports
/// that the user stopped the command.
#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal that never exits on its own, so it only ends when we
    // simulate the card's stop button below.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2590
/// When a terminal tool's `timeout_ms` elapses, the terminal must be killed,
/// the turn must still end with `EndTurn`, and the tool result must say the
/// command timed out — not that the user stopped it.
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal that never exits on its own, so only the timeout can end it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the timeout (100ms configured above)
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2689
2690#[gpui::test]
2691async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2692 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2693 let fake_model = model.as_fake();
2694
2695 let events_1 = thread
2696 .update(cx, |thread, cx| {
2697 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2698 })
2699 .unwrap();
2700 cx.run_until_parked();
2701 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2702 cx.run_until_parked();
2703
2704 let events_2 = thread
2705 .update(cx, |thread, cx| {
2706 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2707 })
2708 .unwrap();
2709 cx.run_until_parked();
2710 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2711 fake_model
2712 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2713 fake_model.end_last_completion_stream();
2714
2715 let events_1 = events_1.collect::<Vec<_>>().await;
2716 assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2717 let events_2 = events_2.collect::<Vec<_>>().await;
2718 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2719}
2720
/// Regression test for retry-during-cancellation behavior; see the inline
/// comment below for the full scenario.
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // The cancelled turn reports Cancelled; the new turn completes normally.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2798
2799#[gpui::test]
2800async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2801 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2802 let fake_model = model.as_fake();
2803
2804 let events_1 = thread
2805 .update(cx, |thread, cx| {
2806 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2807 })
2808 .unwrap();
2809 cx.run_until_parked();
2810 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2811 fake_model
2812 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2813 fake_model.end_last_completion_stream();
2814 let events_1 = events_1.collect::<Vec<_>>().await;
2815
2816 let events_2 = thread
2817 .update(cx, |thread, cx| {
2818 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2819 })
2820 .unwrap();
2821 cx.run_until_parked();
2822 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2823 fake_model
2824 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2825 fake_model.end_last_completion_stream();
2826 let events_2 = events_2.collect::<Vec<_>>().await;
2827
2828 assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2829 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2830}
2831
/// When the model stops with `Refusal`, the thread removes everything back
/// through the last user message — here leaving the thread entirely empty,
/// since the refused turn was the only one.
#[gpui::test]
async fn test_refusal(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Before any response, only the user message is in the transcript.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
    });

    // Stream part of an assistant reply so there is content to discard.
    fake_model.send_last_completion_stream_text_chunk("Hey!");
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
    });

    // If the model refuses to continue, the thread should remove all the messages after the last user message.
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
    let events = events.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
    });
}
2880
/// Truncating at the first (only) message empties the transcript and resets
/// token usage; the thread then accepts and tracks a brand-new turn.
#[gpui::test]
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Keep the id so we can truncate at this exact message later.
    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        // No usage has been reported yet.
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Stream a reply plus a usage update so there is state to reset.
    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncating at the only message clears both transcript and usage.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // Usage now reflects only the new turn.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
3002
/// Truncating at the second message restores the thread to exactly the state
/// it had after the first exchange, including the first turn's token usage.
#[gpui::test]
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Complete a first full exchange, including a usage update.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Reusable snapshot check: asserted both before the second message and
    // again after truncating back to it.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Keep the id so we can truncate at this exact message later.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncating at the second message should restore the first-exchange state.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3117
/// Verifies that a thread title is produced by the dedicated summarization
/// model after the first exchange completes, and that sending a second
/// message does not trigger another title generation.
#[gpui::test]
async fn test_title_generation(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Titles come from a separate summarization model, not the chat model.
    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // No title yet: the summarization stream hasn't produced any text.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), None));

    // Ensure the summary model has been invoked to generate a title.
    // The stream spells out "Hello world\nGoodnight Moon"; only the first
    // line is expected to become the title.
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // The summarization model must have no pending requests: a title exists.
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });
}
3167
/// Verifies that when the model issues two tool calls — one still blocked on
/// user permission, one that completed — `build_completion_request` includes
/// only the completed tool's use and result, skipping the pending one.
#[gpui::test]
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // This tool call will stay pending: permission is never granted below.
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // This tool call runs to completion and should appear in the request.
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    // messages[0] is the system prompt; compare everything after it.
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3247
/// End-to-end exercise of `NativeAgentConnection`: creating a session,
/// listing and selecting models, prompting, cancelling an in-flight prompt,
/// and verifying a closed session rejects further prompts.
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(cx);
        RefreshLlmTokenListener::register(client.clone(), user_store.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = PathList::new(&[Path::new("/test")]);
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Drive one prompt round-trip through the connection.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    // Cancellation resolves the in-flight prompt with Ok rather than an error.
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting a closed session must fail with "Session not found".
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3371
/// Verifies the ACP event sequence emitted while a tool call's input streams
/// in: an initial `ToolCall` with the partial raw input, then updates for the
/// completed input, the `InProgress` status, and finally `Completed` with the
/// tool's output.
#[gpui::test]
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First event: the tool call itself, carrying the partial `{}` input.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Second: the completed input replaces the partial raw_input.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    // Third: the tool starts running.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    // Fourth: the tool finishes with its echoed output.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3450
/// Verifies that invoking `UpdatePlanTool` emits a `Think`-kind tool call,
/// an `InProgress` update, a `Plan` event mirroring the tool input (with
/// default Medium priority for every entry), and a `Completed` update.
#[gpui::test]
async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Make a plan"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Three plan entries, one in each status.
    let input = json!({
        "plan": [
            {
                "step": "Inspect the code",
                "status": "completed",
            },
            {
                "step": "Implement the tool",
                "status": "in_progress"
            },
            {
                "step": "Run tests",
                "status": "pending",
            }
        ]
    });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "plan_1".into(),
            name: UpdatePlanTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("plan_1", "Update plan")
            .kind(acp::ToolKind::Think)
            .raw_input(json!({
                "plan": [
                    {
                        "step": "Inspect the code",
                        "status": "completed",
                    },
                    {
                        "step": "Implement the tool",
                        "status": "in_progress"
                    },
                    {
                        "step": "Run tests",
                        "status": "pending",
                    }
                ]
            }))
            .meta(acp::Meta::from_iter([(
                "tool_name".into(),
                "update_plan".into()
            )]))
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );

    // The plan event mirrors the tool input; each entry gets Medium priority.
    let plan = expect_plan(&mut events).await;
    assert_eq!(
        plan,
        acp::Plan::new(vec![
            acp::PlanEntry::new(
                "Inspect the code",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::Completed,
            ),
            acp::PlanEntry::new(
                "Implement the tool",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::InProgress,
            ),
            acp::PlanEntry::new(
                "Run tests",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::Pending,
            ),
        ])
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Plan updated")
        )
    );
}
3562
/// Verifies that a completion that succeeds on the first attempt emits no
/// `Retry` events and leaves the conversation intact.
#[gpui::test]
async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();

    // Drain events until the turn stops, collecting any Retry events.
    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 0);
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey!
            "}
        )
    });
}
3605
/// Verifies that a `ServerOverloaded` error mid-stream triggers exactly one
/// retry after the provider's `retry_after` delay, and that the resumed
/// assistant output is recorded after a `[resume]` marker in the markdown.
#[gpui::test]
async fn test_send_retry_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Stream some text, then fail with an overloaded error that asks for a
    // 3-second backoff before retrying.
    fake_model.send_last_completion_stream_text_chunk("Hey,");
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the backoff so the retry request is issued.
    cx.executor().advance_clock(Duration::from_secs(3));
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("there!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 1);
    assert!(matches!(
        retry_events[0],
        acp_thread::RetryStatus { attempt: 1, .. }
    ));
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey,

                [resume]

                ## Assistant

                there!
            "}
        )
    });
}
3669
/// Verifies that when the stream errors after a fully-received tool use, the
/// tool still runs to completion and its result is included in the retried
/// request, so the model sees the tool output on the next attempt.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    // Deliver a complete tool use, then fail the stream before it finishes.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // After the backoff, the retry request must already carry the tool result.
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round and confirm the final assistant message.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3749
/// Verifies that after `MAX_RETRY_ATTEMPTS` consecutive failures the thread
/// stops retrying: exactly `MAX_RETRY_ATTEMPTS` `Retry` events are emitted
/// (with increasing attempt numbers) and the final error is surfaced.
#[gpui::test]
async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Fail the original attempt plus every allowed retry.
    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
        fake_model.send_last_completion_stream_error(
            LanguageModelCompletionError::ServerOverloaded {
                provider: LanguageModelProviderName::new("Anthropic"),
                retry_after: Some(Duration::from_secs(3)),
            },
        );
        fake_model.end_last_completion_stream();
        cx.executor().advance_clock(Duration::from_secs(3));
        cx.run_until_parked();
    }

    let mut errors = Vec::new();
    let mut retry_events = Vec::new();
    while let Some(event) = events.next().await {
        match event {
            Ok(ThreadEvent::Retry(retry_status)) => {
                retry_events.push(retry_status);
            }
            Ok(ThreadEvent::Stop(..)) => break,
            Err(error) => errors.push(error),
            _ => {}
        }
    }

    assert_eq!(
        retry_events.len(),
        crate::thread::MAX_RETRY_ATTEMPTS as usize
    );
    // Attempt numbers are 1-based and strictly increasing.
    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
        assert_eq!(retry_events[i].attempt, i + 1);
    }
    assert_eq!(errors.len(), 1);
    let error = errors[0]
        .downcast_ref::<LanguageModelCompletionError>()
        .unwrap();
    assert!(matches!(
        error,
        LanguageModelCompletionError::ServerOverloaded { .. }
    ));
}
3803
/// Regression test: a streaming tool whose input never arrives in full
/// (`is_input_complete` stays false when the LLM stream errors) must still
/// terminate with an error result instead of deadlocking the turn.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3910
/// Verifies that a `ToolUseJsonParseError` is forwarded to an already-running
/// streaming tool, which enriches it with the partial input it saw — rather
/// than the tool dying with the generic "input was not fully received" error.
#[gpui::test]
async fn test_streaming_tool_json_parse_error_is_forwarded_to_running_tool(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingJsonErrorContextTool);
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_json_error_context tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Start the tool with a partial (truncated-JSON) input.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: StreamingJsonErrorContextTool::NAME.into(),
        raw_input: r#"{"text": "partial"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use));
    cx.run_until_parked();

    // The model then reports it could not parse the accumulated input.
    fake_model.send_last_completion_stream_event(
        LanguageModelCompletionEvent::ToolUseJsonParseError {
            id: "tool_1".into(),
            tool_name: StreamingJsonErrorContextTool::NAME.into(),
            raw_input: r#"{"text": "partial"#.into(),
            json_parse_error: "EOF while parsing a string at line 1 column 17".into(),
        },
    );
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");

    // Collect every tool result for tool_1 from the follow-up request.
    let tool_results: Vec<_> = completion
        .messages
        .iter()
        .flat_map(|message| &message.content)
        .filter_map(|content| match content {
            MessageContent::ToolResult(result)
                if result.tool_use_id == language_model::LanguageModelToolUseId::from("tool_1") =>
            {
                Some(result)
            }
            _ => None,
        })
        .collect();

    assert_eq!(
        tool_results.len(),
        1,
        "Expected exactly 1 tool result for tool_1, got {}: {:#?}",
        tool_results.len(),
        tool_results
    );

    let result = tool_results[0];
    assert!(result.is_error);
    let content_text = match &result.content {
        language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
        other => panic!("Expected text content, got {:?}", other),
    };
    // The tool saw the partial input and added its own context to the error.
    assert!(
        content_text.contains("Saw partial text 'partial' before invalid JSON"),
        "Expected tool-enriched partial context, got: {content_text}"
    );
    // The parse error from the model is forwarded verbatim.
    assert!(
        content_text
            .contains("Error parsing input JSON: EOF while parsing a string at line 1 column 17"),
        "Expected forwarded JSON parse error, got: {content_text}"
    );
    // The generic orphaned-sender error must NOT appear.
    assert!(
        !content_text.contains("tool input was not fully received"),
        "Should not contain orphaned sender error, got: {content_text}"
    );

    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
4021
4022/// Filters out the stop events for asserting against in tests
4023fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
4024 result_events
4025 .into_iter()
4026 .filter_map(|event| match event.unwrap() {
4027 ThreadEvent::Stop(stop_reason) => Some(stop_reason),
4028 _ => None,
4029 })
4030 .collect()
4031}
4032
/// Handles returned by [`setup`] for driving a `Thread` in tests.
struct ThreadTest {
    /// The active chat model (a `FakeLanguageModel` when built with `TestModel::Fake`).
    model: Arc<dyn LanguageModel>,
    /// The thread under test.
    thread: Entity<Thread>,
    /// Project context shared with the thread.
    project_context: Entity<ProjectContext>,
    /// Store used by tests that register fake MCP context servers.
    context_server_store: Entity<ContextServerStore>,
    /// In-memory filesystem backing the project and settings file.
    fs: Arc<FakeFs>,
}
4040
/// Selects which language model backs a test thread built by [`setup`].
enum TestModel {
    /// A real Anthropic model resolved via the registry (requires auth).
    Sonnet4,
    /// An in-process `FakeLanguageModel` with scriptable completion streams.
    Fake,
}
4045
impl TestModel {
    /// Registry id used to look up a real model in `setup`.
    /// `Fake` is constructed directly and never consults the registry,
    /// so reaching this method for it is a bug.
    fn id(&self) -> LanguageModelId {
        match self {
            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
            TestModel::Fake => unreachable!(),
        }
    }
}
4054
/// Builds a full test environment for `Thread` tests: a fake filesystem with
/// a settings file enabling every test tool, initialized settings (and, for
/// `Sonnet4`, a real authenticated provider), a test project, and a new
/// `Thread` wired to the requested model.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    cx.executor().allow_parking();

    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    // Write a settings file whose "test-profile" enables all test tools,
    // so sends don't get blocked by profile-based tool filtering.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingJsonErrorContextTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                            UpdatePlanTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        match model {
            TestModel::Fake => {}
            // Live model: stand up a production client so the provider can
            // authenticate against the real service.
            TestModel::Sonnet4 => {
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(cx);
                RefreshLlmTokenListener::register(client.clone(), user_store.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        // Keep the global SettingsStore in sync with the fake settings file.
        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                // Resolve the real model from the registry and wait for its
                // provider to authenticate before handing it out.
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
4162
/// Enables `env_logger` output when `RUST_LOG` is set. Runs once per test
/// binary via `ctor`, before any test executes.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    if std::env::var("RUST_LOG").is_ok() {
        env_logger::init();
    }
}
4170
4171fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
4172 let fs = fs.clone();
4173 cx.spawn({
4174 async move |cx| {
4175 let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
4176 cx.background_executor(),
4177 fs,
4178 paths::settings_file().clone(),
4179 );
4180 let _watcher_task = watcher_task;
4181
4182 while let Some(new_settings_content) = new_settings_content_rx.next().await {
4183 cx.update(|cx| {
4184 SettingsStore::update_global(cx, |settings, cx| {
4185 settings.set_user_settings(&new_settings_content, cx)
4186 })
4187 })
4188 .ok();
4189 }
4190 }
4191 })
4192 .detach();
4193}
4194
4195fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4196 completion
4197 .tools
4198 .iter()
4199 .map(|tool| tool.name.clone())
4200 .collect()
4201}
4202
/// Registers a fake stdio context server named `name` in the project settings,
/// starts it over an in-memory transport, and returns a channel that yields one
/// `(CallToolParams, responder)` pair per tool invocation so the test can
/// script each tool call's response.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in settings. The command is never actually spawned
    // because the fake transport below stands in for the real process.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        // Handshake: advertise tool support (including list_changed).
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        // Tool discovery: report exactly the tools the caller provided.
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        // Tool calls: forward params to the test via the channel and hold the
        // reply until the test resolves the oneshot responder.
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    // Let the handshake and tool listing settle before handing back the channel.
    cx.run_until_parked();
    mcp_tool_calls_rx
}
4282
// Verifies that `tokens_before_message` reports the input-token count of the
// request preceding each user message: None for the first message, and the
// prior request's `input_tokens` for each subsequent one.
#[gpui::test]
async fn test_tokens_before_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First message
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["First message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Before any response, tokens_before_message should return None for first message
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should have no tokens before it"
        );
    });

    // Complete first message with usage
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First message still has no tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it after response"
        );
    });

    // Second message
    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Second message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Second message should have first message's input tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should have 100 tokens before it (from first request)"
        );
    });

    // Complete second message
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250, // Total for this request (includes previous context)
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Third message
    let message_3_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_3_id.clone(), ["Third message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Third message should have second message's input tokens (250) before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_3_id),
            Some(250),
            "Third message should have 250 tokens before it (from second request)"
        );
        // Second message should still have 100
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should still have 100 tokens before it"
        );
        // First message still has none
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it"
        );
    });
}
4389
// Verifies that truncating at a message removes it (and everything after), so
// `tokens_before_message` lookups for the removed id return None afterwards.
#[gpui::test]
async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up three messages with responses
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250,
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Verify initial state
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
    });

    // Truncate at message 2 (removes message 2 and everything after)
    thread
        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
        .unwrap();
    cx.run_until_parked();

    // After truncation, message_2_id no longer exists, so lookup should return None
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            None,
            "After truncation, message 2 no longer exists"
        );
        // Message 1 still exists but has no tokens before it
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message still has no tokens before it"
        );
    });
}
4460
// Exercises terminal-tool permission rules end to end:
//   1. an always_deny pattern blocks a matching command,
//   2. an always_allow pattern skips confirmation and overrides a Deny default,
//   3. an always_confirm pattern still prompts despite a global Allow default,
//   4. a tool-specific Deny default wins over a global Allow default.
#[gpui::test]
async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({})).await;
    let project = Project::test(fs, ["/root".as_ref()], cx).await;

    // Test 1: Deny rule blocks command
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Confirm),
                    always_allow: vec![],
                    always_deny: vec![
                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
                    ],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "rm -rf /".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by deny rule"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("blocked"),
            "error should mention the command was blocked"
        );
    }

    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![
                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
                    ],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // Seeing terminal content (rather than an authorization request) proves
        // the allow rule bypassed confirmation.
        let update = rx.expect_update_fields().await;
        assert!(
            update.content.iter().any(|blocks| {
                blocks
                    .iter()
                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
            }),
            "expected terminal content (allow rule should skip confirmation and override default deny)"
        );

        let result = task.await;
        assert!(
            result.is_ok(),
            "expected command to succeed without confirmation"
        );
    }

    // Test 3: global default: allow does NOT override always_confirm patterns
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Allow),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![
                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
                    ],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let _task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "sudo rm file".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // With global default: allow, confirm patterns are still respected
        // The expect_authorization() call will panic if no authorization is requested,
        // which validates that the confirm pattern still triggers confirmation
        let _auth = rx.expect_authorization().await;

        drop(_task);
    }

    // Test 4: tool-specific default: deny is respected even with global default: allow
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // tool-specific default: deny is respected even with global default: allow
        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by tool-specific deny default"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("disabled"),
            "error should mention the tool is disabled, got: {err_msg}"
        );
    }
}
4678
// End-to-end subagent flow: the parent model spawns a subagent via the
// SpawnAgent tool, the subagent's response becomes the tool call's output, and
// the parent transcript records the completed call.
#[gpui::test]
async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Install the scripted fake model on the session's thread.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Parent model emits some text, then a SpawnAgent tool call.
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // The "last" completion stream now belongs to the subagent's turn.
    model.send_last_completion_stream_text_chunk("subagent task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response

        "}
    );

    // Parent model finishes its turn after receiving the tool result.
    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response

            ## Assistant

            Response

        "#},
    );
}
4813
// The subagent's own transcript keeps its <thinking> block, but the text
// relayed into the parent's tool-call output omits it.
#[gpui::test]
async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Install the scripted fake model on the session's thread.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Parent model emits some text, then a SpawnAgent tool call.
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Subagent response interleaves text and a Thinking event.
    model.send_last_completion_stream_text_chunk("subagent task response 1");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "thinking more about the subagent task".into(),
        signature: None,
    });
    model.send_last_completion_stream_text_chunk("subagent task response 2");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // The subagent's own transcript retains the <thinking> section.
    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response 1

            <thinking>
            thinking more about the subagent task
            </thinking>

            subagent task response 2

        "}
    );

    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // The parent's tool-call output contains only the text chunks — no
    // <thinking> block.
    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response 1

            subagent task response 2

            ## Assistant

            Response

        "#},
    );
}
4961
// Cancelling the parent turn while a subagent is still awaiting its first
// response cancels the tool call and leaves both threads idle.
#[gpui::test]
async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Install the scripted fake model on the session's thread.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Parent model emits some text, then a SpawnAgent tool call.
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });
    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Deliberately do NOT let the subagent's model respond: cancellation must
    // land while the subagent is still waiting on its task prompt.
    acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;

    cx.run_until_parked();

    send.await.unwrap();

    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(thread.status(), ThreadStatus::Idle);
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                Prompt

                ## Assistant

                spawning subagent

                **Tool Call: label**
                Status: Canceled

            "}
        );
    });
    // The subagent never produced an assistant turn before cancellation.
    subagent_acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(thread.status(), ThreadStatus::Idle);
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                subagent task prompt

            "}
        );
    });
}
5091
// A second SpawnAgent call that passes the prior session_id resumes the same
// subagent session, appending a new turn to its existing transcript.
#[gpui::test]
async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // === First turn: create subagent ===
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "initial task".to_string(),
        message: "do the first task".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Subagent responds
    model.send_last_completion_stream_text_chunk("first task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete first turn
    model.send_last_completion_stream_text_chunk("First response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after completion"
        );
    });

    // === Second turn: resume subagent with session_id ===
    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("resuming subagent");
    let resume_tool_input = SpawnAgentToolInput {
        label: "follow-up task".to_string(),
        message: "do the follow-up task".to_string(),
        session_id: Some(subagent_session_id.clone()),
    };
    let resume_tool_use = LanguageModelToolUse {
        id: "subagent_2".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
        input: serde_json::to_value(&resume_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Subagent should be running again with the same session
    thread.read_with(cx, |thread, cx| {
        let running = thread.running_subagent_ids(cx);
        assert_eq!(running.len(), 1, "subagent should be running");
        assert_eq!(running[0], subagent_session_id, "should be same session");
    });

    // Subagent responds to follow-up
    model.send_last_completion_stream_text_chunk("follow-up task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete second turn
    model.send_last_completion_stream_text_chunk("Second response");
    model.end_last_completion_stream();

    send2.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after resume completion"
        );
    });

    // Verify the subagent's acp thread has both conversation turns
    assert_eq!(
        subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            do the first task

            ## Assistant

            first task response

            ## User

            do the follow-up task

            ## Assistant

            follow-up task response

        "}
    );
}
5271
5272#[gpui::test]
5273async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5274 init_test(cx);
5275
5276 cx.update(|cx| {
5277 cx.update_flags(true, vec!["subagents".to_string()]);
5278 });
5279
5280 let fs = FakeFs::new(cx.executor());
5281 fs.insert_tree(path!("/test"), json!({})).await;
5282 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5283 let project_context = cx.new(|_cx| ProjectContext::default());
5284 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5285 let context_server_registry =
5286 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5287 let model = Arc::new(FakeLanguageModel::default());
5288
5289 let parent_thread = cx.new(|cx| {
5290 Thread::new(
5291 project.clone(),
5292 project_context,
5293 context_server_registry,
5294 Templates::new(),
5295 Some(model.clone()),
5296 cx,
5297 )
5298 });
5299
5300 let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5301 subagent_thread.read_with(cx, |subagent_thread, cx| {
5302 assert!(subagent_thread.is_subagent());
5303 assert_eq!(subagent_thread.depth(), 1);
5304 assert_eq!(
5305 subagent_thread.model().map(|model| model.id()),
5306 Some(model.id())
5307 );
5308 assert_eq!(
5309 subagent_thread.parent_thread_id(),
5310 Some(parent_thread.read(cx).id().clone())
5311 );
5312
5313 let request = subagent_thread
5314 .build_completion_request(CompletionIntent::UserPrompt, cx)
5315 .unwrap();
5316 assert_eq!(request.intent, Some(CompletionIntent::Subagent));
5317 });
5318}
5319
5320#[gpui::test]
5321async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5322 init_test(cx);
5323
5324 cx.update(|cx| {
5325 cx.update_flags(true, vec!["subagents".to_string()]);
5326 });
5327
5328 let fs = FakeFs::new(cx.executor());
5329 fs.insert_tree(path!("/test"), json!({})).await;
5330 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5331 let project_context = cx.new(|_cx| ProjectContext::default());
5332 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5333 let context_server_registry =
5334 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5335 let model = Arc::new(FakeLanguageModel::default());
5336 let environment = Rc::new(cx.update(|cx| {
5337 FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5338 }));
5339
5340 let deep_parent_thread = cx.new(|cx| {
5341 let mut thread = Thread::new(
5342 project.clone(),
5343 project_context,
5344 context_server_registry,
5345 Templates::new(),
5346 Some(model.clone()),
5347 cx,
5348 );
5349 thread.set_subagent_context(SubagentContext {
5350 parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5351 depth: MAX_SUBAGENT_DEPTH - 1,
5352 });
5353 thread
5354 });
5355 let deep_subagent_thread = cx.new(|cx| {
5356 let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5357 thread.add_default_tools(environment, cx);
5358 thread
5359 });
5360
5361 deep_subagent_thread.read_with(cx, |thread, _| {
5362 assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5363 assert!(
5364 !thread.has_registered_tool(SpawnAgentTool::NAME),
5365 "subagent tool should not be present at max depth"
5366 );
5367 });
5368}
5369
5370#[gpui::test]
5371async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5372 init_test(cx);
5373
5374 cx.update(|cx| {
5375 cx.update_flags(true, vec!["subagents".to_string()]);
5376 });
5377
5378 let fs = FakeFs::new(cx.executor());
5379 fs.insert_tree(path!("/test"), json!({})).await;
5380 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5381 let project_context = cx.new(|_cx| ProjectContext::default());
5382 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5383 let context_server_registry =
5384 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5385 let model = Arc::new(FakeLanguageModel::default());
5386
5387 let parent = cx.new(|cx| {
5388 Thread::new(
5389 project.clone(),
5390 project_context.clone(),
5391 context_server_registry.clone(),
5392 Templates::new(),
5393 Some(model.clone()),
5394 cx,
5395 )
5396 });
5397
5398 let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5399
5400 parent.update(cx, |thread, _cx| {
5401 thread.register_running_subagent(subagent.downgrade());
5402 });
5403
5404 subagent
5405 .update(cx, |thread, cx| {
5406 thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5407 })
5408 .unwrap();
5409 cx.run_until_parked();
5410
5411 subagent.read_with(cx, |thread, _| {
5412 assert!(!thread.is_turn_complete(), "subagent should be running");
5413 });
5414
5415 parent.update(cx, |thread, cx| {
5416 thread.cancel(cx).detach();
5417 });
5418
5419 subagent.read_with(cx, |thread, _| {
5420 assert!(
5421 thread.is_turn_complete(),
5422 "subagent should be cancelled when parent cancels"
5423 );
5424 });
5425}
5426
5427#[gpui::test]
5428async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5429 init_test(cx);
5430 cx.update(|cx| {
5431 LanguageModelRegistry::test(cx);
5432 });
5433 cx.update(|cx| {
5434 cx.update_flags(true, vec!["subagents".to_string()]);
5435 });
5436
5437 let fs = FakeFs::new(cx.executor());
5438 fs.insert_tree(
5439 "/",
5440 json!({
5441 "a": {
5442 "b.md": "Lorem"
5443 }
5444 }),
5445 )
5446 .await;
5447 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5448 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5449 let agent = cx.update(|cx| {
5450 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5451 });
5452 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5453
5454 let acp_thread = cx
5455 .update(|cx| {
5456 connection
5457 .clone()
5458 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5459 })
5460 .await
5461 .unwrap();
5462 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5463 let thread = agent.read_with(cx, |agent, _| {
5464 agent.sessions.get(&session_id).unwrap().thread.clone()
5465 });
5466 let model = Arc::new(FakeLanguageModel::default());
5467
5468 thread.update(cx, |thread, cx| {
5469 thread.set_model(model.clone(), cx);
5470 });
5471 cx.run_until_parked();
5472
5473 // Start the parent turn
5474 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5475 cx.run_until_parked();
5476 model.send_last_completion_stream_text_chunk("spawning subagent");
5477 let subagent_tool_input = SpawnAgentToolInput {
5478 label: "label".to_string(),
5479 message: "subagent task prompt".to_string(),
5480 session_id: None,
5481 };
5482 let subagent_tool_use = LanguageModelToolUse {
5483 id: "subagent_1".into(),
5484 name: SpawnAgentTool::NAME.into(),
5485 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5486 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5487 is_input_complete: true,
5488 thought_signature: None,
5489 };
5490 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5491 subagent_tool_use,
5492 ));
5493 model.end_last_completion_stream();
5494
5495 cx.run_until_parked();
5496
5497 // Verify subagent is running
5498 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5499 thread
5500 .running_subagent_ids(cx)
5501 .get(0)
5502 .expect("subagent thread should be running")
5503 .clone()
5504 });
5505
5506 // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5507 model.send_last_completion_stream_text_chunk("partial work");
5508 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5509 TokenUsage {
5510 input_tokens: 850_000,
5511 output_tokens: 0,
5512 cache_creation_input_tokens: 0,
5513 cache_read_input_tokens: 0,
5514 },
5515 ));
5516
5517 cx.run_until_parked();
5518
5519 // The subagent should no longer be running
5520 thread.read_with(cx, |thread, cx| {
5521 assert!(
5522 thread.running_subagent_ids(cx).is_empty(),
5523 "subagent should be stopped after context window warning"
5524 );
5525 });
5526
5527 // The parent model should get a new completion request to respond to the tool error
5528 model.send_last_completion_stream_text_chunk("Response after warning");
5529 model.end_last_completion_stream();
5530
5531 send.await.unwrap();
5532
5533 // Verify the parent thread shows the warning error in the tool call
5534 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5535 assert!(
5536 markdown.contains("nearing the end of its context window"),
5537 "tool output should contain context window warning message, got:\n{markdown}"
5538 );
5539 assert!(
5540 markdown.contains("Status: Failed"),
5541 "tool call should have Failed status, got:\n{markdown}"
5542 );
5543
5544 // Verify the subagent session still exists (can be resumed)
5545 agent.read_with(cx, |agent, _cx| {
5546 assert!(
5547 agent.sessions.contains_key(&subagent_session_id),
5548 "subagent session should still exist for potential resume"
5549 );
5550 });
5551}
5552
5553#[gpui::test]
5554async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5555 init_test(cx);
5556 cx.update(|cx| {
5557 LanguageModelRegistry::test(cx);
5558 });
5559 cx.update(|cx| {
5560 cx.update_flags(true, vec!["subagents".to_string()]);
5561 });
5562
5563 let fs = FakeFs::new(cx.executor());
5564 fs.insert_tree(
5565 "/",
5566 json!({
5567 "a": {
5568 "b.md": "Lorem"
5569 }
5570 }),
5571 )
5572 .await;
5573 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5574 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5575 let agent = cx.update(|cx| {
5576 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5577 });
5578 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5579
5580 let acp_thread = cx
5581 .update(|cx| {
5582 connection
5583 .clone()
5584 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5585 })
5586 .await
5587 .unwrap();
5588 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5589 let thread = agent.read_with(cx, |agent, _| {
5590 agent.sessions.get(&session_id).unwrap().thread.clone()
5591 });
5592 let model = Arc::new(FakeLanguageModel::default());
5593
5594 thread.update(cx, |thread, cx| {
5595 thread.set_model(model.clone(), cx);
5596 });
5597 cx.run_until_parked();
5598
5599 // === First turn: create subagent, trigger context window warning ===
5600 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5601 cx.run_until_parked();
5602 model.send_last_completion_stream_text_chunk("spawning subagent");
5603 let subagent_tool_input = SpawnAgentToolInput {
5604 label: "initial task".to_string(),
5605 message: "do the first task".to_string(),
5606 session_id: None,
5607 };
5608 let subagent_tool_use = LanguageModelToolUse {
5609 id: "subagent_1".into(),
5610 name: SpawnAgentTool::NAME.into(),
5611 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5612 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5613 is_input_complete: true,
5614 thought_signature: None,
5615 };
5616 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5617 subagent_tool_use,
5618 ));
5619 model.end_last_completion_stream();
5620
5621 cx.run_until_parked();
5622
5623 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5624 thread
5625 .running_subagent_ids(cx)
5626 .get(0)
5627 .expect("subagent thread should be running")
5628 .clone()
5629 });
5630
5631 // Subagent sends a usage update that crosses the warning threshold.
5632 // This triggers Normal→Warning, stopping the subagent.
5633 model.send_last_completion_stream_text_chunk("partial work");
5634 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5635 TokenUsage {
5636 input_tokens: 850_000,
5637 output_tokens: 0,
5638 cache_creation_input_tokens: 0,
5639 cache_read_input_tokens: 0,
5640 },
5641 ));
5642
5643 cx.run_until_parked();
5644
5645 // Verify the first turn was stopped with a context window warning
5646 thread.read_with(cx, |thread, cx| {
5647 assert!(
5648 thread.running_subagent_ids(cx).is_empty(),
5649 "subagent should be stopped after context window warning"
5650 );
5651 });
5652
5653 // Parent model responds to complete first turn
5654 model.send_last_completion_stream_text_chunk("First response");
5655 model.end_last_completion_stream();
5656
5657 send.await.unwrap();
5658
5659 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5660 assert!(
5661 markdown.contains("nearing the end of its context window"),
5662 "first turn should have context window warning, got:\n{markdown}"
5663 );
5664
5665 // === Second turn: resume the same subagent (now at Warning level) ===
5666 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5667 cx.run_until_parked();
5668 model.send_last_completion_stream_text_chunk("resuming subagent");
5669 let resume_tool_input = SpawnAgentToolInput {
5670 label: "follow-up task".to_string(),
5671 message: "do the follow-up task".to_string(),
5672 session_id: Some(subagent_session_id.clone()),
5673 };
5674 let resume_tool_use = LanguageModelToolUse {
5675 id: "subagent_2".into(),
5676 name: SpawnAgentTool::NAME.into(),
5677 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5678 input: serde_json::to_value(&resume_tool_input).unwrap(),
5679 is_input_complete: true,
5680 thought_signature: None,
5681 };
5682 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5683 model.end_last_completion_stream();
5684
5685 cx.run_until_parked();
5686
5687 // Subagent responds with tokens still at warning level (no worse).
5688 // Since ratio_before_prompt was already Warning, this should NOT
5689 // trigger the context window warning again.
5690 model.send_last_completion_stream_text_chunk("follow-up task response");
5691 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5692 TokenUsage {
5693 input_tokens: 870_000,
5694 output_tokens: 0,
5695 cache_creation_input_tokens: 0,
5696 cache_read_input_tokens: 0,
5697 },
5698 ));
5699 model.end_last_completion_stream();
5700
5701 cx.run_until_parked();
5702
5703 // Parent model responds to complete second turn
5704 model.send_last_completion_stream_text_chunk("Second response");
5705 model.end_last_completion_stream();
5706
5707 send2.await.unwrap();
5708
5709 // The resumed subagent should have completed normally since the ratio
5710 // didn't transition (it was Warning before and stayed at Warning)
5711 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5712 assert!(
5713 markdown.contains("follow-up task response"),
5714 "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5715 );
5716 // The second tool call should NOT have a context window warning
5717 let second_tool_pos = markdown
5718 .find("follow-up task")
5719 .expect("should find follow-up tool call");
5720 let after_second_tool = &markdown[second_tool_pos..];
5721 assert!(
5722 !after_second_tool.contains("nearing the end of its context window"),
5723 "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5724 );
5725}
5726
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    // Scenario: a subagent's model stream fails with a non-retryable
    // error. The subagent must stop, and the parent's tool call must be
    // marked Failed so the parent model can react to the error.
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    // Grab the native Thread backing the session created above.
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    // Emit a spawn-agent tool use from the parent's fake model stream.
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5833
5834#[gpui::test]
5835async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5836 init_test(cx);
5837
5838 let fs = FakeFs::new(cx.executor());
5839 fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5840 .await;
5841 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5842
5843 cx.update(|cx| {
5844 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5845 settings.tool_permissions.tools.insert(
5846 EditFileTool::NAME.into(),
5847 agent_settings::ToolRules {
5848 default: Some(settings::ToolPermissionMode::Allow),
5849 always_allow: vec![],
5850 always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5851 always_confirm: vec![],
5852 invalid_patterns: vec![],
5853 },
5854 );
5855 agent_settings::AgentSettings::override_global(settings, cx);
5856 });
5857
5858 let context_server_registry =
5859 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5860 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5861 let templates = crate::Templates::new();
5862 let thread = cx.new(|cx| {
5863 crate::Thread::new(
5864 project.clone(),
5865 cx.new(|_cx| prompt_store::ProjectContext::default()),
5866 context_server_registry,
5867 templates.clone(),
5868 None,
5869 cx,
5870 )
5871 });
5872
5873 #[allow(clippy::arc_with_non_send_sync)]
5874 let tool = Arc::new(crate::EditFileTool::new(
5875 project.clone(),
5876 thread.downgrade(),
5877 language_registry,
5878 templates,
5879 ));
5880 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5881
5882 let task = cx.update(|cx| {
5883 tool.run(
5884 ToolInput::resolved(crate::EditFileToolInput {
5885 display_description: "Edit sensitive file".to_string(),
5886 path: "root/sensitive_config.txt".into(),
5887 mode: crate::EditFileMode::Edit,
5888 }),
5889 event_stream,
5890 cx,
5891 )
5892 });
5893
5894 let result = task.await;
5895 assert!(result.is_err(), "expected edit to be blocked");
5896 assert!(
5897 result.unwrap_err().to_string().contains("blocked"),
5898 "error should mention the edit was blocked"
5899 );
5900}
5901
5902#[gpui::test]
5903async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5904 init_test(cx);
5905
5906 let fs = FakeFs::new(cx.executor());
5907 fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5908 .await;
5909 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5910
5911 cx.update(|cx| {
5912 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5913 settings.tool_permissions.tools.insert(
5914 DeletePathTool::NAME.into(),
5915 agent_settings::ToolRules {
5916 default: Some(settings::ToolPermissionMode::Allow),
5917 always_allow: vec![],
5918 always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5919 always_confirm: vec![],
5920 invalid_patterns: vec![],
5921 },
5922 );
5923 agent_settings::AgentSettings::override_global(settings, cx);
5924 });
5925
5926 let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5927
5928 #[allow(clippy::arc_with_non_send_sync)]
5929 let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5930 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5931
5932 let task = cx.update(|cx| {
5933 tool.run(
5934 ToolInput::resolved(crate::DeletePathToolInput {
5935 path: "root/important_data.txt".to_string(),
5936 }),
5937 event_stream,
5938 cx,
5939 )
5940 });
5941
5942 let result = task.await;
5943 assert!(result.is_err(), "expected deletion to be blocked");
5944 assert!(
5945 result.unwrap_err().contains("blocked"),
5946 "error should mention the deletion was blocked"
5947 );
5948}
5949
5950#[gpui::test]
5951async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5952 init_test(cx);
5953
5954 let fs = FakeFs::new(cx.executor());
5955 fs.insert_tree(
5956 "/root",
5957 json!({
5958 "safe.txt": "content",
5959 "protected": {}
5960 }),
5961 )
5962 .await;
5963 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5964
5965 cx.update(|cx| {
5966 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5967 settings.tool_permissions.tools.insert(
5968 MovePathTool::NAME.into(),
5969 agent_settings::ToolRules {
5970 default: Some(settings::ToolPermissionMode::Allow),
5971 always_allow: vec![],
5972 always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5973 always_confirm: vec![],
5974 invalid_patterns: vec![],
5975 },
5976 );
5977 agent_settings::AgentSettings::override_global(settings, cx);
5978 });
5979
5980 #[allow(clippy::arc_with_non_send_sync)]
5981 let tool = Arc::new(crate::MovePathTool::new(project));
5982 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5983
5984 let task = cx.update(|cx| {
5985 tool.run(
5986 ToolInput::resolved(crate::MovePathToolInput {
5987 source_path: "root/safe.txt".to_string(),
5988 destination_path: "root/protected/safe.txt".to_string(),
5989 }),
5990 event_stream,
5991 cx,
5992 )
5993 });
5994
5995 let result = task.await;
5996 assert!(
5997 result.is_err(),
5998 "expected move to be blocked due to destination path"
5999 );
6000 assert!(
6001 result.unwrap_err().contains("blocked"),
6002 "error should mention the move was blocked"
6003 );
6004}
6005
6006#[gpui::test]
6007async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
6008 init_test(cx);
6009
6010 let fs = FakeFs::new(cx.executor());
6011 fs.insert_tree(
6012 "/root",
6013 json!({
6014 "secret.txt": "secret content",
6015 "public": {}
6016 }),
6017 )
6018 .await;
6019 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6020
6021 cx.update(|cx| {
6022 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6023 settings.tool_permissions.tools.insert(
6024 MovePathTool::NAME.into(),
6025 agent_settings::ToolRules {
6026 default: Some(settings::ToolPermissionMode::Allow),
6027 always_allow: vec![],
6028 always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
6029 always_confirm: vec![],
6030 invalid_patterns: vec![],
6031 },
6032 );
6033 agent_settings::AgentSettings::override_global(settings, cx);
6034 });
6035
6036 #[allow(clippy::arc_with_non_send_sync)]
6037 let tool = Arc::new(crate::MovePathTool::new(project));
6038 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6039
6040 let task = cx.update(|cx| {
6041 tool.run(
6042 ToolInput::resolved(crate::MovePathToolInput {
6043 source_path: "root/secret.txt".to_string(),
6044 destination_path: "root/public/not_secret.txt".to_string(),
6045 }),
6046 event_stream,
6047 cx,
6048 )
6049 });
6050
6051 let result = task.await;
6052 assert!(
6053 result.is_err(),
6054 "expected move to be blocked due to source path"
6055 );
6056 assert!(
6057 result.unwrap_err().contains("blocked"),
6058 "error should mention the move was blocked"
6059 );
6060}
6061
6062#[gpui::test]
6063async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
6064 init_test(cx);
6065
6066 let fs = FakeFs::new(cx.executor());
6067 fs.insert_tree(
6068 "/root",
6069 json!({
6070 "confidential.txt": "confidential data",
6071 "dest": {}
6072 }),
6073 )
6074 .await;
6075 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6076
6077 cx.update(|cx| {
6078 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6079 settings.tool_permissions.tools.insert(
6080 CopyPathTool::NAME.into(),
6081 agent_settings::ToolRules {
6082 default: Some(settings::ToolPermissionMode::Allow),
6083 always_allow: vec![],
6084 always_deny: vec![
6085 agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
6086 ],
6087 always_confirm: vec![],
6088 invalid_patterns: vec![],
6089 },
6090 );
6091 agent_settings::AgentSettings::override_global(settings, cx);
6092 });
6093
6094 #[allow(clippy::arc_with_non_send_sync)]
6095 let tool = Arc::new(crate::CopyPathTool::new(project));
6096 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6097
6098 let task = cx.update(|cx| {
6099 tool.run(
6100 ToolInput::resolved(crate::CopyPathToolInput {
6101 source_path: "root/confidential.txt".to_string(),
6102 destination_path: "root/dest/copy.txt".to_string(),
6103 }),
6104 event_stream,
6105 cx,
6106 )
6107 });
6108
6109 let result = task.await;
6110 assert!(result.is_err(), "expected copy to be blocked");
6111 assert!(
6112 result.unwrap_err().contains("blocked"),
6113 "error should mention the copy was blocked"
6114 );
6115}
6116
6117#[gpui::test]
6118async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
6119 init_test(cx);
6120
6121 let fs = FakeFs::new(cx.executor());
6122 fs.insert_tree(
6123 "/root",
6124 json!({
6125 "normal.txt": "normal content",
6126 "readonly": {
6127 "config.txt": "readonly content"
6128 }
6129 }),
6130 )
6131 .await;
6132 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6133
6134 cx.update(|cx| {
6135 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6136 settings.tool_permissions.tools.insert(
6137 SaveFileTool::NAME.into(),
6138 agent_settings::ToolRules {
6139 default: Some(settings::ToolPermissionMode::Allow),
6140 always_allow: vec![],
6141 always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6142 always_confirm: vec![],
6143 invalid_patterns: vec![],
6144 },
6145 );
6146 agent_settings::AgentSettings::override_global(settings, cx);
6147 });
6148
6149 #[allow(clippy::arc_with_non_send_sync)]
6150 let tool = Arc::new(crate::SaveFileTool::new(project));
6151 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6152
6153 let task = cx.update(|cx| {
6154 tool.run(
6155 ToolInput::resolved(crate::SaveFileToolInput {
6156 paths: vec![
6157 std::path::PathBuf::from("root/normal.txt"),
6158 std::path::PathBuf::from("root/readonly/config.txt"),
6159 ],
6160 }),
6161 event_stream,
6162 cx,
6163 )
6164 });
6165
6166 let result = task.await;
6167 assert!(
6168 result.is_err(),
6169 "expected save to be blocked due to denied path"
6170 );
6171 assert!(
6172 result.unwrap_err().contains("blocked"),
6173 "error should mention the save was blocked"
6174 );
6175}
6176
6177#[gpui::test]
6178async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6179 init_test(cx);
6180
6181 let fs = FakeFs::new(cx.executor());
6182 fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6183 .await;
6184 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6185
6186 cx.update(|cx| {
6187 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6188 settings.tool_permissions.tools.insert(
6189 SaveFileTool::NAME.into(),
6190 agent_settings::ToolRules {
6191 default: Some(settings::ToolPermissionMode::Allow),
6192 always_allow: vec![],
6193 always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6194 always_confirm: vec![],
6195 invalid_patterns: vec![],
6196 },
6197 );
6198 agent_settings::AgentSettings::override_global(settings, cx);
6199 });
6200
6201 #[allow(clippy::arc_with_non_send_sync)]
6202 let tool = Arc::new(crate::SaveFileTool::new(project));
6203 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6204
6205 let task = cx.update(|cx| {
6206 tool.run(
6207 ToolInput::resolved(crate::SaveFileToolInput {
6208 paths: vec![std::path::PathBuf::from("root/config.secret")],
6209 }),
6210 event_stream,
6211 cx,
6212 )
6213 });
6214
6215 let result = task.await;
6216 assert!(result.is_err(), "expected save to be blocked");
6217 assert!(
6218 result.unwrap_err().contains("blocked"),
6219 "error should mention the save was blocked"
6220 );
6221}
6222
6223#[gpui::test]
6224async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6225 init_test(cx);
6226
6227 cx.update(|cx| {
6228 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6229 settings.tool_permissions.tools.insert(
6230 WebSearchTool::NAME.into(),
6231 agent_settings::ToolRules {
6232 default: Some(settings::ToolPermissionMode::Allow),
6233 always_allow: vec![],
6234 always_deny: vec![
6235 agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6236 ],
6237 always_confirm: vec![],
6238 invalid_patterns: vec![],
6239 },
6240 );
6241 agent_settings::AgentSettings::override_global(settings, cx);
6242 });
6243
6244 #[allow(clippy::arc_with_non_send_sync)]
6245 let tool = Arc::new(crate::WebSearchTool);
6246 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6247
6248 let input: crate::WebSearchToolInput =
6249 serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6250
6251 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6252
6253 let result = task.await;
6254 assert!(result.is_err(), "expected search to be blocked");
6255 match result.unwrap_err() {
6256 crate::WebSearchToolOutput::Error { error } => {
6257 assert!(
6258 error.contains("blocked"),
6259 "error should mention the search was blocked"
6260 );
6261 }
6262 other => panic!("expected Error variant, got: {other:?}"),
6263 }
6264}
6265
6266#[gpui::test]
6267async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6268 init_test(cx);
6269
6270 let fs = FakeFs::new(cx.executor());
6271 fs.insert_tree("/root", json!({"README.md": "# Hello"}))
6272 .await;
6273 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6274
6275 cx.update(|cx| {
6276 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6277 settings.tool_permissions.tools.insert(
6278 EditFileTool::NAME.into(),
6279 agent_settings::ToolRules {
6280 default: Some(settings::ToolPermissionMode::Confirm),
6281 always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6282 always_deny: vec![],
6283 always_confirm: vec![],
6284 invalid_patterns: vec![],
6285 },
6286 );
6287 agent_settings::AgentSettings::override_global(settings, cx);
6288 });
6289
6290 let context_server_registry =
6291 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6292 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6293 let templates = crate::Templates::new();
6294 let thread = cx.new(|cx| {
6295 crate::Thread::new(
6296 project.clone(),
6297 cx.new(|_cx| prompt_store::ProjectContext::default()),
6298 context_server_registry,
6299 templates.clone(),
6300 None,
6301 cx,
6302 )
6303 });
6304
6305 #[allow(clippy::arc_with_non_send_sync)]
6306 let tool = Arc::new(crate::EditFileTool::new(
6307 project,
6308 thread.downgrade(),
6309 language_registry,
6310 templates,
6311 ));
6312 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6313
6314 let _task = cx.update(|cx| {
6315 tool.run(
6316 ToolInput::resolved(crate::EditFileToolInput {
6317 display_description: "Edit README".to_string(),
6318 path: "root/README.md".into(),
6319 mode: crate::EditFileMode::Edit,
6320 }),
6321 event_stream,
6322 cx,
6323 )
6324 });
6325
6326 cx.run_until_parked();
6327
6328 let event = rx.try_recv();
6329 assert!(
6330 !matches!(event, Ok(Ok(ThreadEvent::ToolCallAuthorization(_)))),
6331 "expected no authorization request for allowed .md file"
6332 );
6333}
6334
6335#[gpui::test]
6336async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6337 init_test(cx);
6338
6339 let fs = FakeFs::new(cx.executor());
6340 fs.insert_tree(
6341 "/root",
6342 json!({
6343 ".zed": {
6344 "settings.json": "{}"
6345 },
6346 "README.md": "# Hello"
6347 }),
6348 )
6349 .await;
6350 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6351
6352 cx.update(|cx| {
6353 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6354 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6355 agent_settings::AgentSettings::override_global(settings, cx);
6356 });
6357
6358 let context_server_registry =
6359 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6360 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6361 let templates = crate::Templates::new();
6362 let thread = cx.new(|cx| {
6363 crate::Thread::new(
6364 project.clone(),
6365 cx.new(|_cx| prompt_store::ProjectContext::default()),
6366 context_server_registry,
6367 templates.clone(),
6368 None,
6369 cx,
6370 )
6371 });
6372
6373 #[allow(clippy::arc_with_non_send_sync)]
6374 let tool = Arc::new(crate::EditFileTool::new(
6375 project,
6376 thread.downgrade(),
6377 language_registry,
6378 templates,
6379 ));
6380
6381 // Editing a file inside .zed/ should still prompt even with global default: allow,
6382 // because local settings paths are sensitive and require confirmation regardless.
6383 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6384 let _task = cx.update(|cx| {
6385 tool.run(
6386 ToolInput::resolved(crate::EditFileToolInput {
6387 display_description: "Edit local settings".to_string(),
6388 path: "root/.zed/settings.json".into(),
6389 mode: crate::EditFileMode::Edit,
6390 }),
6391 event_stream,
6392 cx,
6393 )
6394 });
6395
6396 let _update = rx.expect_update_fields().await;
6397 let _auth = rx.expect_authorization().await;
6398}
6399
6400#[gpui::test]
6401async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6402 init_test(cx);
6403
6404 cx.update(|cx| {
6405 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6406 settings.tool_permissions.tools.insert(
6407 FetchTool::NAME.into(),
6408 agent_settings::ToolRules {
6409 default: Some(settings::ToolPermissionMode::Allow),
6410 always_allow: vec![],
6411 always_deny: vec![
6412 agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6413 ],
6414 always_confirm: vec![],
6415 invalid_patterns: vec![],
6416 },
6417 );
6418 agent_settings::AgentSettings::override_global(settings, cx);
6419 });
6420
6421 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6422
6423 #[allow(clippy::arc_with_non_send_sync)]
6424 let tool = Arc::new(crate::FetchTool::new(http_client));
6425 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6426
6427 let input: crate::FetchToolInput =
6428 serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6429
6430 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6431
6432 let result = task.await;
6433 assert!(result.is_err(), "expected fetch to be blocked");
6434 assert!(
6435 result.unwrap_err().contains("blocked"),
6436 "error should mention the fetch was blocked"
6437 );
6438}
6439
6440#[gpui::test]
6441async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6442 init_test(cx);
6443
6444 cx.update(|cx| {
6445 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6446 settings.tool_permissions.tools.insert(
6447 FetchTool::NAME.into(),
6448 agent_settings::ToolRules {
6449 default: Some(settings::ToolPermissionMode::Confirm),
6450 always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6451 always_deny: vec![],
6452 always_confirm: vec![],
6453 invalid_patterns: vec![],
6454 },
6455 );
6456 agent_settings::AgentSettings::override_global(settings, cx);
6457 });
6458
6459 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6460
6461 #[allow(clippy::arc_with_non_send_sync)]
6462 let tool = Arc::new(crate::FetchTool::new(http_client));
6463 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6464
6465 let input: crate::FetchToolInput =
6466 serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6467
6468 let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6469
6470 cx.run_until_parked();
6471
6472 let event = rx.try_recv();
6473 assert!(
6474 !matches!(event, Ok(Ok(ThreadEvent::ToolCallAuthorization(_)))),
6475 "expected no authorization request for allowed docs.rs URL"
6476 );
6477}
6478
/// A message queued while a turn is running should make the turn end with
/// `StopReason::EndTurn` at the next tool-call boundary, instead of looping
/// back to the model — and the queued-message flag must survive the turn.
#[gpui::test]
async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Add a tool so we can simulate tool calls
    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
    });

    // Start a turn by sending a message
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate the model making a tool call
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text": "hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));

    // Signal that a message is queued before ending the stream
    thread.update(cx, |thread, _cx| {
        thread.set_has_queued_message(true);
    });

    // Now end the stream - tool will run, and the boundary check should see the queue
    fake_model.end_last_completion_stream();

    // Collect all events until the turn stops
    let all_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we received the tool call event
    let tool_call_ids: Vec<_> = all_events
        .iter()
        .filter_map(|e| match e {
            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
            _ => None,
        })
        .collect();
    assert_eq!(
        tool_call_ids,
        vec!["tool_1"],
        "Should have received a tool call event for our echo tool"
    );

    // The turn should have stopped with EndTurn
    let stop_reasons = stop_events(all_events);
    assert_eq!(
        stop_reasons,
        vec![acp::StopReason::EndTurn],
        "Turn should have ended after tool completion due to queued message"
    );

    // Verify the queued message flag is still set
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.has_queued_message(),
            "Should still have queued message flag set"
        );
    });

    // Thread should be idle now
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be running after turn ends"
        );
    });
}
6563
/// A tool that errors while its input is still streaming
/// (`is_input_complete: false`) should not stall the turn: the follow-up
/// request carrying the error tool result is issued without waiting for the
/// completion stream to end.
#[gpui::test]
async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // The tool is configured to fail after receiving its first input chunk.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Partial tool use: the input stream is deliberately left open
    // (`is_input_complete: false`) so the failure happens mid-stream.
    let tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({}),
        is_input_complete: false,
        thought_signature: None,
    };

    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));

    cx.run_until_parked();

    // Note: we never call end_last_completion_stream(); the follow-up request
    // below must exist anyway because the tool error broke the stream loop.
    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // The latest request contains the user message, the assistant's tool use,
    // and the failed tool result (`is_error: true`, content "failed").
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_failing_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6638
/// A streaming tool failure must not cancel tools that are already running:
/// the follow-up request goes out only after the earlier tool completes, and
/// it carries both tool results.
#[gpui::test]
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // The oneshot channel lets the test decide exactly when the first
    // (non-failing) tool is allowed to finish.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        // This second tool fails after receiving its first input chunk.
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // First tool call arrives as a partial chunk, then its completed form.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // Second tool call stays incomplete (`is_input_complete: false`), so the
    // failing tool errors mid-stream while the first tool is still running.
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    cx.run_until_parked();

    // Unblock the first tool; only after this may the follow-up request be sent.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // The follow-up request carries both tool results; the failing tool's
    // error result appears first, followed by the first tool's success.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6756
6757#[gpui::test]
6758async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
6759 let ThreadTest {
6760 model, thread, fs, ..
6761 } = setup(cx, TestModel::Fake).await;
6762 let fake_model_a = model.as_fake();
6763
6764 thread.update(cx, |thread, _cx| {
6765 thread.add_tool(EchoTool);
6766 thread.add_tool(DelayTool);
6767 });
6768
6769 // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
6770 fs.insert_file(
6771 paths::settings_file(),
6772 json!({
6773 "agent": {
6774 "profiles": {
6775 "profile-a": {
6776 "name": "Profile A",
6777 "tools": {
6778 EchoTool::NAME: true,
6779 DelayTool::NAME: true,
6780 }
6781 },
6782 "profile-b": {
6783 "name": "Profile B",
6784 "tools": {
6785 DelayTool::NAME: true,
6786 }
6787 }
6788 }
6789 }
6790 })
6791 .to_string()
6792 .into_bytes(),
6793 )
6794 .await;
6795 cx.run_until_parked();
6796
6797 thread.update(cx, |thread, cx| {
6798 thread.set_profile(AgentProfileId("profile-a".into()), cx);
6799 thread.set_thinking_enabled(false, cx);
6800 });
6801
6802 // Send a message — first iteration starts with model A, profile-a, thinking off.
6803 thread
6804 .update(cx, |thread, cx| {
6805 thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
6806 })
6807 .unwrap();
6808 cx.run_until_parked();
6809
6810 // Verify first request has both tools and thinking disabled.
6811 let completions = fake_model_a.pending_completions();
6812 assert_eq!(completions.len(), 1);
6813 let first_tools = tool_names_for_completion(&completions[0]);
6814 assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
6815 assert!(!completions[0].thinking_allowed);
6816
6817 // Model A responds with an echo tool call.
6818 fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6819 LanguageModelToolUse {
6820 id: "tool_1".into(),
6821 name: "echo".into(),
6822 raw_input: r#"{"text":"hello"}"#.into(),
6823 input: json!({"text": "hello"}),
6824 is_input_complete: true,
6825 thought_signature: None,
6826 },
6827 ));
6828 fake_model_a.end_last_completion_stream();
6829
6830 // Before the next iteration runs, switch to profile-b (only DelayTool),
6831 // swap in a new model, and enable thinking.
6832 let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
6833 "test-provider",
6834 "model-b",
6835 "Model B",
6836 true,
6837 ));
6838 thread.update(cx, |thread, cx| {
6839 thread.set_profile(AgentProfileId("profile-b".into()), cx);
6840 thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
6841 thread.set_thinking_enabled(true, cx);
6842 });
6843
6844 // Run until parked — processes the echo tool call, loops back, picks up
6845 // the new model/profile/thinking, and makes a second request to model B.
6846 cx.run_until_parked();
6847
6848 // The second request should have gone to model B.
6849 let model_b_completions = fake_model_b.pending_completions();
6850 assert_eq!(
6851 model_b_completions.len(),
6852 1,
6853 "second request should go to model B"
6854 );
6855
6856 // Profile-b only has DelayTool, so echo should be gone.
6857 let second_tools = tool_names_for_completion(&model_b_completions[0]);
6858 assert_eq!(second_tools, vec![DelayTool::NAME]);
6859
6860 // Thinking should now be enabled.
6861 assert!(model_b_completions[0].thinking_allowed);
6862}