mod.rs

   1use super::*;
   2use acp_thread::{
   3    AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
   4    UserMessageId,
   5};
   6use agent_client_protocol::{self as acp};
   7use agent_settings::AgentProfileId;
   8use anyhow::Result;
   9use client::{Client, UserStore};
  10use cloud_llm_client::CompletionIntent;
  11use collections::IndexMap;
  12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  13use feature_flags::FeatureFlagAppExt as _;
  14use fs::{FakeFs, Fs};
  15use futures::{
  16    FutureExt as _, StreamExt,
  17    channel::{
  18        mpsc::{self, UnboundedReceiver},
  19        oneshot,
  20    },
  21    future::{Fuse, Shared},
  22};
  23use gpui::{
  24    App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
  25    http_client::FakeHttpClient,
  26};
  27use indoc::indoc;
  28use language_model::{
  29    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
  30    LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
  31    LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
  32    LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
  33    fake_provider::FakeLanguageModel,
  34};
  35use pretty_assertions::assert_eq;
  36use project::{
  37    Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
  38};
  39use prompt_store::ProjectContext;
  40use reqwest_client::ReqwestClient;
  41use schemars::JsonSchema;
  42use serde::{Deserialize, Serialize};
  43use serde_json::json;
  44use settings::{Settings, SettingsStore};
  45use std::{
  46    path::Path,
  47    pin::Pin,
  48    rc::Rc,
  49    sync::{
  50        Arc,
  51        atomic::{AtomicBool, AtomicUsize, Ordering},
  52    },
  53    time::Duration,
  54};
  55use util::path;
  56
  57mod edit_file_thread_test;
  58mod test_tools;
  59use test_tools::*;
  60
  61pub(crate) fn init_test(cx: &mut TestAppContext) {
  62    cx.update(|cx| {
  63        let settings_store = SettingsStore::test(cx);
  64        cx.set_global(settings_store);
  65    });
  66}
  67
  68pub(crate) struct FakeTerminalHandle {
  69    killed: Arc<AtomicBool>,
  70    stopped_by_user: Arc<AtomicBool>,
  71    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
  72    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
  73    output: acp::TerminalOutputResponse,
  74    id: acp::TerminalId,
  75}
  76
  77impl FakeTerminalHandle {
  78    pub(crate) fn new_never_exits(cx: &mut App) -> Self {
  79        let killed = Arc::new(AtomicBool::new(false));
  80        let stopped_by_user = Arc::new(AtomicBool::new(false));
  81
  82        let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
  83
  84        let wait_for_exit = cx
  85            .spawn(async move |_cx| {
  86                // Wait for the exit signal (sent when kill() is called)
  87                let _ = exit_receiver.await;
  88                acp::TerminalExitStatus::new()
  89            })
  90            .shared();
  91
  92        Self {
  93            killed,
  94            stopped_by_user,
  95            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
  96            wait_for_exit,
  97            output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
  98            id: acp::TerminalId::new("fake_terminal".to_string()),
  99        }
 100    }
 101
 102    pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
 103        let killed = Arc::new(AtomicBool::new(false));
 104        let stopped_by_user = Arc::new(AtomicBool::new(false));
 105        let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
 106
 107        let wait_for_exit = cx
 108            .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
 109            .shared();
 110
 111        Self {
 112            killed,
 113            stopped_by_user,
 114            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
 115            wait_for_exit,
 116            output: acp::TerminalOutputResponse::new("command output".to_string(), false),
 117            id: acp::TerminalId::new("fake_terminal".to_string()),
 118        }
 119    }
 120
 121    pub(crate) fn was_killed(&self) -> bool {
 122        self.killed.load(Ordering::SeqCst)
 123    }
 124
 125    pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
 126        self.stopped_by_user.store(stopped, Ordering::SeqCst);
 127    }
 128
 129    pub(crate) fn signal_exit(&self) {
 130        if let Some(sender) = self.exit_sender.borrow_mut().take() {
 131            let _ = sender.send(());
 132        }
 133    }
 134}
 135
 136impl crate::TerminalHandle for FakeTerminalHandle {
 137    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
 138        Ok(self.id.clone())
 139    }
 140
 141    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
 142        Ok(self.output.clone())
 143    }
 144
 145    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
 146        Ok(self.wait_for_exit.clone())
 147    }
 148
 149    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
 150        self.killed.store(true, Ordering::SeqCst);
 151        self.signal_exit();
 152        Ok(())
 153    }
 154
 155    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
 156        Ok(self.stopped_by_user.load(Ordering::SeqCst))
 157    }
 158}
 159
 160struct FakeSubagentHandle {
 161    session_id: acp::SessionId,
 162    send_task: Shared<Task<String>>,
 163}
 164
 165impl SubagentHandle for FakeSubagentHandle {
 166    fn id(&self) -> acp::SessionId {
 167        self.session_id.clone()
 168    }
 169
 170    fn num_entries(&self, _cx: &App) -> usize {
 171        unimplemented!()
 172    }
 173
 174    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
 175        let task = self.send_task.clone();
 176        cx.background_spawn(async move { Ok(task.await) })
 177    }
 178}
 179
 180#[derive(Default)]
 181pub(crate) struct FakeThreadEnvironment {
 182    terminal_handle: Option<Rc<FakeTerminalHandle>>,
 183    subagent_handle: Option<Rc<FakeSubagentHandle>>,
 184    terminal_creations: Arc<AtomicUsize>,
 185}
 186
 187impl FakeThreadEnvironment {
 188    pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
 189        Self {
 190            terminal_handle: Some(terminal_handle.into()),
 191            ..self
 192        }
 193    }
 194
 195    pub(crate) fn terminal_creation_count(&self) -> usize {
 196        self.terminal_creations.load(Ordering::SeqCst)
 197    }
 198}
 199
 200impl crate::ThreadEnvironment for FakeThreadEnvironment {
 201    fn create_terminal(
 202        &self,
 203        _command: String,
 204        _cwd: Option<std::path::PathBuf>,
 205        _output_byte_limit: Option<u64>,
 206        _cx: &mut AsyncApp,
 207    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
 208        self.terminal_creations.fetch_add(1, Ordering::SeqCst);
 209        let handle = self
 210            .terminal_handle
 211            .clone()
 212            .expect("Terminal handle not available on FakeThreadEnvironment");
 213        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
 214    }
 215
 216    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
 217        Ok(self
 218            .subagent_handle
 219            .clone()
 220            .expect("Subagent handle not available on FakeThreadEnvironment")
 221            as Rc<dyn SubagentHandle>)
 222    }
 223}
 224
 225/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
 226struct MultiTerminalEnvironment {
 227    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
 228}
 229
 230impl MultiTerminalEnvironment {
 231    fn new() -> Self {
 232        Self {
 233            handles: std::cell::RefCell::new(Vec::new()),
 234        }
 235    }
 236
 237    fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
 238        self.handles.borrow().clone()
 239    }
 240}
 241
 242impl crate::ThreadEnvironment for MultiTerminalEnvironment {
 243    fn create_terminal(
 244        &self,
 245        _command: String,
 246        _cwd: Option<std::path::PathBuf>,
 247        _output_byte_limit: Option<u64>,
 248        cx: &mut AsyncApp,
 249    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
 250        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
 251        self.handles.borrow_mut().push(handle.clone());
 252        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
 253    }
 254
 255    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
 256        unimplemented!()
 257    }
 258}
 259
 260fn always_allow_tools(cx: &mut TestAppContext) {
 261    cx.update(|cx| {
 262        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
 263        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
 264        agent_settings::AgentSettings::override_global(settings, cx);
 265    });
 266}
 267
 268#[gpui::test]
 269async fn test_echo(cx: &mut TestAppContext) {
 270    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 271    let fake_model = model.as_fake();
 272
 273    let events = thread
 274        .update(cx, |thread, cx| {
 275            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
 276        })
 277        .unwrap();
 278    cx.run_until_parked();
 279    fake_model.send_last_completion_stream_text_chunk("Hello");
 280    fake_model
 281        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
 282    fake_model.end_last_completion_stream();
 283
 284    let events = events.collect().await;
 285    thread.update(cx, |thread, _cx| {
 286        assert_eq!(
 287            thread.last_received_or_pending_message().unwrap().role(),
 288            Role::Assistant
 289        );
 290        assert_eq!(
 291            thread
 292                .last_received_or_pending_message()
 293                .unwrap()
 294                .to_markdown(),
 295            "Hello\n"
 296        )
 297    });
 298    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 299}
 300
 301#[gpui::test]
 302async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
 303    init_test(cx);
 304    always_allow_tools(cx);
 305
 306    let fs = FakeFs::new(cx.executor());
 307    let project = Project::test(fs, [], cx).await;
 308
 309    let environment = Rc::new(cx.update(|cx| {
 310        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
 311    }));
 312    let handle = environment.terminal_handle.clone().unwrap();
 313
 314    #[allow(clippy::arc_with_non_send_sync)]
 315    let tool = Arc::new(crate::TerminalTool::new(project, environment));
 316    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
 317
 318    let task = cx.update(|cx| {
 319        tool.run(
 320            ToolInput::resolved(crate::TerminalToolInput {
 321                command: "sleep 1000".to_string(),
 322                cd: ".".to_string(),
 323                timeout_ms: Some(5),
 324            }),
 325            event_stream,
 326            cx,
 327        )
 328    });
 329
 330    let update = rx.expect_update_fields().await;
 331    assert!(
 332        update.content.iter().any(|blocks| {
 333            blocks
 334                .iter()
 335                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
 336        }),
 337        "expected tool call update to include terminal content"
 338    );
 339
 340    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());
 341
 342    let deadline = std::time::Instant::now() + Duration::from_millis(500);
 343    loop {
 344        if let Some(result) = task_future.as_mut().now_or_never() {
 345            let result = result.expect("terminal tool task should complete");
 346
 347            assert!(
 348                handle.was_killed(),
 349                "expected terminal handle to be killed on timeout"
 350            );
 351            assert!(
 352                result.contains("partial output"),
 353                "expected result to include terminal output, got: {result}"
 354            );
 355            return;
 356        }
 357
 358        if std::time::Instant::now() >= deadline {
 359            panic!("timed out waiting for terminal tool task to complete");
 360        }
 361
 362        cx.run_until_parked();
 363        cx.background_executor.timer(Duration::from_millis(1)).await;
 364    }
 365}
 366
 367#[gpui::test]
 368#[ignore]
 369async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
 370    init_test(cx);
 371    always_allow_tools(cx);
 372
 373    let fs = FakeFs::new(cx.executor());
 374    let project = Project::test(fs, [], cx).await;
 375
 376    let environment = Rc::new(cx.update(|cx| {
 377        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
 378    }));
 379    let handle = environment.terminal_handle.clone().unwrap();
 380
 381    #[allow(clippy::arc_with_non_send_sync)]
 382    let tool = Arc::new(crate::TerminalTool::new(project, environment));
 383    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
 384
 385    let _task = cx.update(|cx| {
 386        tool.run(
 387            ToolInput::resolved(crate::TerminalToolInput {
 388                command: "sleep 1000".to_string(),
 389                cd: ".".to_string(),
 390                timeout_ms: None,
 391            }),
 392            event_stream,
 393            cx,
 394        )
 395    });
 396
 397    let update = rx.expect_update_fields().await;
 398    assert!(
 399        update.content.iter().any(|blocks| {
 400            blocks
 401                .iter()
 402                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
 403        }),
 404        "expected tool call update to include terminal content"
 405    );
 406
 407    cx.background_executor
 408        .timer(Duration::from_millis(25))
 409        .await;
 410
 411    assert!(
 412        !handle.was_killed(),
 413        "did not expect terminal handle to be killed without a timeout"
 414    );
 415}
 416
 417#[gpui::test]
 418async fn test_thinking(cx: &mut TestAppContext) {
 419    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 420    let fake_model = model.as_fake();
 421
 422    let events = thread
 423        .update(cx, |thread, cx| {
 424            thread.send(
 425                UserMessageId::new(),
 426                [indoc! {"
 427                    Testing:
 428
 429                    Generate a thinking step where you just think the word 'Think',
 430                    and have your final answer be 'Hello'
 431                "}],
 432                cx,
 433            )
 434        })
 435        .unwrap();
 436    cx.run_until_parked();
 437    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
 438        text: "Think".to_string(),
 439        signature: None,
 440    });
 441    fake_model.send_last_completion_stream_text_chunk("Hello");
 442    fake_model
 443        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
 444    fake_model.end_last_completion_stream();
 445
 446    let events = events.collect().await;
 447    thread.update(cx, |thread, _cx| {
 448        assert_eq!(
 449            thread.last_received_or_pending_message().unwrap().role(),
 450            Role::Assistant
 451        );
 452        assert_eq!(
 453            thread
 454                .last_received_or_pending_message()
 455                .unwrap()
 456                .to_markdown(),
 457            indoc! {"
 458                <think>Think</think>
 459                Hello
 460            "}
 461        )
 462    });
 463    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 464}
 465
 466#[gpui::test]
 467async fn test_system_prompt(cx: &mut TestAppContext) {
 468    let ThreadTest {
 469        model,
 470        thread,
 471        project_context,
 472        ..
 473    } = setup(cx, TestModel::Fake).await;
 474    let fake_model = model.as_fake();
 475
 476    project_context.update(cx, |project_context, _cx| {
 477        project_context.shell = "test-shell".into()
 478    });
 479    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
 480    thread
 481        .update(cx, |thread, cx| {
 482            thread.send(UserMessageId::new(), ["abc"], cx)
 483        })
 484        .unwrap();
 485    cx.run_until_parked();
 486    let mut pending_completions = fake_model.pending_completions();
 487    assert_eq!(
 488        pending_completions.len(),
 489        1,
 490        "unexpected pending completions: {:?}",
 491        pending_completions
 492    );
 493
 494    let pending_completion = pending_completions.pop().unwrap();
 495    assert_eq!(pending_completion.messages[0].role, Role::System);
 496
 497    let system_message = &pending_completion.messages[0];
 498    let system_prompt = system_message.content[0].to_str().unwrap();
 499    assert!(
 500        system_prompt.contains("test-shell"),
 501        "unexpected system message: {:?}",
 502        system_message
 503    );
 504    assert!(
 505        system_prompt.contains("## Fixing Diagnostics"),
 506        "unexpected system message: {:?}",
 507        system_message
 508    );
 509}
 510
 511#[gpui::test]
 512async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
 513    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 514    let fake_model = model.as_fake();
 515
 516    thread
 517        .update(cx, |thread, cx| {
 518            thread.send(UserMessageId::new(), ["abc"], cx)
 519        })
 520        .unwrap();
 521    cx.run_until_parked();
 522    let mut pending_completions = fake_model.pending_completions();
 523    assert_eq!(
 524        pending_completions.len(),
 525        1,
 526        "unexpected pending completions: {:?}",
 527        pending_completions
 528    );
 529
 530    let pending_completion = pending_completions.pop().unwrap();
 531    assert_eq!(pending_completion.messages[0].role, Role::System);
 532
 533    let system_message = &pending_completion.messages[0];
 534    let system_prompt = system_message.content[0].to_str().unwrap();
 535    assert!(
 536        !system_prompt.contains("## Tool Use"),
 537        "unexpected system message: {:?}",
 538        system_message
 539    );
 540    assert!(
 541        !system_prompt.contains("## Fixing Diagnostics"),
 542        "unexpected system message: {:?}",
 543        system_message
 544    );
 545}
 546
 547#[gpui::test]
 548async fn test_prompt_caching(cx: &mut TestAppContext) {
 549    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 550    let fake_model = model.as_fake();
 551
 552    // Send initial user message and verify it's cached
 553    thread
 554        .update(cx, |thread, cx| {
 555            thread.send(UserMessageId::new(), ["Message 1"], cx)
 556        })
 557        .unwrap();
 558    cx.run_until_parked();
 559
 560    let completion = fake_model.pending_completions().pop().unwrap();
 561    assert_eq!(
 562        completion.messages[1..],
 563        vec![LanguageModelRequestMessage {
 564            role: Role::User,
 565            content: vec!["Message 1".into()],
 566            cache: true,
 567            reasoning_details: None,
 568        }]
 569    );
 570    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
 571        "Response to Message 1".into(),
 572    ));
 573    fake_model.end_last_completion_stream();
 574    cx.run_until_parked();
 575
 576    // Send another user message and verify only the latest is cached
 577    thread
 578        .update(cx, |thread, cx| {
 579            thread.send(UserMessageId::new(), ["Message 2"], cx)
 580        })
 581        .unwrap();
 582    cx.run_until_parked();
 583
 584    let completion = fake_model.pending_completions().pop().unwrap();
 585    assert_eq!(
 586        completion.messages[1..],
 587        vec![
 588            LanguageModelRequestMessage {
 589                role: Role::User,
 590                content: vec!["Message 1".into()],
 591                cache: false,
 592                reasoning_details: None,
 593            },
 594            LanguageModelRequestMessage {
 595                role: Role::Assistant,
 596                content: vec!["Response to Message 1".into()],
 597                cache: false,
 598                reasoning_details: None,
 599            },
 600            LanguageModelRequestMessage {
 601                role: Role::User,
 602                content: vec!["Message 2".into()],
 603                cache: true,
 604                reasoning_details: None,
 605            }
 606        ]
 607    );
 608    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
 609        "Response to Message 2".into(),
 610    ));
 611    fake_model.end_last_completion_stream();
 612    cx.run_until_parked();
 613
 614    // Simulate a tool call and verify that the latest tool result is cached
 615    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
 616    thread
 617        .update(cx, |thread, cx| {
 618            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
 619        })
 620        .unwrap();
 621    cx.run_until_parked();
 622
 623    let tool_use = LanguageModelToolUse {
 624        id: "tool_1".into(),
 625        name: EchoTool::NAME.into(),
 626        raw_input: json!({"text": "test"}).to_string(),
 627        input: json!({"text": "test"}),
 628        is_input_complete: true,
 629        thought_signature: None,
 630    };
 631    fake_model
 632        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
 633    fake_model.end_last_completion_stream();
 634    cx.run_until_parked();
 635
 636    let completion = fake_model.pending_completions().pop().unwrap();
 637    let tool_result = LanguageModelToolResult {
 638        tool_use_id: "tool_1".into(),
 639        tool_name: EchoTool::NAME.into(),
 640        is_error: false,
 641        content: "test".into(),
 642        output: Some("test".into()),
 643    };
 644    assert_eq!(
 645        completion.messages[1..],
 646        vec![
 647            LanguageModelRequestMessage {
 648                role: Role::User,
 649                content: vec!["Message 1".into()],
 650                cache: false,
 651                reasoning_details: None,
 652            },
 653            LanguageModelRequestMessage {
 654                role: Role::Assistant,
 655                content: vec!["Response to Message 1".into()],
 656                cache: false,
 657                reasoning_details: None,
 658            },
 659            LanguageModelRequestMessage {
 660                role: Role::User,
 661                content: vec!["Message 2".into()],
 662                cache: false,
 663                reasoning_details: None,
 664            },
 665            LanguageModelRequestMessage {
 666                role: Role::Assistant,
 667                content: vec!["Response to Message 2".into()],
 668                cache: false,
 669                reasoning_details: None,
 670            },
 671            LanguageModelRequestMessage {
 672                role: Role::User,
 673                content: vec!["Use the echo tool".into()],
 674                cache: false,
 675                reasoning_details: None,
 676            },
 677            LanguageModelRequestMessage {
 678                role: Role::Assistant,
 679                content: vec![MessageContent::ToolUse(tool_use)],
 680                cache: false,
 681                reasoning_details: None,
 682            },
 683            LanguageModelRequestMessage {
 684                role: Role::User,
 685                content: vec![MessageContent::ToolResult(tool_result)],
 686                cache: true,
 687                reasoning_details: None,
 688            }
 689        ]
 690    );
 691}
 692
 693#[gpui::test]
 694#[cfg_attr(not(feature = "e2e"), ignore)]
 695async fn test_basic_tool_calls(cx: &mut TestAppContext) {
 696    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
 697
 698    // Test a tool call that's likely to complete *before* streaming stops.
 699    let events = thread
 700        .update(cx, |thread, cx| {
 701            thread.add_tool(EchoTool);
 702            thread.send(
 703                UserMessageId::new(),
 704                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
 705                cx,
 706            )
 707        })
 708        .unwrap()
 709        .collect()
 710        .await;
 711    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 712
 713    // Test a tool calls that's likely to complete *after* streaming stops.
 714    let events = thread
 715        .update(cx, |thread, cx| {
 716            thread.remove_tool(&EchoTool::NAME);
 717            thread.add_tool(DelayTool);
 718            thread.send(
 719                UserMessageId::new(),
 720                [
 721                    "Now call the delay tool with 200ms.",
 722                    "When the timer goes off, then you echo the output of the tool.",
 723                ],
 724                cx,
 725            )
 726        })
 727        .unwrap()
 728        .collect()
 729        .await;
 730    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 731    thread.update(cx, |thread, _cx| {
 732        assert!(
 733            thread
 734                .last_received_or_pending_message()
 735                .unwrap()
 736                .as_agent_message()
 737                .unwrap()
 738                .content
 739                .iter()
 740                .any(|content| {
 741                    if let AgentMessageContent::Text(text) = content {
 742                        text.contains("Ding")
 743                    } else {
 744                        false
 745                    }
 746                }),
 747            "{}",
 748            thread.to_markdown()
 749        );
 750    });
 751}
 752
 753#[gpui::test]
 754#[cfg_attr(not(feature = "e2e"), ignore)]
 755async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
 756    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
 757
 758    // Test a tool call that's likely to complete *before* streaming stops.
 759    let mut events = thread
 760        .update(cx, |thread, cx| {
 761            thread.add_tool(WordListTool);
 762            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
 763        })
 764        .unwrap();
 765
 766    let mut saw_partial_tool_use = false;
 767    while let Some(event) = events.next().await {
 768        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
 769            thread.update(cx, |thread, _cx| {
 770                // Look for a tool use in the thread's last message
 771                let message = thread.last_received_or_pending_message().unwrap();
 772                let agent_message = message.as_agent_message().unwrap();
 773                let last_content = agent_message.content.last().unwrap();
 774                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
 775                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
 776                    if tool_call.status == acp::ToolCallStatus::Pending {
 777                        if !last_tool_use.is_input_complete
 778                            && last_tool_use.input.get("g").is_none()
 779                        {
 780                            saw_partial_tool_use = true;
 781                        }
 782                    } else {
 783                        last_tool_use
 784                            .input
 785                            .get("a")
 786                            .expect("'a' has streamed because input is now complete");
 787                        last_tool_use
 788                            .input
 789                            .get("g")
 790                            .expect("'g' has streamed because input is now complete");
 791                    }
 792                } else {
 793                    panic!("last content should be a tool use");
 794                }
 795            });
 796        }
 797    }
 798
 799    assert!(
 800        saw_partial_tool_use,
 801        "should see at least one partially streamed tool use in the history"
 802    );
 803}
 804
 805#[gpui::test]
 806async fn test_tool_authorization(cx: &mut TestAppContext) {
 807    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 808    let fake_model = model.as_fake();
 809
 810    let mut events = thread
 811        .update(cx, |thread, cx| {
 812            thread.add_tool(ToolRequiringPermission);
 813            thread.send(UserMessageId::new(), ["abc"], cx)
 814        })
 815        .unwrap();
 816    cx.run_until_parked();
 817    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 818        LanguageModelToolUse {
 819            id: "tool_id_1".into(),
 820            name: ToolRequiringPermission::NAME.into(),
 821            raw_input: "{}".into(),
 822            input: json!({}),
 823            is_input_complete: true,
 824            thought_signature: None,
 825        },
 826    ));
 827    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 828        LanguageModelToolUse {
 829            id: "tool_id_2".into(),
 830            name: ToolRequiringPermission::NAME.into(),
 831            raw_input: "{}".into(),
 832            input: json!({}),
 833            is_input_complete: true,
 834            thought_signature: None,
 835        },
 836    ));
 837    fake_model.end_last_completion_stream();
 838    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
 839    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;
 840
 841    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
 842    tool_call_auth_1
 843        .response
 844        .send(acp_thread::SelectedPermissionOutcome::new(
 845            acp::PermissionOptionId::new("allow"),
 846            acp::PermissionOptionKind::AllowOnce,
 847        ))
 848        .unwrap();
 849    cx.run_until_parked();
 850
 851    // Reject the second - send "deny" option_id directly since Deny is now a button
 852    tool_call_auth_2
 853        .response
 854        .send(acp_thread::SelectedPermissionOutcome::new(
 855            acp::PermissionOptionId::new("deny"),
 856            acp::PermissionOptionKind::RejectOnce,
 857        ))
 858        .unwrap();
 859    cx.run_until_parked();
 860
 861    let completion = fake_model.pending_completions().pop().unwrap();
 862    let message = completion.messages.last().unwrap();
 863    assert_eq!(
 864        message.content,
 865        vec![
 866            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 867                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
 868                tool_name: ToolRequiringPermission::NAME.into(),
 869                is_error: false,
 870                content: "Allowed".into(),
 871                output: Some("Allowed".into())
 872            }),
 873            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 874                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
 875                tool_name: ToolRequiringPermission::NAME.into(),
 876                is_error: true,
 877                content: "Permission to run tool denied by user".into(),
 878                output: Some("Permission to run tool denied by user".into())
 879            })
 880        ]
 881    );
 882
 883    // Simulate yet another tool call.
 884    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 885        LanguageModelToolUse {
 886            id: "tool_id_3".into(),
 887            name: ToolRequiringPermission::NAME.into(),
 888            raw_input: "{}".into(),
 889            input: json!({}),
 890            is_input_complete: true,
 891            thought_signature: None,
 892        },
 893    ));
 894    fake_model.end_last_completion_stream();
 895
 896    // Respond by always allowing tools - send transformed option_id
 897    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
 898    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
 899    tool_call_auth_3
 900        .response
 901        .send(acp_thread::SelectedPermissionOutcome::new(
 902            acp::PermissionOptionId::new("always_allow:tool_requiring_permission"),
 903            acp::PermissionOptionKind::AllowAlways,
 904        ))
 905        .unwrap();
 906    cx.run_until_parked();
 907    let completion = fake_model.pending_completions().pop().unwrap();
 908    let message = completion.messages.last().unwrap();
 909    assert_eq!(
 910        message.content,
 911        vec![language_model::MessageContent::ToolResult(
 912            LanguageModelToolResult {
 913                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
 914                tool_name: ToolRequiringPermission::NAME.into(),
 915                is_error: false,
 916                content: "Allowed".into(),
 917                output: Some("Allowed".into())
 918            }
 919        )]
 920    );
 921
 922    // Simulate a final tool call, ensuring we don't trigger authorization.
 923    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 924        LanguageModelToolUse {
 925            id: "tool_id_4".into(),
 926            name: ToolRequiringPermission::NAME.into(),
 927            raw_input: "{}".into(),
 928            input: json!({}),
 929            is_input_complete: true,
 930            thought_signature: None,
 931        },
 932    ));
 933    fake_model.end_last_completion_stream();
 934    cx.run_until_parked();
 935    let completion = fake_model.pending_completions().pop().unwrap();
 936    let message = completion.messages.last().unwrap();
 937    assert_eq!(
 938        message.content,
 939        vec![language_model::MessageContent::ToolResult(
 940            LanguageModelToolResult {
 941                tool_use_id: "tool_id_4".into(),
 942                tool_name: ToolRequiringPermission::NAME.into(),
 943                is_error: false,
 944                content: "Allowed".into(),
 945                output: Some("Allowed".into())
 946            }
 947        )]
 948    );
 949}
 950
 951#[gpui::test]
 952async fn test_tool_hallucination(cx: &mut TestAppContext) {
 953    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 954    let fake_model = model.as_fake();
 955
 956    let mut events = thread
 957        .update(cx, |thread, cx| {
 958            thread.send(UserMessageId::new(), ["abc"], cx)
 959        })
 960        .unwrap();
 961    cx.run_until_parked();
 962    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 963        LanguageModelToolUse {
 964            id: "tool_id_1".into(),
 965            name: "nonexistent_tool".into(),
 966            raw_input: "{}".into(),
 967            input: json!({}),
 968            is_input_complete: true,
 969            thought_signature: None,
 970        },
 971    ));
 972    fake_model.end_last_completion_stream();
 973
 974    let tool_call = expect_tool_call(&mut events).await;
 975    assert_eq!(tool_call.title, "nonexistent_tool");
 976    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
 977    let update = expect_tool_call_update_fields(&mut events).await;
 978    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
 979}
 980
 981async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
 982    let event = events
 983        .next()
 984        .await
 985        .expect("no tool call authorization event received")
 986        .unwrap();
 987    match event {
 988        ThreadEvent::ToolCall(tool_call) => tool_call,
 989        event => {
 990            panic!("Unexpected event {event:?}");
 991        }
 992    }
 993}
 994
 995async fn expect_tool_call_update_fields(
 996    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
 997) -> acp::ToolCallUpdate {
 998    let event = events
 999        .next()
1000        .await
1001        .expect("no tool call authorization event received")
1002        .unwrap();
1003    match event {
1004        ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
1005        event => {
1006            panic!("Unexpected event {event:?}");
1007        }
1008    }
1009}
1010
1011async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1012    let event = events
1013        .next()
1014        .await
1015        .expect("no plan event received")
1016        .unwrap();
1017    match event {
1018        ThreadEvent::Plan(plan) => plan,
1019        event => {
1020            panic!("Unexpected event {event:?}");
1021        }
1022    }
1023}
1024
1025async fn next_tool_call_authorization(
1026    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1027) -> ToolCallAuthorization {
1028    loop {
1029        let event = events
1030            .next()
1031            .await
1032            .expect("no tool call authorization event received")
1033            .unwrap();
1034        if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1035            let permission_kinds = tool_call_authorization
1036                .options
1037                .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1038                .map(|option| option.kind);
1039            let allow_once = tool_call_authorization
1040                .options
1041                .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1042                .map(|option| option.kind);
1043
1044            assert_eq!(
1045                permission_kinds,
1046                Some(acp::PermissionOptionKind::AllowAlways)
1047            );
1048            assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1049            return tool_call_authorization;
1050        }
1051    }
1052}
1053
/// A `<program> <subcommand> ...` terminal command yields three dropdown choices:
/// tool-wide, command-level (`cargo build`), and one-time.
#[test]
fn test_permission_options_terminal_with_pattern() {
    let context = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    );
    let PermissionOptions::Dropdown(choices) = context.build_permission_options() else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `cargo build` commands"));
    assert!(labels.contains(&"Only this time"));
}
1075
/// When the second token is a flag (`-la`), the command-level choice covers only the
/// program name (`ls`).
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    let command = "ls -la".to_string();
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec![command])
        .build_permission_options();

    let PermissionOptions::Dropdown(choices) = options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `ls` commands"));
    assert!(labels.contains(&"Only this time"));
}
1095
/// A single-word command (`whoami`) still gets a command-level choice named after it.
#[test]
fn test_permission_options_terminal_single_word_command() {
    let input = vec!["whoami".to_string()];
    let context = ToolPermissionContext::new(TerminalTool::NAME, input);

    let PermissionOptions::Dropdown(choices) = context.build_permission_options() else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `whoami` commands"));
    assert!(labels.contains(&"Only this time"));
}
1115
/// Editing `src/main.rs` offers a tool-wide choice plus an "always" choice labelled
/// with the `src/` prefix.
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    let path = "src/main.rs".to_string();
    let context = ToolPermissionContext::new(EditFileTool::NAME, vec![path]);

    let PermissionOptions::Dropdown(choices) = context.build_permission_options() else {
        panic!("Expected dropdown permission options");
    };

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for edit file"));
    assert!(labels.contains(&"Always for `src/`"));
}
1133
/// Fetching a URL offers a tool-wide choice plus an "always" choice labelled with the
/// URL's host (`docs.rs`).
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    let url = "https://docs.rs/gpui".to_string();
    let context = ToolPermissionContext::new(FetchTool::NAME, vec![url]);

    let PermissionOptions::Dropdown(choices) = context.build_permission_options() else {
        panic!("Expected dropdown permission options");
    };

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for fetch"));
    assert!(labels.contains(&"Always for `docs.rs`"));
}
1151
/// A script invocation (`./deploy.sh --production`) gets no command-level choice, only
/// the tool-wide and one-time choices.
#[test]
fn test_permission_options_without_pattern() {
    let command = "./deploy.sh --production".to_string();
    let context = ToolPermissionContext::new(TerminalTool::NAME, vec![command]);

    let PermissionOptions::Dropdown(choices) = context.build_permission_options() else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 2);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));
    assert!(!labels.iter().any(|label| label.contains("commands")));
}
1173
/// Symlink-target authorizations are presented as a flat pair of one-time options
/// (`allow` / `deny`), with no "always" variants.
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    let context =
        ToolPermissionContext::symlink_target(EditFileTool::NAME, vec!["/outside/file.txt".into()]);

    let options = match context.build_permission_options() {
        PermissionOptions::Flat(options) => options,
        _ => panic!("Expected flat permission options for symlink target authorization"),
    };

    assert_eq!(options.len(), 2);
    assert!(options.iter().any(|option| {
        option.option_id.0.as_ref() == "allow"
            && option.kind == acp::PermissionOptionKind::AllowOnce
    }));
    assert!(options.iter().any(|option| {
        option.option_id.0.as_ref() == "deny"
            && option.kind == acp::PermissionOptionKind::RejectOnce
    }));
}
1194
/// Checks the option IDs and sub-patterns behind each terminal dropdown choice.
///
/// The tool-wide and command-level choices share the `always_*:terminal` IDs and differ
/// only in `sub_patterns`. The last choice uses the plain one-time `allow` / `deny` IDs.
#[test]
fn test_permission_option_ids_for_terminal() {
    let context = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    );
    let choices = match context.build_permission_options() {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    // always-tool, always-pattern, once
    assert_eq!(choices.len(), 3);

    // Tool-wide "always" choice: no sub-patterns.
    assert_eq!(
        choices[0].allow.option_id.0.as_ref(),
        "always_allow:terminal"
    );
    assert_eq!(choices[0].deny.option_id.0.as_ref(), "always_deny:terminal");
    assert!(choices[0].sub_patterns.is_empty());

    // Command-level "always" choice: same IDs, narrowed by a `cargo build` regex.
    assert_eq!(
        choices[1].allow.option_id.0.as_ref(),
        "always_allow:terminal"
    );
    assert_eq!(choices[1].deny.option_id.0.as_ref(), "always_deny:terminal");
    assert_eq!(choices[1].sub_patterns, vec!["^cargo\\s+build(\\s|$)"]);

    // One-time allow/deny.
    assert_eq!(choices[2].allow.option_id.0.as_ref(), "allow");
    assert_eq!(choices[2].deny.option_id.0.as_ref(), "deny");
    assert!(choices[2].sub_patterns.is_empty());
}
1230
/// A pipeline (`cargo test 2>&1 | tail`) yields `DropdownWithPatterns`. It has the
/// tool-wide and one-time choices plus one selectable regex pattern per command in the
/// pipeline.
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    let command = "cargo test 2>&1 | tail".to_string();
    let context = ToolPermissionContext::new(TerminalTool::NAME, vec![command]);

    let PermissionOptions::DropdownWithPatterns {
        tool_name,
        choices,
        patterns,
    } = context.build_permission_options()
    else {
        panic!("Expected DropdownWithPatterns permission options for pipeline command");
    };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Only the tool-wide and one-time choices; per-command scoping lives in `patterns`.
    assert_eq!(choices.len(), 2);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));

    // One pattern each for "cargo test" and "tail".
    assert_eq!(patterns.len(), 2);
    let pattern_names = patterns
        .iter()
        .map(|cp| cp.display_name.as_str())
        .collect::<Vec<&str>>();
    assert!(pattern_names.contains(&"cargo test"));
    assert!(pattern_names.contains(&"tail"));

    // The underlying regexes anchored on each command.
    let regex_patterns = patterns
        .iter()
        .map(|cp| cp.pattern.as_str())
        .collect::<Vec<&str>>();
    assert!(regex_patterns.contains(&"^cargo\\s+test(\\s|$)"));
    assert!(regex_patterns.contains(&"^tail\\b"));
}
1270
/// Chained and piped commands (`npm install && npm test | tail`) produce one pattern per
/// distinct command. `npm install` and `npm test` are kept apart by subcommand.
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    let command = "npm install && npm test | tail".to_string();
    let context = ToolPermissionContext::new(TerminalTool::NAME, vec![command]);

    let patterns = match context.build_permission_options() {
        PermissionOptions::DropdownWithPatterns { patterns, .. } => patterns,
        _ => panic!("Expected DropdownWithPatterns for chained pipeline command"),
    };

    assert_eq!(patterns.len(), 3);
    let pattern_names = patterns
        .iter()
        .map(|cp| cp.display_name.as_str())
        .collect::<Vec<&str>>();
    assert!(pattern_names.contains(&"npm install"));
    assert!(pattern_names.contains(&"npm test"));
    assert!(pattern_names.contains(&"tail"));
}
1290
1291#[gpui::test]
1292#[cfg_attr(not(feature = "e2e"), ignore)]
1293async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
1294    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1295
1296    // Test concurrent tool calls with different delay times
1297    let events = thread
1298        .update(cx, |thread, cx| {
1299            thread.add_tool(DelayTool);
1300            thread.send(
1301                UserMessageId::new(),
1302                [
1303                    "Call the delay tool twice in the same message.",
1304                    "Once with 100ms. Once with 300ms.",
1305                    "When both timers are complete, describe the outputs.",
1306                ],
1307                cx,
1308            )
1309        })
1310        .unwrap()
1311        .collect()
1312        .await;
1313
1314    let stop_reasons = stop_events(events);
1315    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);
1316
1317    thread.update(cx, |thread, _cx| {
1318        let last_message = thread.last_received_or_pending_message().unwrap();
1319        let agent_message = last_message.as_agent_message().unwrap();
1320        let text = agent_message
1321            .content
1322            .iter()
1323            .filter_map(|content| {
1324                if let AgentMessageContent::Text(text) = content {
1325                    Some(text.as_str())
1326                } else {
1327                    None
1328                }
1329            })
1330            .collect::<String>();
1331
1332        assert!(text.contains("Ding"));
1333    });
1334}
1335
1336#[gpui::test]
1337async fn test_profiles(cx: &mut TestAppContext) {
1338    let ThreadTest {
1339        model, thread, fs, ..
1340    } = setup(cx, TestModel::Fake).await;
1341    let fake_model = model.as_fake();
1342
1343    thread.update(cx, |thread, _cx| {
1344        thread.add_tool(DelayTool);
1345        thread.add_tool(EchoTool);
1346        thread.add_tool(InfiniteTool);
1347    });
1348
1349    // Override profiles and wait for settings to be loaded.
1350    fs.insert_file(
1351        paths::settings_file(),
1352        json!({
1353            "agent": {
1354                "profiles": {
1355                    "test-1": {
1356                        "name": "Test Profile 1",
1357                        "tools": {
1358                            EchoTool::NAME: true,
1359                            DelayTool::NAME: true,
1360                        }
1361                    },
1362                    "test-2": {
1363                        "name": "Test Profile 2",
1364                        "tools": {
1365                            InfiniteTool::NAME: true,
1366                        }
1367                    }
1368                }
1369            }
1370        })
1371        .to_string()
1372        .into_bytes(),
1373    )
1374    .await;
1375    cx.run_until_parked();
1376
1377    // Test that test-1 profile (default) has echo and delay tools
1378    thread
1379        .update(cx, |thread, cx| {
1380            thread.set_profile(AgentProfileId("test-1".into()), cx);
1381            thread.send(UserMessageId::new(), ["test"], cx)
1382        })
1383        .unwrap();
1384    cx.run_until_parked();
1385
1386    let mut pending_completions = fake_model.pending_completions();
1387    assert_eq!(pending_completions.len(), 1);
1388    let completion = pending_completions.pop().unwrap();
1389    let tool_names: Vec<String> = completion
1390        .tools
1391        .iter()
1392        .map(|tool| tool.name.clone())
1393        .collect();
1394    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
1395    fake_model.end_last_completion_stream();
1396
1397    // Switch to test-2 profile, and verify that it has only the infinite tool.
1398    thread
1399        .update(cx, |thread, cx| {
1400            thread.set_profile(AgentProfileId("test-2".into()), cx);
1401            thread.send(UserMessageId::new(), ["test2"], cx)
1402        })
1403        .unwrap();
1404    cx.run_until_parked();
1405    let mut pending_completions = fake_model.pending_completions();
1406    assert_eq!(pending_completions.len(), 1);
1407    let completion = pending_completions.pop().unwrap();
1408    let tool_names: Vec<String> = completion
1409        .tools
1410        .iter()
1411        .map(|tool| tool.name.clone())
1412        .collect();
1413    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
1414}
1415
1416#[gpui::test]
1417async fn test_mcp_tools(cx: &mut TestAppContext) {
1418    let ThreadTest {
1419        model,
1420        thread,
1421        context_server_store,
1422        fs,
1423        ..
1424    } = setup(cx, TestModel::Fake).await;
1425    let fake_model = model.as_fake();
1426
1427    // Override profiles and wait for settings to be loaded.
1428    fs.insert_file(
1429        paths::settings_file(),
1430        json!({
1431            "agent": {
1432                "tool_permissions": { "default": "allow" },
1433                "profiles": {
1434                    "test": {
1435                        "name": "Test Profile",
1436                        "enable_all_context_servers": true,
1437                        "tools": {
1438                            EchoTool::NAME: true,
1439                        }
1440                    },
1441                }
1442            }
1443        })
1444        .to_string()
1445        .into_bytes(),
1446    )
1447    .await;
1448    cx.run_until_parked();
1449    thread.update(cx, |thread, cx| {
1450        thread.set_profile(AgentProfileId("test".into()), cx)
1451    });
1452
1453    let mut mcp_tool_calls = setup_context_server(
1454        "test_server",
1455        vec![context_server::types::Tool {
1456            name: "echo".into(),
1457            description: None,
1458            input_schema: serde_json::to_value(EchoTool::input_schema(
1459                LanguageModelToolSchemaFormat::JsonSchema,
1460            ))
1461            .unwrap(),
1462            output_schema: None,
1463            annotations: None,
1464        }],
1465        &context_server_store,
1466        cx,
1467    );
1468
1469    let events = thread.update(cx, |thread, cx| {
1470        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
1471    });
1472    cx.run_until_parked();
1473
1474    // Simulate the model calling the MCP tool.
1475    let completion = fake_model.pending_completions().pop().unwrap();
1476    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1477    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1478        LanguageModelToolUse {
1479            id: "tool_1".into(),
1480            name: "echo".into(),
1481            raw_input: json!({"text": "test"}).to_string(),
1482            input: json!({"text": "test"}),
1483            is_input_complete: true,
1484            thought_signature: None,
1485        },
1486    ));
1487    fake_model.end_last_completion_stream();
1488    cx.run_until_parked();
1489
1490    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1491    assert_eq!(tool_call_params.name, "echo");
1492    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
1493    tool_call_response
1494        .send(context_server::types::CallToolResponse {
1495            content: vec![context_server::types::ToolResponseContent::Text {
1496                text: "test".into(),
1497            }],
1498            is_error: None,
1499            meta: None,
1500            structured_content: None,
1501        })
1502        .unwrap();
1503    cx.run_until_parked();
1504
1505    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1506    fake_model.send_last_completion_stream_text_chunk("Done!");
1507    fake_model.end_last_completion_stream();
1508    events.collect::<Vec<_>>().await;
1509
1510    // Send again after adding the echo tool, ensuring the name collision is resolved.
1511    let events = thread.update(cx, |thread, cx| {
1512        thread.add_tool(EchoTool);
1513        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
1514    });
1515    cx.run_until_parked();
1516    let completion = fake_model.pending_completions().pop().unwrap();
1517    assert_eq!(
1518        tool_names_for_completion(&completion),
1519        vec!["echo", "test_server_echo"]
1520    );
1521    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1522        LanguageModelToolUse {
1523            id: "tool_2".into(),
1524            name: "test_server_echo".into(),
1525            raw_input: json!({"text": "mcp"}).to_string(),
1526            input: json!({"text": "mcp"}),
1527            is_input_complete: true,
1528            thought_signature: None,
1529        },
1530    ));
1531    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1532        LanguageModelToolUse {
1533            id: "tool_3".into(),
1534            name: "echo".into(),
1535            raw_input: json!({"text": "native"}).to_string(),
1536            input: json!({"text": "native"}),
1537            is_input_complete: true,
1538            thought_signature: None,
1539        },
1540    ));
1541    fake_model.end_last_completion_stream();
1542    cx.run_until_parked();
1543
1544    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1545    assert_eq!(tool_call_params.name, "echo");
1546    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
1547    tool_call_response
1548        .send(context_server::types::CallToolResponse {
1549            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
1550            is_error: None,
1551            meta: None,
1552            structured_content: None,
1553        })
1554        .unwrap();
1555    cx.run_until_parked();
1556
1557    // Ensure the tool results were inserted with the correct names.
1558    let completion = fake_model.pending_completions().pop().unwrap();
1559    assert_eq!(
1560        completion.messages.last().unwrap().content,
1561        vec![
1562            MessageContent::ToolResult(LanguageModelToolResult {
1563                tool_use_id: "tool_3".into(),
1564                tool_name: "echo".into(),
1565                is_error: false,
1566                content: "native".into(),
1567                output: Some("native".into()),
1568            },),
1569            MessageContent::ToolResult(LanguageModelToolResult {
1570                tool_use_id: "tool_2".into(),
1571                tool_name: "test_server_echo".into(),
1572                is_error: false,
1573                content: "mcp".into(),
1574                output: Some("mcp".into()),
1575            },),
1576        ]
1577    );
1578    fake_model.end_last_completion_stream();
1579    events.collect::<Vec<_>>().await;
1580}
1581
1582#[gpui::test]
1583async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
1584    let ThreadTest {
1585        model,
1586        thread,
1587        context_server_store,
1588        fs,
1589        ..
1590    } = setup(cx, TestModel::Fake).await;
1591    let fake_model = model.as_fake();
1592
1593    // Setup settings to allow MCP tools
1594    fs.insert_file(
1595        paths::settings_file(),
1596        json!({
1597            "agent": {
1598                "always_allow_tool_actions": true,
1599                "profiles": {
1600                    "test": {
1601                        "name": "Test Profile",
1602                        "enable_all_context_servers": true,
1603                        "tools": {}
1604                    },
1605                }
1606            }
1607        })
1608        .to_string()
1609        .into_bytes(),
1610    )
1611    .await;
1612    cx.run_until_parked();
1613    thread.update(cx, |thread, cx| {
1614        thread.set_profile(AgentProfileId("test".into()), cx)
1615    });
1616
1617    // Setup a context server with a tool
1618    let mut mcp_tool_calls = setup_context_server(
1619        "github_server",
1620        vec![context_server::types::Tool {
1621            name: "issue_read".into(),
1622            description: Some("Read a GitHub issue".into()),
1623            input_schema: json!({
1624                "type": "object",
1625                "properties": {
1626                    "issue_url": { "type": "string" }
1627                }
1628            }),
1629            output_schema: None,
1630            annotations: None,
1631        }],
1632        &context_server_store,
1633        cx,
1634    );
1635
1636    // Send a message and have the model call the MCP tool
1637    let events = thread.update(cx, |thread, cx| {
1638        thread
1639            .send(UserMessageId::new(), ["Read issue #47404"], cx)
1640            .unwrap()
1641    });
1642    cx.run_until_parked();
1643
1644    // Verify the MCP tool is available to the model
1645    let completion = fake_model.pending_completions().pop().unwrap();
1646    assert_eq!(
1647        tool_names_for_completion(&completion),
1648        vec!["issue_read"],
1649        "MCP tool should be available"
1650    );
1651
1652    // Simulate the model calling the MCP tool
1653    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1654        LanguageModelToolUse {
1655            id: "tool_1".into(),
1656            name: "issue_read".into(),
1657            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
1658                .to_string(),
1659            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
1660            is_input_complete: true,
1661            thought_signature: None,
1662        },
1663    ));
1664    fake_model.end_last_completion_stream();
1665    cx.run_until_parked();
1666
1667    // The MCP server receives the tool call and responds with content
1668    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
1669    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1670    assert_eq!(tool_call_params.name, "issue_read");
1671    tool_call_response
1672        .send(context_server::types::CallToolResponse {
1673            content: vec![context_server::types::ToolResponseContent::Text {
1674                text: expected_tool_output.into(),
1675            }],
1676            is_error: None,
1677            meta: None,
1678            structured_content: None,
1679        })
1680        .unwrap();
1681    cx.run_until_parked();
1682
1683    // After tool completes, the model continues with a new completion request
1684    // that includes the tool results. We need to respond to this.
1685    let _completion = fake_model.pending_completions().pop().unwrap();
1686    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
1687    fake_model
1688        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
1689    fake_model.end_last_completion_stream();
1690    events.collect::<Vec<_>>().await;
1691
1692    // Verify the tool result is stored in the thread by checking the markdown output.
1693    // The tool result is in the first assistant message (not the last one, which is
1694    // the model's response after the tool completed).
1695    thread.update(cx, |thread, _cx| {
1696        let markdown = thread.to_markdown();
1697        assert!(
1698            markdown.contains("**Tool Result**: issue_read"),
1699            "Thread should contain tool result header"
1700        );
1701        assert!(
1702            markdown.contains(expected_tool_output),
1703            "Thread should contain tool output: {}",
1704            expected_tool_output
1705        );
1706    });
1707
1708    // Simulate app restart: disconnect the MCP server.
1709    // After restart, the MCP server won't be connected yet when the thread is replayed.
1710    context_server_store.update(cx, |store, cx| {
1711        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
1712    });
1713    cx.run_until_parked();
1714
1715    // Replay the thread (this is what happens when loading a saved thread)
1716    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));
1717
1718    let mut found_tool_call = None;
1719    let mut found_tool_call_update_with_output = None;
1720
1721    while let Some(event) = replay_events.next().await {
1722        let event = event.unwrap();
1723        match &event {
1724            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
1725                found_tool_call = Some(tc.clone());
1726            }
1727            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
1728                if update.tool_call_id.to_string() == "tool_1" =>
1729            {
1730                if update.fields.raw_output.is_some() {
1731                    found_tool_call_update_with_output = Some(update.clone());
1732                }
1733            }
1734            _ => {}
1735        }
1736    }
1737
1738    // The tool call should be found
1739    assert!(
1740        found_tool_call.is_some(),
1741        "Tool call should be emitted during replay"
1742    );
1743
1744    assert!(
1745        found_tool_call_update_with_output.is_some(),
1746        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
1747    );
1748
1749    let update = found_tool_call_update_with_output.unwrap();
1750    assert_eq!(
1751        update.fields.raw_output,
1752        Some(expected_tool_output.into()),
1753        "raw_output should contain the saved tool result"
1754    );
1755
1756    // Also verify the status is correct (completed, not failed)
1757    assert_eq!(
1758        update.fields.status,
1759        Some(acp::ToolCallStatus::Completed),
1760        "Tool call status should reflect the original completion status"
1761    );
1762}
1763
1764#[gpui::test]
1765async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
1766    let ThreadTest {
1767        model,
1768        thread,
1769        context_server_store,
1770        fs,
1771        ..
1772    } = setup(cx, TestModel::Fake).await;
1773    let fake_model = model.as_fake();
1774
1775    // Set up a profile with all tools enabled
1776    fs.insert_file(
1777        paths::settings_file(),
1778        json!({
1779            "agent": {
1780                "profiles": {
1781                    "test": {
1782                        "name": "Test Profile",
1783                        "enable_all_context_servers": true,
1784                        "tools": {
1785                            EchoTool::NAME: true,
1786                            DelayTool::NAME: true,
1787                            WordListTool::NAME: true,
1788                            ToolRequiringPermission::NAME: true,
1789                            InfiniteTool::NAME: true,
1790                        }
1791                    },
1792                }
1793            }
1794        })
1795        .to_string()
1796        .into_bytes(),
1797    )
1798    .await;
1799    cx.run_until_parked();
1800
1801    thread.update(cx, |thread, cx| {
1802        thread.set_profile(AgentProfileId("test".into()), cx);
1803        thread.add_tool(EchoTool);
1804        thread.add_tool(DelayTool);
1805        thread.add_tool(WordListTool);
1806        thread.add_tool(ToolRequiringPermission);
1807        thread.add_tool(InfiniteTool);
1808    });
1809
1810    // Set up multiple context servers with some overlapping tool names
1811    let _server1_calls = setup_context_server(
1812        "xxx",
1813        vec![
1814            context_server::types::Tool {
1815                name: "echo".into(), // Conflicts with native EchoTool
1816                description: None,
1817                input_schema: serde_json::to_value(EchoTool::input_schema(
1818                    LanguageModelToolSchemaFormat::JsonSchema,
1819                ))
1820                .unwrap(),
1821                output_schema: None,
1822                annotations: None,
1823            },
1824            context_server::types::Tool {
1825                name: "unique_tool_1".into(),
1826                description: None,
1827                input_schema: json!({"type": "object", "properties": {}}),
1828                output_schema: None,
1829                annotations: None,
1830            },
1831        ],
1832        &context_server_store,
1833        cx,
1834    );
1835
1836    let _server2_calls = setup_context_server(
1837        "yyy",
1838        vec![
1839            context_server::types::Tool {
1840                name: "echo".into(), // Also conflicts with native EchoTool
1841                description: None,
1842                input_schema: serde_json::to_value(EchoTool::input_schema(
1843                    LanguageModelToolSchemaFormat::JsonSchema,
1844                ))
1845                .unwrap(),
1846                output_schema: None,
1847                annotations: None,
1848            },
1849            context_server::types::Tool {
1850                name: "unique_tool_2".into(),
1851                description: None,
1852                input_schema: json!({"type": "object", "properties": {}}),
1853                output_schema: None,
1854                annotations: None,
1855            },
1856            context_server::types::Tool {
1857                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1858                description: None,
1859                input_schema: json!({"type": "object", "properties": {}}),
1860                output_schema: None,
1861                annotations: None,
1862            },
1863            context_server::types::Tool {
1864                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1865                description: None,
1866                input_schema: json!({"type": "object", "properties": {}}),
1867                output_schema: None,
1868                annotations: None,
1869            },
1870        ],
1871        &context_server_store,
1872        cx,
1873    );
1874    let _server3_calls = setup_context_server(
1875        "zzz",
1876        vec![
1877            context_server::types::Tool {
1878                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1879                description: None,
1880                input_schema: json!({"type": "object", "properties": {}}),
1881                output_schema: None,
1882                annotations: None,
1883            },
1884            context_server::types::Tool {
1885                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1886                description: None,
1887                input_schema: json!({"type": "object", "properties": {}}),
1888                output_schema: None,
1889                annotations: None,
1890            },
1891            context_server::types::Tool {
1892                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
1893                description: None,
1894                input_schema: json!({"type": "object", "properties": {}}),
1895                output_schema: None,
1896                annotations: None,
1897            },
1898        ],
1899        &context_server_store,
1900        cx,
1901    );
1902
1903    // Server with spaces in name - tests snake_case conversion for API compatibility
1904    let _server4_calls = setup_context_server(
1905        "Azure DevOps",
1906        vec![context_server::types::Tool {
1907            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
1908            description: None,
1909            input_schema: serde_json::to_value(EchoTool::input_schema(
1910                LanguageModelToolSchemaFormat::JsonSchema,
1911            ))
1912            .unwrap(),
1913            output_schema: None,
1914            annotations: None,
1915        }],
1916        &context_server_store,
1917        cx,
1918    );
1919
1920    thread
1921        .update(cx, |thread, cx| {
1922            thread.send(UserMessageId::new(), ["Go"], cx)
1923        })
1924        .unwrap();
1925    cx.run_until_parked();
1926    let completion = fake_model.pending_completions().pop().unwrap();
1927    assert_eq!(
1928        tool_names_for_completion(&completion),
1929        vec![
1930            "azure_dev_ops_echo",
1931            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1932            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
1933            "delay",
1934            "echo",
1935            "infinite",
1936            "tool_requiring_permission",
1937            "unique_tool_1",
1938            "unique_tool_2",
1939            "word_list",
1940            "xxx_echo",
1941            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1942            "yyy_echo",
1943            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1944        ]
1945    );
1946}
1947
1948#[gpui::test]
1949#[cfg_attr(not(feature = "e2e"), ignore)]
1950async fn test_cancellation(cx: &mut TestAppContext) {
1951    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1952
1953    let mut events = thread
1954        .update(cx, |thread, cx| {
1955            thread.add_tool(InfiniteTool);
1956            thread.add_tool(EchoTool);
1957            thread.send(
1958                UserMessageId::new(),
1959                ["Call the echo tool, then call the infinite tool, then explain their output"],
1960                cx,
1961            )
1962        })
1963        .unwrap();
1964
1965    // Wait until both tools are called.
1966    let mut expected_tools = vec!["Echo", "Infinite Tool"];
1967    let mut echo_id = None;
1968    let mut echo_completed = false;
1969    while let Some(event) = events.next().await {
1970        match event.unwrap() {
1971            ThreadEvent::ToolCall(tool_call) => {
1972                assert_eq!(tool_call.title, expected_tools.remove(0));
1973                if tool_call.title == "Echo" {
1974                    echo_id = Some(tool_call.tool_call_id);
1975                }
1976            }
1977            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
1978                acp::ToolCallUpdate {
1979                    tool_call_id,
1980                    fields:
1981                        acp::ToolCallUpdateFields {
1982                            status: Some(acp::ToolCallStatus::Completed),
1983                            ..
1984                        },
1985                    ..
1986                },
1987            )) if Some(&tool_call_id) == echo_id.as_ref() => {
1988                echo_completed = true;
1989            }
1990            _ => {}
1991        }
1992
1993        if expected_tools.is_empty() && echo_completed {
1994            break;
1995        }
1996    }
1997
1998    // Cancel the current send and ensure that the event stream is closed, even
1999    // if one of the tools is still running.
2000    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2001    let events = events.collect::<Vec<_>>().await;
2002    let last_event = events.last();
2003    assert!(
2004        matches!(
2005            last_event,
2006            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2007        ),
2008        "unexpected event {last_event:?}"
2009    );
2010
2011    // Ensure we can still send a new message after cancellation.
2012    let events = thread
2013        .update(cx, |thread, cx| {
2014            thread.send(
2015                UserMessageId::new(),
2016                ["Testing: reply with 'Hello' then stop."],
2017                cx,
2018            )
2019        })
2020        .unwrap()
2021        .collect::<Vec<_>>()
2022        .await;
2023    thread.update(cx, |thread, _cx| {
2024        let message = thread.last_received_or_pending_message().unwrap();
2025        let agent_message = message.as_agent_message().unwrap();
2026        assert_eq!(
2027            agent_message.content,
2028            vec![AgentMessageContent::Text("Hello".to_string())]
2029        );
2030    });
2031    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2032}
2033
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    // Cancelling the thread while a terminal tool is still running must kill the terminal,
    // end the turn with `Cancelled`, and record a tool result that contains the terminal's
    // captured output plus a note that the user stopped the command.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal that never exits on its own, so the tool is still running when we cancel.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    // Keep our own handle so we can check afterwards whether it was killed.
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running. The returned task is detached;
    // completion is driven by `collect_events_until_stop` below.
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // The first tool use in the agent message is the terminal call made above.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // Tool results are keyed by the tool use id.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2130
2131#[gpui::test]
2132async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
2133    // This test verifies that tools which properly handle cancellation via
2134    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
2135    // to cancellation and report that they were cancelled.
2136    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2137    always_allow_tools(cx);
2138    let fake_model = model.as_fake();
2139
2140    let (tool, was_cancelled) = CancellationAwareTool::new();
2141
2142    let mut events = thread
2143        .update(cx, |thread, cx| {
2144            thread.add_tool(tool);
2145            thread.send(
2146                UserMessageId::new(),
2147                ["call the cancellation aware tool"],
2148                cx,
2149            )
2150        })
2151        .unwrap();
2152
2153    cx.run_until_parked();
2154
2155    // Simulate the model calling the cancellation-aware tool
2156    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2157        LanguageModelToolUse {
2158            id: "cancellation_aware_1".into(),
2159            name: "cancellation_aware".into(),
2160            raw_input: r#"{}"#.into(),
2161            input: json!({}),
2162            is_input_complete: true,
2163            thought_signature: None,
2164        },
2165    ));
2166    fake_model.end_last_completion_stream();
2167
2168    cx.run_until_parked();
2169
2170    // Wait for the tool call to be reported
2171    let mut tool_started = false;
2172    let deadline = cx.executor().num_cpus() * 100;
2173    for _ in 0..deadline {
2174        cx.run_until_parked();
2175
2176        while let Some(Some(event)) = events.next().now_or_never() {
2177            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
2178                if tool_call.title == "Cancellation Aware Tool" {
2179                    tool_started = true;
2180                    break;
2181                }
2182            }
2183        }
2184
2185        if tool_started {
2186            break;
2187        }
2188
2189        cx.background_executor
2190            .timer(Duration::from_millis(10))
2191            .await;
2192    }
2193    assert!(tool_started, "expected cancellation aware tool to start");
2194
2195    // Cancel the thread and wait for it to complete
2196    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));
2197
2198    // The cancel task should complete promptly because the tool handles cancellation
2199    let timeout = cx.background_executor.timer(Duration::from_secs(5));
2200    futures::select! {
2201        _ = cancel_task.fuse() => {}
2202        _ = timeout.fuse() => {
2203            panic!("cancel task timed out - tool did not respond to cancellation");
2204        }
2205    }
2206
2207    // Verify the tool detected cancellation via its flag
2208    assert!(
2209        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
2210        "tool should have detected cancellation via event_stream.cancelled_by_user()"
2211    );
2212
2213    // Collect remaining events
2214    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2215
2216    // Verify we got a cancellation stop event
2217    assert_eq!(
2218        stop_events(remaining_events),
2219        vec![acp::StopReason::Cancelled],
2220    );
2221
2222    // Verify we can send a new message after cancellation
2223    verify_thread_recovery(&thread, &fake_model, cx).await;
2224}
2225
2226/// Helper to verify thread can recover after cancellation by sending a simple message.
2227async fn verify_thread_recovery(
2228    thread: &Entity<Thread>,
2229    fake_model: &FakeLanguageModel,
2230    cx: &mut TestAppContext,
2231) {
2232    let events = thread
2233        .update(cx, |thread, cx| {
2234            thread.send(
2235                UserMessageId::new(),
2236                ["Testing: reply with 'Hello' then stop."],
2237                cx,
2238            )
2239        })
2240        .unwrap();
2241    cx.run_until_parked();
2242    fake_model.send_last_completion_stream_text_chunk("Hello");
2243    fake_model
2244        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2245    fake_model.end_last_completion_stream();
2246
2247    let events = events.collect::<Vec<_>>().await;
2248    thread.update(cx, |thread, _cx| {
2249        let message = thread.last_received_or_pending_message().unwrap();
2250        let agent_message = message.as_agent_message().unwrap();
2251        assert_eq!(
2252            agent_message.content,
2253            vec![AgentMessageContent::Text("Hello".to_string())]
2254        );
2255    });
2256    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2257}
2258
2259/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2260async fn wait_for_terminal_tool_started(
2261    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2262    cx: &mut TestAppContext,
2263) {
2264    let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2265    for _ in 0..deadline {
2266        cx.run_until_parked();
2267
2268        while let Some(Some(event)) = events.next().now_or_never() {
2269            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2270                update,
2271            ))) = &event
2272            {
2273                if update.fields.content.as_ref().is_some_and(|content| {
2274                    content
2275                        .iter()
2276                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2277                }) {
2278                    return;
2279                }
2280            }
2281        }
2282
2283        cx.background_executor
2284            .timer(Duration::from_millis(10))
2285            .await;
2286    }
2287    panic!("terminal tool did not start within the expected time");
2288}
2289
2290/// Collects events until a Stop event is received, driving the executor to completion.
2291async fn collect_events_until_stop(
2292    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2293    cx: &mut TestAppContext,
2294) -> Vec<Result<ThreadEvent>> {
2295    let mut collected = Vec::new();
2296    let deadline = cx.executor().num_cpus() * 200;
2297
2298    for _ in 0..deadline {
2299        cx.executor().advance_clock(Duration::from_millis(10));
2300        cx.run_until_parked();
2301
2302        while let Some(Some(event)) = events.next().now_or_never() {
2303            let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2304            collected.push(event);
2305            if is_stop {
2306                return collected;
2307            }
2308        }
2309    }
2310    panic!(
2311        "did not receive Stop event within the expected time; collected {} events",
2312        collected.len()
2313    );
2314}
2315
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    // Truncating the thread at the only user message while a terminal tool is running must
    // kill the terminal, leave the thread empty, and keep the thread usable afterwards.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal that never exits on its own, so it is still running at truncation time.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Remember the id so we can truncate back to (and including) this message later.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2382
2383#[gpui::test]
2384async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
2385    // Tests that cancellation properly kills all running terminal tools when multiple are active.
2386    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2387    always_allow_tools(cx);
2388    let fake_model = model.as_fake();
2389
2390    let environment = Rc::new(MultiTerminalEnvironment::new());
2391
2392    let mut events = thread
2393        .update(cx, |thread, cx| {
2394            thread.add_tool(crate::TerminalTool::new(
2395                thread.project().clone(),
2396                environment.clone(),
2397            ));
2398            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
2399        })
2400        .unwrap();
2401
2402    cx.run_until_parked();
2403
2404    // Simulate the model calling two terminal tools
2405    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2406        LanguageModelToolUse {
2407            id: "terminal_tool_1".into(),
2408            name: TerminalTool::NAME.into(),
2409            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
2410            input: json!({"command": "sleep 1000", "cd": "."}),
2411            is_input_complete: true,
2412            thought_signature: None,
2413        },
2414    ));
2415    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2416        LanguageModelToolUse {
2417            id: "terminal_tool_2".into(),
2418            name: TerminalTool::NAME.into(),
2419            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
2420            input: json!({"command": "sleep 2000", "cd": "."}),
2421            is_input_complete: true,
2422            thought_signature: None,
2423        },
2424    ));
2425    fake_model.end_last_completion_stream();
2426
2427    // Wait for both terminal tools to start by counting terminal content updates
2428    let mut terminals_started = 0;
2429    let deadline = cx.executor().num_cpus() * 100;
2430    for _ in 0..deadline {
2431        cx.run_until_parked();
2432
2433        while let Some(Some(event)) = events.next().now_or_never() {
2434            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2435                update,
2436            ))) = &event
2437            {
2438                if update.fields.content.as_ref().is_some_and(|content| {
2439                    content
2440                        .iter()
2441                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2442                }) {
2443                    terminals_started += 1;
2444                    if terminals_started >= 2 {
2445                        break;
2446                    }
2447                }
2448            }
2449        }
2450        if terminals_started >= 2 {
2451            break;
2452        }
2453
2454        cx.background_executor
2455            .timer(Duration::from_millis(10))
2456            .await;
2457    }
2458    assert!(
2459        terminals_started >= 2,
2460        "expected 2 terminal tools to start, got {terminals_started}"
2461    );
2462
2463    // Cancel the thread while both terminals are running
2464    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();
2465
2466    // Collect remaining events
2467    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2468
2469    // Verify both terminal handles were killed
2470    let handles = environment.handles();
2471    assert_eq!(
2472        handles.len(),
2473        2,
2474        "expected 2 terminal handles to be created"
2475    );
2476    assert!(
2477        handles[0].was_killed(),
2478        "expected first terminal handle to be killed on cancellation"
2479    );
2480    assert!(
2481        handles[1].was_killed(),
2482        "expected second terminal handle to be killed on cancellation"
2483    );
2484
2485    // Verify we got a cancellation stop event
2486    assert_eq!(
2487        stop_events(remaining_events),
2488        vec![acp::StopReason::Cancelled],
2489    );
2490}
2491
2492#[gpui::test]
2493async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
2494    // Tests that clicking the stop button on the terminal card (as opposed to the main
2495    // cancel button) properly reports user stopped via the was_stopped_by_user path.
2496    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2497    always_allow_tools(cx);
2498    let fake_model = model.as_fake();
2499
2500    let environment = Rc::new(cx.update(|cx| {
2501        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
2502    }));
2503    let handle = environment.terminal_handle.clone().unwrap();
2504
2505    let mut events = thread
2506        .update(cx, |thread, cx| {
2507            thread.add_tool(crate::TerminalTool::new(
2508                thread.project().clone(),
2509                environment,
2510            ));
2511            thread.send(UserMessageId::new(), ["run a command"], cx)
2512        })
2513        .unwrap();
2514
2515    cx.run_until_parked();
2516
2517    // Simulate the model calling the terminal tool
2518    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2519        LanguageModelToolUse {
2520            id: "terminal_tool_1".into(),
2521            name: TerminalTool::NAME.into(),
2522            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
2523            input: json!({"command": "sleep 1000", "cd": "."}),
2524            is_input_complete: true,
2525            thought_signature: None,
2526        },
2527    ));
2528    fake_model.end_last_completion_stream();
2529
2530    // Wait for the terminal tool to start running
2531    wait_for_terminal_tool_started(&mut events, cx).await;
2532
2533    // Simulate user clicking stop on the terminal card itself.
2534    // This sets the flag and signals exit (simulating what the real UI would do).
2535    handle.set_stopped_by_user(true);
2536    handle.killed.store(true, Ordering::SeqCst);
2537    handle.signal_exit();
2538
2539    // Wait for the tool to complete
2540    cx.run_until_parked();
2541
2542    // The thread continues after tool completion - simulate the model ending its turn
2543    fake_model
2544        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2545    fake_model.end_last_completion_stream();
2546
2547    // Collect remaining events
2548    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2549
2550    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
2551    assert_eq!(
2552        stop_events(remaining_events),
2553        vec![acp::StopReason::EndTurn],
2554    );
2555
2556    // Verify the tool result indicates user stopped
2557    thread.update(cx, |thread, _cx| {
2558        let message = thread.last_received_or_pending_message().unwrap();
2559        let agent_message = message.as_agent_message().unwrap();
2560
2561        let tool_use = agent_message
2562            .content
2563            .iter()
2564            .find_map(|content| match content {
2565                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
2566                _ => None,
2567            })
2568            .expect("expected tool use in agent message");
2569
2570        let tool_result = agent_message
2571            .tool_results
2572            .get(&tool_use.id)
2573            .expect("expected tool result");
2574
2575        let result_text = match &tool_result.content {
2576            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
2577            _ => panic!("expected text content in tool result"),
2578        };
2579
2580        assert!(
2581            result_text.contains("The user stopped this command"),
2582            "expected tool result to indicate user stopped, got: {result_text}"
2583        );
2584    });
2585}
2586
2587#[gpui::test]
2588async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
2589    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
2590    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2591    always_allow_tools(cx);
2592    let fake_model = model.as_fake();
2593
2594    let environment = Rc::new(cx.update(|cx| {
2595        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
2596    }));
2597    let handle = environment.terminal_handle.clone().unwrap();
2598
2599    let mut events = thread
2600        .update(cx, |thread, cx| {
2601            thread.add_tool(crate::TerminalTool::new(
2602                thread.project().clone(),
2603                environment,
2604            ));
2605            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
2606        })
2607        .unwrap();
2608
2609    cx.run_until_parked();
2610
2611    // Simulate the model calling the terminal tool with a short timeout
2612    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2613        LanguageModelToolUse {
2614            id: "terminal_tool_1".into(),
2615            name: TerminalTool::NAME.into(),
2616            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
2617            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
2618            is_input_complete: true,
2619            thought_signature: None,
2620        },
2621    ));
2622    fake_model.end_last_completion_stream();
2623
2624    // Wait for the terminal tool to start running
2625    wait_for_terminal_tool_started(&mut events, cx).await;
2626
2627    // Advance clock past the timeout
2628    cx.executor().advance_clock(Duration::from_millis(200));
2629    cx.run_until_parked();
2630
2631    // The thread continues after tool completion - simulate the model ending its turn
2632    fake_model
2633        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2634    fake_model.end_last_completion_stream();
2635
2636    // Collect remaining events
2637    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2638
2639    // Verify the terminal was killed due to timeout
2640    assert!(
2641        handle.was_killed(),
2642        "expected terminal handle to be killed on timeout"
2643    );
2644
2645    // Verify we got an EndTurn (the tool completed, just with timeout)
2646    assert_eq!(
2647        stop_events(remaining_events),
2648        vec![acp::StopReason::EndTurn],
2649    );
2650
2651    // Verify the tool result indicates timeout, not user stopped
2652    thread.update(cx, |thread, _cx| {
2653        let message = thread.last_received_or_pending_message().unwrap();
2654        let agent_message = message.as_agent_message().unwrap();
2655
2656        let tool_use = agent_message
2657            .content
2658            .iter()
2659            .find_map(|content| match content {
2660                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
2661                _ => None,
2662            })
2663            .expect("expected tool use in agent message");
2664
2665        let tool_result = agent_message
2666            .tool_results
2667            .get(&tool_use.id)
2668            .expect("expected tool result");
2669
2670        let result_text = match &tool_result.content {
2671            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
2672            _ => panic!("expected text content in tool result"),
2673        };
2674
2675        assert!(
2676            result_text.contains("timed out"),
2677            "expected tool result to indicate timeout, got: {result_text}"
2678        );
2679        assert!(
2680            !result_text.contains("The user stopped"),
2681            "tool result should not mention user stopped when it timed out, got: {result_text}"
2682        );
2683    });
2684}
2685
2686#[gpui::test]
2687async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2688    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2689    let fake_model = model.as_fake();
2690
2691    let events_1 = thread
2692        .update(cx, |thread, cx| {
2693            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2694        })
2695        .unwrap();
2696    cx.run_until_parked();
2697    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2698    cx.run_until_parked();
2699
2700    let events_2 = thread
2701        .update(cx, |thread, cx| {
2702            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2703        })
2704        .unwrap();
2705    cx.run_until_parked();
2706    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2707    fake_model
2708        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2709    fake_model.end_last_completion_stream();
2710
2711    let events_1 = events_1.collect::<Vec<_>>().await;
2712    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2713    let events_2 = events_2.collect::<Vec<_>>().await;
2714    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2715}
2716
2717#[gpui::test]
2718async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
2719    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
2720    // the retry loop waits on a timer. If the user switches models and sends a new message
2721    // during that delay, the old turn should exit immediately instead of retrying with the
2722    // stale model.
2723    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2724    let model_a = model.as_fake();
2725
2726    // Start a turn with model_a.
2727    let events_1 = thread
2728        .update(cx, |thread, cx| {
2729            thread.send(UserMessageId::new(), ["Hello"], cx)
2730        })
2731        .unwrap();
2732    cx.run_until_parked();
2733    assert_eq!(model_a.completion_count(), 1);
2734
2735    // Model returns a retryable upstream 500. The turn enters the retry delay.
2736    model_a.send_last_completion_stream_error(
2737        LanguageModelCompletionError::UpstreamProviderError {
2738            message: "Internal server error".to_string(),
2739            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
2740            retry_after: None,
2741        },
2742    );
2743    model_a.end_last_completion_stream();
2744    cx.run_until_parked();
2745
2746    // The old completion was consumed; model_a has no pending requests yet because the
2747    // retry timer hasn't fired.
2748    assert_eq!(model_a.completion_count(), 0);
2749
2750    // Switch to model_b and send a new message. This cancels the old turn.
2751    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
2752        "fake", "model-b", "Model B", false,
2753    ));
2754    thread.update(cx, |thread, cx| {
2755        thread.set_model(model_b.clone(), cx);
2756    });
2757    let events_2 = thread
2758        .update(cx, |thread, cx| {
2759            thread.send(UserMessageId::new(), ["Continue"], cx)
2760        })
2761        .unwrap();
2762    cx.run_until_parked();
2763
2764    // model_b should have received its completion request.
2765    assert_eq!(model_b.as_fake().completion_count(), 1);
2766
2767    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
2768    cx.executor().advance_clock(Duration::from_secs(10));
2769    cx.run_until_parked();
2770
2771    // model_a must NOT have received another completion request — the cancelled turn
2772    // should have exited during the retry delay rather than retrying with the old model.
2773    assert_eq!(
2774        model_a.completion_count(),
2775        0,
2776        "old model should not receive a retry request after cancellation"
2777    );
2778
2779    // Complete model_b's turn.
2780    model_b
2781        .as_fake()
2782        .send_last_completion_stream_text_chunk("Done!");
2783    model_b
2784        .as_fake()
2785        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2786    model_b.as_fake().end_last_completion_stream();
2787
2788    let events_1 = events_1.collect::<Vec<_>>().await;
2789    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2790
2791    let events_2 = events_2.collect::<Vec<_>>().await;
2792    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2793}
2794
2795#[gpui::test]
2796async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2797    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2798    let fake_model = model.as_fake();
2799
2800    let events_1 = thread
2801        .update(cx, |thread, cx| {
2802            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2803        })
2804        .unwrap();
2805    cx.run_until_parked();
2806    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2807    fake_model
2808        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2809    fake_model.end_last_completion_stream();
2810    let events_1 = events_1.collect::<Vec<_>>().await;
2811
2812    let events_2 = thread
2813        .update(cx, |thread, cx| {
2814            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2815        })
2816        .unwrap();
2817    cx.run_until_parked();
2818    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2819    fake_model
2820        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2821    fake_model.end_last_completion_stream();
2822    let events_2 = events_2.collect::<Vec<_>>().await;
2823
2824    assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2825    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2826}
2827
2828#[gpui::test]
2829async fn test_refusal(cx: &mut TestAppContext) {
2830    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2831    let fake_model = model.as_fake();
2832
2833    let events = thread
2834        .update(cx, |thread, cx| {
2835            thread.send(UserMessageId::new(), ["Hello"], cx)
2836        })
2837        .unwrap();
2838    cx.run_until_parked();
2839    thread.read_with(cx, |thread, _| {
2840        assert_eq!(
2841            thread.to_markdown(),
2842            indoc! {"
2843                ## User
2844
2845                Hello
2846            "}
2847        );
2848    });
2849
2850    fake_model.send_last_completion_stream_text_chunk("Hey!");
2851    cx.run_until_parked();
2852    thread.read_with(cx, |thread, _| {
2853        assert_eq!(
2854            thread.to_markdown(),
2855            indoc! {"
2856                ## User
2857
2858                Hello
2859
2860                ## Assistant
2861
2862                Hey!
2863            "}
2864        );
2865    });
2866
2867    // If the model refuses to continue, the thread should remove all the messages after the last user message.
2868    fake_model
2869        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
2870    let events = events.collect::<Vec<_>>().await;
2871    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
2872    thread.read_with(cx, |thread, _| {
2873        assert_eq!(thread.to_markdown(), "");
2874    });
2875}
2876
2877#[gpui::test]
2878async fn test_truncate_first_message(cx: &mut TestAppContext) {
2879    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2880    let fake_model = model.as_fake();
2881
2882    let message_id = UserMessageId::new();
2883    thread
2884        .update(cx, |thread, cx| {
2885            thread.send(message_id.clone(), ["Hello"], cx)
2886        })
2887        .unwrap();
2888    cx.run_until_parked();
2889    thread.read_with(cx, |thread, _| {
2890        assert_eq!(
2891            thread.to_markdown(),
2892            indoc! {"
2893                ## User
2894
2895                Hello
2896            "}
2897        );
2898        assert_eq!(thread.latest_token_usage(), None);
2899    });
2900
2901    fake_model.send_last_completion_stream_text_chunk("Hey!");
2902    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2903        language_model::TokenUsage {
2904            input_tokens: 32_000,
2905            output_tokens: 16_000,
2906            cache_creation_input_tokens: 0,
2907            cache_read_input_tokens: 0,
2908        },
2909    ));
2910    cx.run_until_parked();
2911    thread.read_with(cx, |thread, _| {
2912        assert_eq!(
2913            thread.to_markdown(),
2914            indoc! {"
2915                ## User
2916
2917                Hello
2918
2919                ## Assistant
2920
2921                Hey!
2922            "}
2923        );
2924        assert_eq!(
2925            thread.latest_token_usage(),
2926            Some(acp_thread::TokenUsage {
2927                used_tokens: 32_000 + 16_000,
2928                max_tokens: 1_000_000,
2929                max_output_tokens: None,
2930                input_tokens: 32_000,
2931                output_tokens: 16_000,
2932            })
2933        );
2934    });
2935
2936    thread
2937        .update(cx, |thread, cx| thread.truncate(message_id, cx))
2938        .unwrap();
2939    cx.run_until_parked();
2940    thread.read_with(cx, |thread, _| {
2941        assert_eq!(thread.to_markdown(), "");
2942        assert_eq!(thread.latest_token_usage(), None);
2943    });
2944
2945    // Ensure we can still send a new message after truncation.
2946    thread
2947        .update(cx, |thread, cx| {
2948            thread.send(UserMessageId::new(), ["Hi"], cx)
2949        })
2950        .unwrap();
2951    thread.update(cx, |thread, _cx| {
2952        assert_eq!(
2953            thread.to_markdown(),
2954            indoc! {"
2955                ## User
2956
2957                Hi
2958            "}
2959        );
2960    });
2961    cx.run_until_parked();
2962    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
2963    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2964        language_model::TokenUsage {
2965            input_tokens: 40_000,
2966            output_tokens: 20_000,
2967            cache_creation_input_tokens: 0,
2968            cache_read_input_tokens: 0,
2969        },
2970    ));
2971    cx.run_until_parked();
2972    thread.read_with(cx, |thread, _| {
2973        assert_eq!(
2974            thread.to_markdown(),
2975            indoc! {"
2976                ## User
2977
2978                Hi
2979
2980                ## Assistant
2981
2982                Ahoy!
2983            "}
2984        );
2985
2986        assert_eq!(
2987            thread.latest_token_usage(),
2988            Some(acp_thread::TokenUsage {
2989                used_tokens: 40_000 + 20_000,
2990                max_tokens: 1_000_000,
2991                max_output_tokens: None,
2992                input_tokens: 40_000,
2993                output_tokens: 20_000,
2994            })
2995        );
2996    });
2997}
2998
2999#[gpui::test]
3000async fn test_truncate_second_message(cx: &mut TestAppContext) {
3001    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3002    let fake_model = model.as_fake();
3003
3004    thread
3005        .update(cx, |thread, cx| {
3006            thread.send(UserMessageId::new(), ["Message 1"], cx)
3007        })
3008        .unwrap();
3009    cx.run_until_parked();
3010    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
3011    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3012        language_model::TokenUsage {
3013            input_tokens: 32_000,
3014            output_tokens: 16_000,
3015            cache_creation_input_tokens: 0,
3016            cache_read_input_tokens: 0,
3017        },
3018    ));
3019    fake_model.end_last_completion_stream();
3020    cx.run_until_parked();
3021
3022    let assert_first_message_state = |cx: &mut TestAppContext| {
3023        thread.clone().read_with(cx, |thread, _| {
3024            assert_eq!(
3025                thread.to_markdown(),
3026                indoc! {"
3027                    ## User
3028
3029                    Message 1
3030
3031                    ## Assistant
3032
3033                    Message 1 response
3034                "}
3035            );
3036
3037            assert_eq!(
3038                thread.latest_token_usage(),
3039                Some(acp_thread::TokenUsage {
3040                    used_tokens: 32_000 + 16_000,
3041                    max_tokens: 1_000_000,
3042                    max_output_tokens: None,
3043                    input_tokens: 32_000,
3044                    output_tokens: 16_000,
3045                })
3046            );
3047        });
3048    };
3049
3050    assert_first_message_state(cx);
3051
3052    let second_message_id = UserMessageId::new();
3053    thread
3054        .update(cx, |thread, cx| {
3055            thread.send(second_message_id.clone(), ["Message 2"], cx)
3056        })
3057        .unwrap();
3058    cx.run_until_parked();
3059
3060    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
3061    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3062        language_model::TokenUsage {
3063            input_tokens: 40_000,
3064            output_tokens: 20_000,
3065            cache_creation_input_tokens: 0,
3066            cache_read_input_tokens: 0,
3067        },
3068    ));
3069    fake_model.end_last_completion_stream();
3070    cx.run_until_parked();
3071
3072    thread.read_with(cx, |thread, _| {
3073        assert_eq!(
3074            thread.to_markdown(),
3075            indoc! {"
3076                ## User
3077
3078                Message 1
3079
3080                ## Assistant
3081
3082                Message 1 response
3083
3084                ## User
3085
3086                Message 2
3087
3088                ## Assistant
3089
3090                Message 2 response
3091            "}
3092        );
3093
3094        assert_eq!(
3095            thread.latest_token_usage(),
3096            Some(acp_thread::TokenUsage {
3097                used_tokens: 40_000 + 20_000,
3098                max_tokens: 1_000_000,
3099                max_output_tokens: None,
3100                input_tokens: 40_000,
3101                output_tokens: 20_000,
3102            })
3103        );
3104    });
3105
3106    thread
3107        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
3108        .unwrap();
3109    cx.run_until_parked();
3110
3111    assert_first_message_state(cx);
3112}
3113
3114#[gpui::test]
3115async fn test_title_generation(cx: &mut TestAppContext) {
3116    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3117    let fake_model = model.as_fake();
3118
3119    let summary_model = Arc::new(FakeLanguageModel::default());
3120    thread.update(cx, |thread, cx| {
3121        thread.set_summarization_model(Some(summary_model.clone()), cx)
3122    });
3123
3124    let send = thread
3125        .update(cx, |thread, cx| {
3126            thread.send(UserMessageId::new(), ["Hello"], cx)
3127        })
3128        .unwrap();
3129    cx.run_until_parked();
3130
3131    fake_model.send_last_completion_stream_text_chunk("Hey!");
3132    fake_model.end_last_completion_stream();
3133    cx.run_until_parked();
3134    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), None));
3135
3136    // Ensure the summary model has been invoked to generate a title.
3137    summary_model.send_last_completion_stream_text_chunk("Hello ");
3138    summary_model.send_last_completion_stream_text_chunk("world\nG");
3139    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
3140    summary_model.end_last_completion_stream();
3141    send.collect::<Vec<_>>().await;
3142    cx.run_until_parked();
3143    thread.read_with(cx, |thread, _| {
3144        assert_eq!(thread.title(), Some("Hello world".into()))
3145    });
3146
3147    // Send another message, ensuring no title is generated this time.
3148    let send = thread
3149        .update(cx, |thread, cx| {
3150            thread.send(UserMessageId::new(), ["Hello again"], cx)
3151        })
3152        .unwrap();
3153    cx.run_until_parked();
3154    fake_model.send_last_completion_stream_text_chunk("Hey again!");
3155    fake_model.end_last_completion_stream();
3156    cx.run_until_parked();
3157    assert_eq!(summary_model.pending_completions(), Vec::new());
3158    send.collect::<Vec<_>>().await;
3159    thread.read_with(cx, |thread, _| {
3160        assert_eq!(thread.title(), Some("Hello world".into()))
3161    });
3162}
3163
3164#[gpui::test]
3165async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
3166    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3167    let fake_model = model.as_fake();
3168
3169    let _events = thread
3170        .update(cx, |thread, cx| {
3171            thread.add_tool(ToolRequiringPermission);
3172            thread.add_tool(EchoTool);
3173            thread.send(UserMessageId::new(), ["Hey!"], cx)
3174        })
3175        .unwrap();
3176    cx.run_until_parked();
3177
3178    let permission_tool_use = LanguageModelToolUse {
3179        id: "tool_id_1".into(),
3180        name: ToolRequiringPermission::NAME.into(),
3181        raw_input: "{}".into(),
3182        input: json!({}),
3183        is_input_complete: true,
3184        thought_signature: None,
3185    };
3186    let echo_tool_use = LanguageModelToolUse {
3187        id: "tool_id_2".into(),
3188        name: EchoTool::NAME.into(),
3189        raw_input: json!({"text": "test"}).to_string(),
3190        input: json!({"text": "test"}),
3191        is_input_complete: true,
3192        thought_signature: None,
3193    };
3194    fake_model.send_last_completion_stream_text_chunk("Hi!");
3195    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3196        permission_tool_use,
3197    ));
3198    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3199        echo_tool_use.clone(),
3200    ));
3201    fake_model.end_last_completion_stream();
3202    cx.run_until_parked();
3203
3204    // Ensure pending tools are skipped when building a request.
3205    let request = thread
3206        .read_with(cx, |thread, cx| {
3207            thread.build_completion_request(CompletionIntent::EditFile, cx)
3208        })
3209        .unwrap();
3210    assert_eq!(
3211        request.messages[1..],
3212        vec![
3213            LanguageModelRequestMessage {
3214                role: Role::User,
3215                content: vec!["Hey!".into()],
3216                cache: true,
3217                reasoning_details: None,
3218            },
3219            LanguageModelRequestMessage {
3220                role: Role::Assistant,
3221                content: vec![
3222                    MessageContent::Text("Hi!".into()),
3223                    MessageContent::ToolUse(echo_tool_use.clone())
3224                ],
3225                cache: false,
3226                reasoning_details: None,
3227            },
3228            LanguageModelRequestMessage {
3229                role: Role::User,
3230                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
3231                    tool_use_id: echo_tool_use.id.clone(),
3232                    tool_name: echo_tool_use.name,
3233                    is_error: false,
3234                    content: "test".into(),
3235                    output: Some("test".into())
3236                })],
3237                cache: false,
3238                reasoning_details: None,
3239            },
3240        ],
3241    );
3242}
3243
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    // Drives `NativeAgentConnection` end to end: create a session, inspect the model
    // selector, send a prompt, cancel it, close the session, and confirm that prompting the
    // closed session fails with "Session not found".
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(user_store.clone(), client.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        // NOTE(review): presumably installs the fake provider that yields the "Fake" group
        // and "fake/fake" model asserted below — confirm.
        LanguageModelRegistry::test(cx);
    });
    // NOTE(review): presumably makes the test fail if the executor would have to park
    // waiting on outside work — confirm against gpui's executor docs.
    cx.executor().forbid_parking();

    // Create a project for new_thread
    // NOTE(review): the fake tree is created at `path!("/test")`, but the project root and
    // `cwd` below use a bare `Path::new("/test")`. On platforms where `path!` rewrites the
    // path (e.g. adds a drive prefix) these may not name the same directory — confirm.
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = PathList::new(&[Path::new("/test")]);
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models: the models must come back grouped, and the first model in the
    // "Fake" group must have the id "fake/fake".
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default, then resolve it back to the agent's model
    // so the test can drive its fake completion stream directly.
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Send a prompt through the ACP thread and stream a partial reply into it.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel: after cancellation, the in-flight prompt future still resolves to `Ok`.
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting the closed session must be rejected with a "Session not found" error.
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3366
3367#[gpui::test]
3368async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
3369    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3370    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
3371    let fake_model = model.as_fake();
3372
3373    let mut events = thread
3374        .update(cx, |thread, cx| {
3375            thread.send(UserMessageId::new(), ["Echo something"], cx)
3376        })
3377        .unwrap();
3378    cx.run_until_parked();
3379
3380    // Simulate streaming partial input.
3381    let input = json!({});
3382    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3383        LanguageModelToolUse {
3384            id: "1".into(),
3385            name: EchoTool::NAME.into(),
3386            raw_input: input.to_string(),
3387            input,
3388            is_input_complete: false,
3389            thought_signature: None,
3390        },
3391    ));
3392
3393    // Input streaming completed
3394    let input = json!({ "text": "Hello!" });
3395    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3396        LanguageModelToolUse {
3397            id: "1".into(),
3398            name: "echo".into(),
3399            raw_input: input.to_string(),
3400            input,
3401            is_input_complete: true,
3402            thought_signature: None,
3403        },
3404    ));
3405    fake_model.end_last_completion_stream();
3406    cx.run_until_parked();
3407
3408    let tool_call = expect_tool_call(&mut events).await;
3409    assert_eq!(
3410        tool_call,
3411        acp::ToolCall::new("1", "Echo")
3412            .raw_input(json!({}))
3413            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
3414    );
3415    let update = expect_tool_call_update_fields(&mut events).await;
3416    assert_eq!(
3417        update,
3418        acp::ToolCallUpdate::new(
3419            "1",
3420            acp::ToolCallUpdateFields::new()
3421                .title("Echo")
3422                .kind(acp::ToolKind::Other)
3423                .raw_input(json!({ "text": "Hello!"}))
3424        )
3425    );
3426    let update = expect_tool_call_update_fields(&mut events).await;
3427    assert_eq!(
3428        update,
3429        acp::ToolCallUpdate::new(
3430            "1",
3431            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
3432        )
3433    );
3434    let update = expect_tool_call_update_fields(&mut events).await;
3435    assert_eq!(
3436        update,
3437        acp::ToolCallUpdate::new(
3438            "1",
3439            acp::ToolCallUpdateFields::new()
3440                .status(acp::ToolCallStatus::Completed)
3441                .raw_output("Hello!")
3442        )
3443    );
3444}
3445
3446#[gpui::test]
3447async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
3448    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3449    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
3450    let fake_model = model.as_fake();
3451
3452    let mut events = thread
3453        .update(cx, |thread, cx| {
3454            thread.send(UserMessageId::new(), ["Make a plan"], cx)
3455        })
3456        .unwrap();
3457    cx.run_until_parked();
3458
3459    let input = json!({
3460        "plan": [
3461            {
3462                "step": "Inspect the code",
3463                "status": "completed",
3464                "priority": "high"
3465            },
3466            {
3467                "step": "Implement the tool",
3468                "status": "in_progress"
3469            },
3470            {
3471                "step": "Run tests",
3472                "status": "pending",
3473                "priority": "low"
3474            }
3475        ]
3476    });
3477    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3478        LanguageModelToolUse {
3479            id: "plan_1".into(),
3480            name: UpdatePlanTool::NAME.into(),
3481            raw_input: input.to_string(),
3482            input,
3483            is_input_complete: true,
3484            thought_signature: None,
3485        },
3486    ));
3487    fake_model.end_last_completion_stream();
3488    cx.run_until_parked();
3489
3490    let tool_call = expect_tool_call(&mut events).await;
3491    assert_eq!(
3492        tool_call,
3493        acp::ToolCall::new("plan_1", "Update plan")
3494            .kind(acp::ToolKind::Think)
3495            .raw_input(json!({
3496                "plan": [
3497                    {
3498                        "step": "Inspect the code",
3499                        "status": "completed",
3500                        "priority": "high"
3501                    },
3502                    {
3503                        "step": "Implement the tool",
3504                        "status": "in_progress"
3505                    },
3506                    {
3507                        "step": "Run tests",
3508                        "status": "pending",
3509                        "priority": "low"
3510                    }
3511                ]
3512            }))
3513            .meta(acp::Meta::from_iter([(
3514                "tool_name".into(),
3515                "update_plan".into()
3516            )]))
3517    );
3518
3519    let update = expect_tool_call_update_fields(&mut events).await;
3520    assert_eq!(
3521        update,
3522        acp::ToolCallUpdate::new(
3523            "plan_1",
3524            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
3525        )
3526    );
3527
3528    let plan = expect_plan(&mut events).await;
3529    assert_eq!(
3530        plan,
3531        acp::Plan::new(vec![
3532            acp::PlanEntry::new(
3533                "Inspect the code",
3534                acp::PlanEntryPriority::High,
3535                acp::PlanEntryStatus::Completed,
3536            ),
3537            acp::PlanEntry::new(
3538                "Implement the tool",
3539                acp::PlanEntryPriority::Medium,
3540                acp::PlanEntryStatus::InProgress,
3541            ),
3542            acp::PlanEntry::new(
3543                "Run tests",
3544                acp::PlanEntryPriority::Low,
3545                acp::PlanEntryStatus::Pending,
3546            ),
3547        ])
3548    );
3549
3550    let update = expect_tool_call_update_fields(&mut events).await;
3551    assert_eq!(
3552        update,
3553        acp::ToolCallUpdate::new(
3554            "plan_1",
3555            acp::ToolCallUpdateFields::new()
3556                .status(acp::ToolCallStatus::Completed)
3557                .raw_output("Plan updated")
3558        )
3559    );
3560}
3561
3562#[gpui::test]
3563async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
3564    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3565    let fake_model = model.as_fake();
3566
3567    let mut events = thread
3568        .update(cx, |thread, cx| {
3569            thread.send(UserMessageId::new(), ["Hello!"], cx)
3570        })
3571        .unwrap();
3572    cx.run_until_parked();
3573
3574    fake_model.send_last_completion_stream_text_chunk("Hey!");
3575    fake_model.end_last_completion_stream();
3576
3577    let mut retry_events = Vec::new();
3578    while let Some(Ok(event)) = events.next().await {
3579        match event {
3580            ThreadEvent::Retry(retry_status) => {
3581                retry_events.push(retry_status);
3582            }
3583            ThreadEvent::Stop(..) => break,
3584            _ => {}
3585        }
3586    }
3587
3588    assert_eq!(retry_events.len(), 0);
3589    thread.read_with(cx, |thread, _cx| {
3590        assert_eq!(
3591            thread.to_markdown(),
3592            indoc! {"
3593                ## User
3594
3595                Hello!
3596
3597                ## Assistant
3598
3599                Hey!
3600            "}
3601        )
3602    });
3603}
3604
3605#[gpui::test]
3606async fn test_send_retry_on_error(cx: &mut TestAppContext) {
3607    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3608    let fake_model = model.as_fake();
3609
3610    let mut events = thread
3611        .update(cx, |thread, cx| {
3612            thread.send(UserMessageId::new(), ["Hello!"], cx)
3613        })
3614        .unwrap();
3615    cx.run_until_parked();
3616
3617    fake_model.send_last_completion_stream_text_chunk("Hey,");
3618    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
3619        provider: LanguageModelProviderName::new("Anthropic"),
3620        retry_after: Some(Duration::from_secs(3)),
3621    });
3622    fake_model.end_last_completion_stream();
3623
3624    cx.executor().advance_clock(Duration::from_secs(3));
3625    cx.run_until_parked();
3626
3627    fake_model.send_last_completion_stream_text_chunk("there!");
3628    fake_model.end_last_completion_stream();
3629    cx.run_until_parked();
3630
3631    let mut retry_events = Vec::new();
3632    while let Some(Ok(event)) = events.next().await {
3633        match event {
3634            ThreadEvent::Retry(retry_status) => {
3635                retry_events.push(retry_status);
3636            }
3637            ThreadEvent::Stop(..) => break,
3638            _ => {}
3639        }
3640    }
3641
3642    assert_eq!(retry_events.len(), 1);
3643    assert!(matches!(
3644        retry_events[0],
3645        acp_thread::RetryStatus { attempt: 1, .. }
3646    ));
3647    thread.read_with(cx, |thread, _cx| {
3648        assert_eq!(
3649            thread.to_markdown(),
3650            indoc! {"
3651                ## User
3652
3653                Hello!
3654
3655                ## Assistant
3656
3657                Hey,
3658
3659                [resume]
3660
3661                ## Assistant
3662
3663                there!
3664            "}
3665        )
3666    });
3667}
3668
// When the stream fails right after a complete tool use, the tool still runs,
// and the retried request carries both the tool use and its result. The
// turn then finishes with a plain text reply and no pending tool results.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Stream one complete echo tool use, then fail the stream with an
    // overloaded-server error that carries a 3 s retry hint.
    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the retry delay. The retried request must contain the tool
    // use followed by the echo tool's result. Message 0 is skipped here
    // (presumably the system prompt — confirm).
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retried request. The last message is then the agent's text
    // reply with no tool results attached.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3748
3749#[gpui::test]
3750async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
3751    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3752    let fake_model = model.as_fake();
3753
3754    let mut events = thread
3755        .update(cx, |thread, cx| {
3756            thread.send(UserMessageId::new(), ["Hello!"], cx)
3757        })
3758        .unwrap();
3759    cx.run_until_parked();
3760
3761    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
3762        fake_model.send_last_completion_stream_error(
3763            LanguageModelCompletionError::ServerOverloaded {
3764                provider: LanguageModelProviderName::new("Anthropic"),
3765                retry_after: Some(Duration::from_secs(3)),
3766            },
3767        );
3768        fake_model.end_last_completion_stream();
3769        cx.executor().advance_clock(Duration::from_secs(3));
3770        cx.run_until_parked();
3771    }
3772
3773    let mut errors = Vec::new();
3774    let mut retry_events = Vec::new();
3775    while let Some(event) = events.next().await {
3776        match event {
3777            Ok(ThreadEvent::Retry(retry_status)) => {
3778                retry_events.push(retry_status);
3779            }
3780            Ok(ThreadEvent::Stop(..)) => break,
3781            Err(error) => errors.push(error),
3782            _ => {}
3783        }
3784    }
3785
3786    assert_eq!(
3787        retry_events.len(),
3788        crate::thread::MAX_RETRY_ATTEMPTS as usize
3789    );
3790    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
3791        assert_eq!(retry_events[i].attempt, i + 1);
3792    }
3793    assert_eq!(errors.len(), 1);
3794    let error = errors[0]
3795        .downcast_ref::<LanguageModelCompletionError>()
3796        .unwrap();
3797    assert!(matches!(
3798        error,
3799        LanguageModelCompletionError::ServerOverloaded { .. }
3800    ));
3801}
3802
// A streaming tool that only ever received partial input must still terminate,
// with an error result, when the model's stream ends in an error. The retry
// then proceeds and the turn completes instead of hanging.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // End the stream with an error WITHOUT ever sending is_input_complete = true.
    // This guards against a deadlock: the tool waiting for more partials (or
    // cancellation), run_turn_internal waiting for the tool, and the sender that
    // keeps the input channel open living inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3909
3910/// Filters out the stop events for asserting against in tests
3911fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3912    result_events
3913        .into_iter()
3914        .filter_map(|event| match event.unwrap() {
3915            ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3916            _ => None,
3917        })
3918        .collect()
3919}
3920
/// Handles produced by `setup` for driving a `Thread` in tests.
struct ThreadTest {
    /// The model the thread was created with (a `FakeLanguageModel` for `TestModel::Fake`).
    model: Arc<dyn LanguageModel>,
    /// The thread under test.
    thread: Entity<Thread>,
    /// Project context handed to the thread at construction.
    project_context: Entity<ProjectContext>,
    /// The test project's context-server store (used e.g. by `setup_context_server`).
    context_server_store: Entity<ContextServerStore>,
    /// Fake filesystem backing both the project and the user settings file.
    fs: Arc<FakeFs>,
}
3928
/// Which language model `setup` gives the thread.
enum TestModel {
    /// A real Claude Sonnet 4 model, looked up by id in the production model
    /// registry and authenticated before use.
    Sonnet4,
    /// An in-process `FakeLanguageModel` whose completions the test drives by hand.
    Fake,
}
3933
3934impl TestModel {
3935    fn id(&self) -> LanguageModelId {
3936        match self {
3937            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
3938            TestModel::Fake => unreachable!(),
3939        }
3940    }
3941}
3942
/// Builds a `Thread` over a test project rooted at `/test`, backed by the
/// requested model.
///
/// Writes a user settings file whose default `test-profile` agent profile
/// enables every test tool, initializes settings, and starts watching that
/// file. For `TestModel::Sonnet4` it also installs a real HTTP client and the
/// production language-model providers, then authenticates the model's
/// provider, panicking if the model is unavailable or authentication fails.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    // NOTE(review): presumably needed because the Sonnet4 path awaits real
    // network I/O (authentication) — confirm.
    cx.executor().allow_parking();

    // Write the user settings file: the default agent profile enables every test tool.
    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                            UpdatePlanTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        // The fake model needs no providers; a real model needs an HTTP client,
        // a production client and the language-model provider registry.
        match model {
            TestModel::Fake => {}
            TestModel::Sonnet4 => {
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(user_store.clone(), client.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        // Feed the settings file written above (and later edits) into the SettingsStore.
        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Resolve the model: the fake one is created directly; a real one is looked
    // up by id in the registry and returned only after its provider authenticates.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
4048
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    // Logging is opt-in: install env_logger only when RUST_LOG is set to a
    // valid Unicode value (`env::var` returns `Err` otherwise).
    if std::env::var("RUST_LOG").is_err() {
        return;
    }
    env_logger::init();
}
4056
4057fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
4058    let fs = fs.clone();
4059    cx.spawn({
4060        async move |cx| {
4061            let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
4062                cx.background_executor(),
4063                fs,
4064                paths::settings_file().clone(),
4065            );
4066            let _watcher_task = watcher_task;
4067
4068            while let Some(new_settings_content) = new_settings_content_rx.next().await {
4069                cx.update(|cx| {
4070                    SettingsStore::update_global(cx, |settings, cx| {
4071                        settings.set_user_settings(&new_settings_content, cx)
4072                    })
4073                })
4074                .ok();
4075            }
4076        }
4077    })
4078    .detach();
4079}
4080
4081fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4082    completion
4083        .tools
4084        .iter()
4085        .map(|tool| tool.name.clone())
4086        .collect()
4087}
4088
/// Registers and starts a fake MCP context server named `name` that offers `tools`.
///
/// The server is added to the global `ProjectSettings` as an enabled stdio
/// server (its command, presumably, is never executed because a fake transport is
/// used) and then started on a fake transport that answers `Initialize` and
/// `ListTools` requests. Every `CallTool` request is forwarded to the returned
/// receiver together with a oneshot sender; the test must send the response
/// through it, or the request never completes.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in project settings.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        // Advertise the latest protocol version and support for tool-list changes.
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        // Always list the same tools, in a single page.
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        // Hand each tool call to the test and wait for it to reply via the oneshot.
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    cx.run_until_parked();
    mcp_tool_calls_rx
}
4168
4169#[gpui::test]
4170async fn test_tokens_before_message(cx: &mut TestAppContext) {
4171    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4172    let fake_model = model.as_fake();
4173
4174    // First message
4175    let message_1_id = UserMessageId::new();
4176    thread
4177        .update(cx, |thread, cx| {
4178            thread.send(message_1_id.clone(), ["First message"], cx)
4179        })
4180        .unwrap();
4181    cx.run_until_parked();
4182
4183    // Before any response, tokens_before_message should return None for first message
4184    thread.read_with(cx, |thread, _| {
4185        assert_eq!(
4186            thread.tokens_before_message(&message_1_id),
4187            None,
4188            "First message should have no tokens before it"
4189        );
4190    });
4191
4192    // Complete first message with usage
4193    fake_model.send_last_completion_stream_text_chunk("Response 1");
4194    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4195        language_model::TokenUsage {
4196            input_tokens: 100,
4197            output_tokens: 50,
4198            cache_creation_input_tokens: 0,
4199            cache_read_input_tokens: 0,
4200        },
4201    ));
4202    fake_model.end_last_completion_stream();
4203    cx.run_until_parked();
4204
4205    // First message still has no tokens before it
4206    thread.read_with(cx, |thread, _| {
4207        assert_eq!(
4208            thread.tokens_before_message(&message_1_id),
4209            None,
4210            "First message should still have no tokens before it after response"
4211        );
4212    });
4213
4214    // Second message
4215    let message_2_id = UserMessageId::new();
4216    thread
4217        .update(cx, |thread, cx| {
4218            thread.send(message_2_id.clone(), ["Second message"], cx)
4219        })
4220        .unwrap();
4221    cx.run_until_parked();
4222
4223    // Second message should have first message's input tokens before it
4224    thread.read_with(cx, |thread, _| {
4225        assert_eq!(
4226            thread.tokens_before_message(&message_2_id),
4227            Some(100),
4228            "Second message should have 100 tokens before it (from first request)"
4229        );
4230    });
4231
4232    // Complete second message
4233    fake_model.send_last_completion_stream_text_chunk("Response 2");
4234    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4235        language_model::TokenUsage {
4236            input_tokens: 250, // Total for this request (includes previous context)
4237            output_tokens: 75,
4238            cache_creation_input_tokens: 0,
4239            cache_read_input_tokens: 0,
4240        },
4241    ));
4242    fake_model.end_last_completion_stream();
4243    cx.run_until_parked();
4244
4245    // Third message
4246    let message_3_id = UserMessageId::new();
4247    thread
4248        .update(cx, |thread, cx| {
4249            thread.send(message_3_id.clone(), ["Third message"], cx)
4250        })
4251        .unwrap();
4252    cx.run_until_parked();
4253
4254    // Third message should have second message's input tokens (250) before it
4255    thread.read_with(cx, |thread, _| {
4256        assert_eq!(
4257            thread.tokens_before_message(&message_3_id),
4258            Some(250),
4259            "Third message should have 250 tokens before it (from second request)"
4260        );
4261        // Second message should still have 100
4262        assert_eq!(
4263            thread.tokens_before_message(&message_2_id),
4264            Some(100),
4265            "Second message should still have 100 tokens before it"
4266        );
4267        // First message still has none
4268        assert_eq!(
4269            thread.tokens_before_message(&message_1_id),
4270            None,
4271            "First message should still have no tokens before it"
4272        );
4273    });
4274}
4275
4276#[gpui::test]
4277async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
4278    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4279    let fake_model = model.as_fake();
4280
4281    // Set up three messages with responses
4282    let message_1_id = UserMessageId::new();
4283    thread
4284        .update(cx, |thread, cx| {
4285            thread.send(message_1_id.clone(), ["Message 1"], cx)
4286        })
4287        .unwrap();
4288    cx.run_until_parked();
4289    fake_model.send_last_completion_stream_text_chunk("Response 1");
4290    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4291        language_model::TokenUsage {
4292            input_tokens: 100,
4293            output_tokens: 50,
4294            cache_creation_input_tokens: 0,
4295            cache_read_input_tokens: 0,
4296        },
4297    ));
4298    fake_model.end_last_completion_stream();
4299    cx.run_until_parked();
4300
4301    let message_2_id = UserMessageId::new();
4302    thread
4303        .update(cx, |thread, cx| {
4304            thread.send(message_2_id.clone(), ["Message 2"], cx)
4305        })
4306        .unwrap();
4307    cx.run_until_parked();
4308    fake_model.send_last_completion_stream_text_chunk("Response 2");
4309    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4310        language_model::TokenUsage {
4311            input_tokens: 250,
4312            output_tokens: 75,
4313            cache_creation_input_tokens: 0,
4314            cache_read_input_tokens: 0,
4315        },
4316    ));
4317    fake_model.end_last_completion_stream();
4318    cx.run_until_parked();
4319
4320    // Verify initial state
4321    thread.read_with(cx, |thread, _| {
4322        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
4323    });
4324
4325    // Truncate at message 2 (removes message 2 and everything after)
4326    thread
4327        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
4328        .unwrap();
4329    cx.run_until_parked();
4330
4331    // After truncation, message_2_id no longer exists, so lookup should return None
4332    thread.read_with(cx, |thread, _| {
4333        assert_eq!(
4334            thread.tokens_before_message(&message_2_id),
4335            None,
4336            "After truncation, message 2 no longer exists"
4337        );
4338        // Message 1 still exists but has no tokens before it
4339        assert_eq!(
4340            thread.tokens_before_message(&message_1_id),
4341            None,
4342            "First message still has no tokens before it"
4343        );
4344    });
4345}
4346
4347#[gpui::test]
4348async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
4349    init_test(cx);
4350
4351    let fs = FakeFs::new(cx.executor());
4352    fs.insert_tree("/root", json!({})).await;
4353    let project = Project::test(fs, ["/root".as_ref()], cx).await;
4354
4355    // Test 1: Deny rule blocks command
4356    {
4357        let environment = Rc::new(cx.update(|cx| {
4358            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
4359        }));
4360
4361        cx.update(|cx| {
4362            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4363            settings.tool_permissions.tools.insert(
4364                TerminalTool::NAME.into(),
4365                agent_settings::ToolRules {
4366                    default: Some(settings::ToolPermissionMode::Confirm),
4367                    always_allow: vec![],
4368                    always_deny: vec![
4369                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
4370                    ],
4371                    always_confirm: vec![],
4372                    invalid_patterns: vec![],
4373                },
4374            );
4375            agent_settings::AgentSettings::override_global(settings, cx);
4376        });
4377
4378        #[allow(clippy::arc_with_non_send_sync)]
4379        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4380        let (event_stream, _rx) = crate::ToolCallEventStream::test();
4381
4382        let task = cx.update(|cx| {
4383            tool.run(
4384                ToolInput::resolved(crate::TerminalToolInput {
4385                    command: "rm -rf /".to_string(),
4386                    cd: ".".to_string(),
4387                    timeout_ms: None,
4388                }),
4389                event_stream,
4390                cx,
4391            )
4392        });
4393
4394        let result = task.await;
4395        assert!(
4396            result.is_err(),
4397            "expected command to be blocked by deny rule"
4398        );
4399        let err_msg = result.unwrap_err().to_lowercase();
4400        assert!(
4401            err_msg.contains("blocked"),
4402            "error should mention the command was blocked"
4403        );
4404    }
4405
4406    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
4407    {
4408        let environment = Rc::new(cx.update(|cx| {
4409            FakeThreadEnvironment::default()
4410                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4411        }));
4412
4413        cx.update(|cx| {
4414            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4415            settings.tool_permissions.tools.insert(
4416                TerminalTool::NAME.into(),
4417                agent_settings::ToolRules {
4418                    default: Some(settings::ToolPermissionMode::Deny),
4419                    always_allow: vec![
4420                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
4421                    ],
4422                    always_deny: vec![],
4423                    always_confirm: vec![],
4424                    invalid_patterns: vec![],
4425                },
4426            );
4427            agent_settings::AgentSettings::override_global(settings, cx);
4428        });
4429
4430        #[allow(clippy::arc_with_non_send_sync)]
4431        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4432        let (event_stream, mut rx) = crate::ToolCallEventStream::test();
4433
4434        let task = cx.update(|cx| {
4435            tool.run(
4436                ToolInput::resolved(crate::TerminalToolInput {
4437                    command: "echo hello".to_string(),
4438                    cd: ".".to_string(),
4439                    timeout_ms: None,
4440                }),
4441                event_stream,
4442                cx,
4443            )
4444        });
4445
4446        let update = rx.expect_update_fields().await;
4447        assert!(
4448            update.content.iter().any(|blocks| {
4449                blocks
4450                    .iter()
4451                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
4452            }),
4453            "expected terminal content (allow rule should skip confirmation and override default deny)"
4454        );
4455
4456        let result = task.await;
4457        assert!(
4458            result.is_ok(),
4459            "expected command to succeed without confirmation"
4460        );
4461    }
4462
4463    // Test 3: global default: allow does NOT override always_confirm patterns
4464    {
4465        let environment = Rc::new(cx.update(|cx| {
4466            FakeThreadEnvironment::default()
4467                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4468        }));
4469
4470        cx.update(|cx| {
4471            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4472            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
4473            settings.tool_permissions.tools.insert(
4474                TerminalTool::NAME.into(),
4475                agent_settings::ToolRules {
4476                    default: Some(settings::ToolPermissionMode::Allow),
4477                    always_allow: vec![],
4478                    always_deny: vec![],
4479                    always_confirm: vec![
4480                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
4481                    ],
4482                    invalid_patterns: vec![],
4483                },
4484            );
4485            agent_settings::AgentSettings::override_global(settings, cx);
4486        });
4487
4488        #[allow(clippy::arc_with_non_send_sync)]
4489        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4490        let (event_stream, mut rx) = crate::ToolCallEventStream::test();
4491
4492        let _task = cx.update(|cx| {
4493            tool.run(
4494                ToolInput::resolved(crate::TerminalToolInput {
4495                    command: "sudo rm file".to_string(),
4496                    cd: ".".to_string(),
4497                    timeout_ms: None,
4498                }),
4499                event_stream,
4500                cx,
4501            )
4502        });
4503
4504        // With global default: allow, confirm patterns are still respected
4505        // The expect_authorization() call will panic if no authorization is requested,
4506        // which validates that the confirm pattern still triggers confirmation
4507        let _auth = rx.expect_authorization().await;
4508
4509        drop(_task);
4510    }
4511
4512    // Test 4: tool-specific default: deny is respected even with global default: allow
4513    {
4514        let environment = Rc::new(cx.update(|cx| {
4515            FakeThreadEnvironment::default()
4516                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4517        }));
4518
4519        cx.update(|cx| {
4520            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4521            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
4522            settings.tool_permissions.tools.insert(
4523                TerminalTool::NAME.into(),
4524                agent_settings::ToolRules {
4525                    default: Some(settings::ToolPermissionMode::Deny),
4526                    always_allow: vec![],
4527                    always_deny: vec![],
4528                    always_confirm: vec![],
4529                    invalid_patterns: vec![],
4530                },
4531            );
4532            agent_settings::AgentSettings::override_global(settings, cx);
4533        });
4534
4535        #[allow(clippy::arc_with_non_send_sync)]
4536        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4537        let (event_stream, _rx) = crate::ToolCallEventStream::test();
4538
4539        let task = cx.update(|cx| {
4540            tool.run(
4541                ToolInput::resolved(crate::TerminalToolInput {
4542                    command: "echo hello".to_string(),
4543                    cd: ".".to_string(),
4544                    timeout_ms: None,
4545                }),
4546                event_stream,
4547                cx,
4548            )
4549        });
4550
4551        // tool-specific default: deny is respected even with global default: allow
4552        let result = task.await;
4553        assert!(
4554            result.is_err(),
4555            "expected command to be blocked by tool-specific deny default"
4556        );
4557        let err_msg = result.unwrap_err().to_lowercase();
4558        assert!(
4559            err_msg.contains("disabled"),
4560            "error should mention the tool is disabled, got: {err_msg}"
4561        );
4562    }
4563}
4564
4565#[gpui::test]
4566async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
4567    init_test(cx);
4568    cx.update(|cx| {
4569        LanguageModelRegistry::test(cx);
4570    });
4571    cx.update(|cx| {
4572        cx.update_flags(true, vec!["subagents".to_string()]);
4573    });
4574
4575    let fs = FakeFs::new(cx.executor());
4576    fs.insert_tree(
4577        "/",
4578        json!({
4579            "a": {
4580                "b.md": "Lorem"
4581            }
4582        }),
4583    )
4584    .await;
4585    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4586    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4587    let agent = cx.update(|cx| {
4588        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4589    });
4590    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4591
4592    let acp_thread = cx
4593        .update(|cx| {
4594            connection
4595                .clone()
4596                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4597        })
4598        .await
4599        .unwrap();
4600    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4601    let thread = agent.read_with(cx, |agent, _| {
4602        agent.sessions.get(&session_id).unwrap().thread.clone()
4603    });
4604    let model = Arc::new(FakeLanguageModel::default());
4605
4606    // Ensure empty threads are not saved, even if they get mutated.
4607    thread.update(cx, |thread, cx| {
4608        thread.set_model(model.clone(), cx);
4609    });
4610    cx.run_until_parked();
4611
4612    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4613    cx.run_until_parked();
4614    model.send_last_completion_stream_text_chunk("spawning subagent");
4615    let subagent_tool_input = SpawnAgentToolInput {
4616        label: "label".to_string(),
4617        message: "subagent task prompt".to_string(),
4618        session_id: None,
4619    };
4620    let subagent_tool_use = LanguageModelToolUse {
4621        id: "subagent_1".into(),
4622        name: SpawnAgentTool::NAME.into(),
4623        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4624        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4625        is_input_complete: true,
4626        thought_signature: None,
4627    };
4628    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4629        subagent_tool_use,
4630    ));
4631    model.end_last_completion_stream();
4632
4633    cx.run_until_parked();
4634
4635    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4636        thread
4637            .running_subagent_ids(cx)
4638            .get(0)
4639            .expect("subagent thread should be running")
4640            .clone()
4641    });
4642
4643    let subagent_thread = agent.read_with(cx, |agent, _cx| {
4644        agent
4645            .sessions
4646            .get(&subagent_session_id)
4647            .expect("subagent session should exist")
4648            .acp_thread
4649            .clone()
4650    });
4651
4652    model.send_last_completion_stream_text_chunk("subagent task response");
4653    model.end_last_completion_stream();
4654
4655    cx.run_until_parked();
4656
4657    assert_eq!(
4658        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4659        indoc! {"
4660            ## User
4661
4662            subagent task prompt
4663
4664            ## Assistant
4665
4666            subagent task response
4667
4668        "}
4669    );
4670
4671    model.send_last_completion_stream_text_chunk("Response");
4672    model.end_last_completion_stream();
4673
4674    send.await.unwrap();
4675
4676    assert_eq!(
4677        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4678        indoc! {r#"
4679            ## User
4680
4681            Prompt
4682
4683            ## Assistant
4684
4685            spawning subagent
4686
4687            **Tool Call: label**
4688            Status: Completed
4689
4690            subagent task response
4691
4692            ## Assistant
4693
4694            Response
4695
4696        "#},
4697    );
4698}
4699
4700#[gpui::test]
4701async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
4702    init_test(cx);
4703    cx.update(|cx| {
4704        LanguageModelRegistry::test(cx);
4705    });
4706    cx.update(|cx| {
4707        cx.update_flags(true, vec!["subagents".to_string()]);
4708    });
4709
4710    let fs = FakeFs::new(cx.executor());
4711    fs.insert_tree(
4712        "/",
4713        json!({
4714            "a": {
4715                "b.md": "Lorem"
4716            }
4717        }),
4718    )
4719    .await;
4720    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4721    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4722    let agent = cx.update(|cx| {
4723        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4724    });
4725    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4726
4727    let acp_thread = cx
4728        .update(|cx| {
4729            connection
4730                .clone()
4731                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4732        })
4733        .await
4734        .unwrap();
4735    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4736    let thread = agent.read_with(cx, |agent, _| {
4737        agent.sessions.get(&session_id).unwrap().thread.clone()
4738    });
4739    let model = Arc::new(FakeLanguageModel::default());
4740
4741    // Ensure empty threads are not saved, even if they get mutated.
4742    thread.update(cx, |thread, cx| {
4743        thread.set_model(model.clone(), cx);
4744    });
4745    cx.run_until_parked();
4746
4747    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4748    cx.run_until_parked();
4749    model.send_last_completion_stream_text_chunk("spawning subagent");
4750    let subagent_tool_input = SpawnAgentToolInput {
4751        label: "label".to_string(),
4752        message: "subagent task prompt".to_string(),
4753        session_id: None,
4754    };
4755    let subagent_tool_use = LanguageModelToolUse {
4756        id: "subagent_1".into(),
4757        name: SpawnAgentTool::NAME.into(),
4758        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4759        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4760        is_input_complete: true,
4761        thought_signature: None,
4762    };
4763    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4764        subagent_tool_use,
4765    ));
4766    model.end_last_completion_stream();
4767
4768    cx.run_until_parked();
4769
4770    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4771        thread
4772            .running_subagent_ids(cx)
4773            .get(0)
4774            .expect("subagent thread should be running")
4775            .clone()
4776    });
4777
4778    let subagent_thread = agent.read_with(cx, |agent, _cx| {
4779        agent
4780            .sessions
4781            .get(&subagent_session_id)
4782            .expect("subagent session should exist")
4783            .acp_thread
4784            .clone()
4785    });
4786
4787    model.send_last_completion_stream_text_chunk("subagent task response 1");
4788    model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
4789        text: "thinking more about the subagent task".into(),
4790        signature: None,
4791    });
4792    model.send_last_completion_stream_text_chunk("subagent task response 2");
4793    model.end_last_completion_stream();
4794
4795    cx.run_until_parked();
4796
4797    assert_eq!(
4798        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4799        indoc! {"
4800            ## User
4801
4802            subagent task prompt
4803
4804            ## Assistant
4805
4806            subagent task response 1
4807
4808            <thinking>
4809            thinking more about the subagent task
4810            </thinking>
4811
4812            subagent task response 2
4813
4814        "}
4815    );
4816
4817    model.send_last_completion_stream_text_chunk("Response");
4818    model.end_last_completion_stream();
4819
4820    send.await.unwrap();
4821
4822    assert_eq!(
4823        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4824        indoc! {r#"
4825            ## User
4826
4827            Prompt
4828
4829            ## Assistant
4830
4831            spawning subagent
4832
4833            **Tool Call: label**
4834            Status: Completed
4835
4836            subagent task response 1
4837
4838            subagent task response 2
4839
4840            ## Assistant
4841
4842            Response
4843
4844        "#},
4845    );
4846}
4847
4848#[gpui::test]
4849async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
4850    init_test(cx);
4851    cx.update(|cx| {
4852        LanguageModelRegistry::test(cx);
4853    });
4854    cx.update(|cx| {
4855        cx.update_flags(true, vec!["subagents".to_string()]);
4856    });
4857
4858    let fs = FakeFs::new(cx.executor());
4859    fs.insert_tree(
4860        "/",
4861        json!({
4862            "a": {
4863                "b.md": "Lorem"
4864            }
4865        }),
4866    )
4867    .await;
4868    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4869    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4870    let agent = cx.update(|cx| {
4871        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4872    });
4873    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4874
4875    let acp_thread = cx
4876        .update(|cx| {
4877            connection
4878                .clone()
4879                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4880        })
4881        .await
4882        .unwrap();
4883    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4884    let thread = agent.read_with(cx, |agent, _| {
4885        agent.sessions.get(&session_id).unwrap().thread.clone()
4886    });
4887    let model = Arc::new(FakeLanguageModel::default());
4888
4889    // Ensure empty threads are not saved, even if they get mutated.
4890    thread.update(cx, |thread, cx| {
4891        thread.set_model(model.clone(), cx);
4892    });
4893    cx.run_until_parked();
4894
4895    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4896    cx.run_until_parked();
4897    model.send_last_completion_stream_text_chunk("spawning subagent");
4898    let subagent_tool_input = SpawnAgentToolInput {
4899        label: "label".to_string(),
4900        message: "subagent task prompt".to_string(),
4901        session_id: None,
4902    };
4903    let subagent_tool_use = LanguageModelToolUse {
4904        id: "subagent_1".into(),
4905        name: SpawnAgentTool::NAME.into(),
4906        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4907        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4908        is_input_complete: true,
4909        thought_signature: None,
4910    };
4911    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4912        subagent_tool_use,
4913    ));
4914    model.end_last_completion_stream();
4915
4916    cx.run_until_parked();
4917
4918    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4919        thread
4920            .running_subagent_ids(cx)
4921            .get(0)
4922            .expect("subagent thread should be running")
4923            .clone()
4924    });
4925    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4926        agent
4927            .sessions
4928            .get(&subagent_session_id)
4929            .expect("subagent session should exist")
4930            .acp_thread
4931            .clone()
4932    });
4933
4934    // model.send_last_completion_stream_text_chunk("subagent task response");
4935    // model.end_last_completion_stream();
4936
4937    // cx.run_until_parked();
4938
4939    acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4940
4941    cx.run_until_parked();
4942
4943    send.await.unwrap();
4944
4945    acp_thread.read_with(cx, |thread, cx| {
4946        assert_eq!(thread.status(), ThreadStatus::Idle);
4947        assert_eq!(
4948            thread.to_markdown(cx),
4949            indoc! {"
4950                ## User
4951
4952                Prompt
4953
4954                ## Assistant
4955
4956                spawning subagent
4957
4958                **Tool Call: label**
4959                Status: Canceled
4960
4961            "}
4962        );
4963    });
4964    subagent_acp_thread.read_with(cx, |thread, cx| {
4965        assert_eq!(thread.status(), ThreadStatus::Idle);
4966        assert_eq!(
4967            thread.to_markdown(cx),
4968            indoc! {"
4969                ## User
4970
4971                subagent task prompt
4972
4973            "}
4974        );
4975    });
4976}
4977
4978#[gpui::test]
4979async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
4980    init_test(cx);
4981    cx.update(|cx| {
4982        LanguageModelRegistry::test(cx);
4983    });
4984    cx.update(|cx| {
4985        cx.update_flags(true, vec!["subagents".to_string()]);
4986    });
4987
4988    let fs = FakeFs::new(cx.executor());
4989    fs.insert_tree(
4990        "/",
4991        json!({
4992            "a": {
4993                "b.md": "Lorem"
4994            }
4995        }),
4996    )
4997    .await;
4998    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4999    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5000    let agent = cx.update(|cx| {
5001        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5002    });
5003    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5004
5005    let acp_thread = cx
5006        .update(|cx| {
5007            connection
5008                .clone()
5009                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5010        })
5011        .await
5012        .unwrap();
5013    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5014    let thread = agent.read_with(cx, |agent, _| {
5015        agent.sessions.get(&session_id).unwrap().thread.clone()
5016    });
5017    let model = Arc::new(FakeLanguageModel::default());
5018
5019    thread.update(cx, |thread, cx| {
5020        thread.set_model(model.clone(), cx);
5021    });
5022    cx.run_until_parked();
5023
5024    // === First turn: create subagent ===
5025    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5026    cx.run_until_parked();
5027    model.send_last_completion_stream_text_chunk("spawning subagent");
5028    let subagent_tool_input = SpawnAgentToolInput {
5029        label: "initial task".to_string(),
5030        message: "do the first task".to_string(),
5031        session_id: None,
5032    };
5033    let subagent_tool_use = LanguageModelToolUse {
5034        id: "subagent_1".into(),
5035        name: SpawnAgentTool::NAME.into(),
5036        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5037        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5038        is_input_complete: true,
5039        thought_signature: None,
5040    };
5041    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5042        subagent_tool_use,
5043    ));
5044    model.end_last_completion_stream();
5045
5046    cx.run_until_parked();
5047
5048    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5049        thread
5050            .running_subagent_ids(cx)
5051            .get(0)
5052            .expect("subagent thread should be running")
5053            .clone()
5054    });
5055
5056    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
5057        agent
5058            .sessions
5059            .get(&subagent_session_id)
5060            .expect("subagent session should exist")
5061            .acp_thread
5062            .clone()
5063    });
5064
5065    // Subagent responds
5066    model.send_last_completion_stream_text_chunk("first task response");
5067    model.end_last_completion_stream();
5068
5069    cx.run_until_parked();
5070
5071    // Parent model responds to complete first turn
5072    model.send_last_completion_stream_text_chunk("First response");
5073    model.end_last_completion_stream();
5074
5075    send.await.unwrap();
5076
5077    // Verify subagent is no longer running
5078    thread.read_with(cx, |thread, cx| {
5079        assert!(
5080            thread.running_subagent_ids(cx).is_empty(),
5081            "subagent should not be running after completion"
5082        );
5083    });
5084
5085    // === Second turn: resume subagent with session_id ===
5086    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5087    cx.run_until_parked();
5088    model.send_last_completion_stream_text_chunk("resuming subagent");
5089    let resume_tool_input = SpawnAgentToolInput {
5090        label: "follow-up task".to_string(),
5091        message: "do the follow-up task".to_string(),
5092        session_id: Some(subagent_session_id.clone()),
5093    };
5094    let resume_tool_use = LanguageModelToolUse {
5095        id: "subagent_2".into(),
5096        name: SpawnAgentTool::NAME.into(),
5097        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5098        input: serde_json::to_value(&resume_tool_input).unwrap(),
5099        is_input_complete: true,
5100        thought_signature: None,
5101    };
5102    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5103    model.end_last_completion_stream();
5104
5105    cx.run_until_parked();
5106
5107    // Subagent should be running again with the same session
5108    thread.read_with(cx, |thread, cx| {
5109        let running = thread.running_subagent_ids(cx);
5110        assert_eq!(running.len(), 1, "subagent should be running");
5111        assert_eq!(running[0], subagent_session_id, "should be same session");
5112    });
5113
5114    // Subagent responds to follow-up
5115    model.send_last_completion_stream_text_chunk("follow-up task response");
5116    model.end_last_completion_stream();
5117
5118    cx.run_until_parked();
5119
5120    // Parent model responds to complete second turn
5121    model.send_last_completion_stream_text_chunk("Second response");
5122    model.end_last_completion_stream();
5123
5124    send2.await.unwrap();
5125
5126    // Verify subagent is no longer running
5127    thread.read_with(cx, |thread, cx| {
5128        assert!(
5129            thread.running_subagent_ids(cx).is_empty(),
5130            "subagent should not be running after resume completion"
5131        );
5132    });
5133
5134    // Verify the subagent's acp thread has both conversation turns
5135    assert_eq!(
5136        subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
5137        indoc! {"
5138            ## User
5139
5140            do the first task
5141
5142            ## Assistant
5143
5144            first task response
5145
5146            ## User
5147
5148            do the follow-up task
5149
5150            ## Assistant
5151
5152            follow-up task response
5153
5154        "}
5155    );
5156}
5157
5158#[gpui::test]
5159async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5160    init_test(cx);
5161
5162    cx.update(|cx| {
5163        cx.update_flags(true, vec!["subagents".to_string()]);
5164    });
5165
5166    let fs = FakeFs::new(cx.executor());
5167    fs.insert_tree(path!("/test"), json!({})).await;
5168    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5169    let project_context = cx.new(|_cx| ProjectContext::default());
5170    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5171    let context_server_registry =
5172        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5173    let model = Arc::new(FakeLanguageModel::default());
5174
5175    let parent_thread = cx.new(|cx| {
5176        Thread::new(
5177            project.clone(),
5178            project_context,
5179            context_server_registry,
5180            Templates::new(),
5181            Some(model.clone()),
5182            cx,
5183        )
5184    });
5185
5186    let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5187    subagent_thread.read_with(cx, |subagent_thread, cx| {
5188        assert!(subagent_thread.is_subagent());
5189        assert_eq!(subagent_thread.depth(), 1);
5190        assert_eq!(
5191            subagent_thread.model().map(|model| model.id()),
5192            Some(model.id())
5193        );
5194        assert_eq!(
5195            subagent_thread.parent_thread_id(),
5196            Some(parent_thread.read(cx).id().clone())
5197        );
5198    });
5199}
5200
5201#[gpui::test]
5202async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5203    init_test(cx);
5204
5205    cx.update(|cx| {
5206        cx.update_flags(true, vec!["subagents".to_string()]);
5207    });
5208
5209    let fs = FakeFs::new(cx.executor());
5210    fs.insert_tree(path!("/test"), json!({})).await;
5211    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5212    let project_context = cx.new(|_cx| ProjectContext::default());
5213    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5214    let context_server_registry =
5215        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5216    let model = Arc::new(FakeLanguageModel::default());
5217    let environment = Rc::new(cx.update(|cx| {
5218        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5219    }));
5220
5221    let deep_parent_thread = cx.new(|cx| {
5222        let mut thread = Thread::new(
5223            project.clone(),
5224            project_context,
5225            context_server_registry,
5226            Templates::new(),
5227            Some(model.clone()),
5228            cx,
5229        );
5230        thread.set_subagent_context(SubagentContext {
5231            parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5232            depth: MAX_SUBAGENT_DEPTH - 1,
5233        });
5234        thread
5235    });
5236    let deep_subagent_thread = cx.new(|cx| {
5237        let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5238        thread.add_default_tools(environment, cx);
5239        thread
5240    });
5241
5242    deep_subagent_thread.read_with(cx, |thread, _| {
5243        assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5244        assert!(
5245            !thread.has_registered_tool(SpawnAgentTool::NAME),
5246            "subagent tool should not be present at max depth"
5247        );
5248    });
5249}
5250
5251#[gpui::test]
5252async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5253    init_test(cx);
5254
5255    cx.update(|cx| {
5256        cx.update_flags(true, vec!["subagents".to_string()]);
5257    });
5258
5259    let fs = FakeFs::new(cx.executor());
5260    fs.insert_tree(path!("/test"), json!({})).await;
5261    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5262    let project_context = cx.new(|_cx| ProjectContext::default());
5263    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5264    let context_server_registry =
5265        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5266    let model = Arc::new(FakeLanguageModel::default());
5267
5268    let parent = cx.new(|cx| {
5269        Thread::new(
5270            project.clone(),
5271            project_context.clone(),
5272            context_server_registry.clone(),
5273            Templates::new(),
5274            Some(model.clone()),
5275            cx,
5276        )
5277    });
5278
5279    let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5280
5281    parent.update(cx, |thread, _cx| {
5282        thread.register_running_subagent(subagent.downgrade());
5283    });
5284
5285    subagent
5286        .update(cx, |thread, cx| {
5287            thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5288        })
5289        .unwrap();
5290    cx.run_until_parked();
5291
5292    subagent.read_with(cx, |thread, _| {
5293        assert!(!thread.is_turn_complete(), "subagent should be running");
5294    });
5295
5296    parent.update(cx, |thread, cx| {
5297        thread.cancel(cx).detach();
5298    });
5299
5300    subagent.read_with(cx, |thread, _| {
5301        assert!(
5302            thread.is_turn_complete(),
5303            "subagent should be cancelled when parent cancels"
5304        );
5305    });
5306}
5307
5308#[gpui::test]
5309async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5310    init_test(cx);
5311    cx.update(|cx| {
5312        LanguageModelRegistry::test(cx);
5313    });
5314    cx.update(|cx| {
5315        cx.update_flags(true, vec!["subagents".to_string()]);
5316    });
5317
5318    let fs = FakeFs::new(cx.executor());
5319    fs.insert_tree(
5320        "/",
5321        json!({
5322            "a": {
5323                "b.md": "Lorem"
5324            }
5325        }),
5326    )
5327    .await;
5328    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5329    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5330    let agent = cx.update(|cx| {
5331        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5332    });
5333    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5334
5335    let acp_thread = cx
5336        .update(|cx| {
5337            connection
5338                .clone()
5339                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5340        })
5341        .await
5342        .unwrap();
5343    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5344    let thread = agent.read_with(cx, |agent, _| {
5345        agent.sessions.get(&session_id).unwrap().thread.clone()
5346    });
5347    let model = Arc::new(FakeLanguageModel::default());
5348
5349    thread.update(cx, |thread, cx| {
5350        thread.set_model(model.clone(), cx);
5351    });
5352    cx.run_until_parked();
5353
5354    // Start the parent turn
5355    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5356    cx.run_until_parked();
5357    model.send_last_completion_stream_text_chunk("spawning subagent");
5358    let subagent_tool_input = SpawnAgentToolInput {
5359        label: "label".to_string(),
5360        message: "subagent task prompt".to_string(),
5361        session_id: None,
5362    };
5363    let subagent_tool_use = LanguageModelToolUse {
5364        id: "subagent_1".into(),
5365        name: SpawnAgentTool::NAME.into(),
5366        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5367        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5368        is_input_complete: true,
5369        thought_signature: None,
5370    };
5371    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5372        subagent_tool_use,
5373    ));
5374    model.end_last_completion_stream();
5375
5376    cx.run_until_parked();
5377
5378    // Verify subagent is running
5379    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5380        thread
5381            .running_subagent_ids(cx)
5382            .get(0)
5383            .expect("subagent thread should be running")
5384            .clone()
5385    });
5386
5387    // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5388    model.send_last_completion_stream_text_chunk("partial work");
5389    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5390        TokenUsage {
5391            input_tokens: 850_000,
5392            output_tokens: 0,
5393            cache_creation_input_tokens: 0,
5394            cache_read_input_tokens: 0,
5395        },
5396    ));
5397
5398    cx.run_until_parked();
5399
5400    // The subagent should no longer be running
5401    thread.read_with(cx, |thread, cx| {
5402        assert!(
5403            thread.running_subagent_ids(cx).is_empty(),
5404            "subagent should be stopped after context window warning"
5405        );
5406    });
5407
5408    // The parent model should get a new completion request to respond to the tool error
5409    model.send_last_completion_stream_text_chunk("Response after warning");
5410    model.end_last_completion_stream();
5411
5412    send.await.unwrap();
5413
5414    // Verify the parent thread shows the warning error in the tool call
5415    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5416    assert!(
5417        markdown.contains("nearing the end of its context window"),
5418        "tool output should contain context window warning message, got:\n{markdown}"
5419    );
5420    assert!(
5421        markdown.contains("Status: Failed"),
5422        "tool call should have Failed status, got:\n{markdown}"
5423    );
5424
5425    // Verify the subagent session still exists (can be resumed)
5426    agent.read_with(cx, |agent, _cx| {
5427        assert!(
5428            agent.sessions.contains_key(&subagent_session_id),
5429            "subagent session should still exist for potential resume"
5430        );
5431    });
5432}
5433
5434#[gpui::test]
5435async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5436    init_test(cx);
5437    cx.update(|cx| {
5438        LanguageModelRegistry::test(cx);
5439    });
5440    cx.update(|cx| {
5441        cx.update_flags(true, vec!["subagents".to_string()]);
5442    });
5443
5444    let fs = FakeFs::new(cx.executor());
5445    fs.insert_tree(
5446        "/",
5447        json!({
5448            "a": {
5449                "b.md": "Lorem"
5450            }
5451        }),
5452    )
5453    .await;
5454    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5455    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5456    let agent = cx.update(|cx| {
5457        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5458    });
5459    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5460
5461    let acp_thread = cx
5462        .update(|cx| {
5463            connection
5464                .clone()
5465                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5466        })
5467        .await
5468        .unwrap();
5469    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5470    let thread = agent.read_with(cx, |agent, _| {
5471        agent.sessions.get(&session_id).unwrap().thread.clone()
5472    });
5473    let model = Arc::new(FakeLanguageModel::default());
5474
5475    thread.update(cx, |thread, cx| {
5476        thread.set_model(model.clone(), cx);
5477    });
5478    cx.run_until_parked();
5479
5480    // === First turn: create subagent, trigger context window warning ===
5481    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5482    cx.run_until_parked();
5483    model.send_last_completion_stream_text_chunk("spawning subagent");
5484    let subagent_tool_input = SpawnAgentToolInput {
5485        label: "initial task".to_string(),
5486        message: "do the first task".to_string(),
5487        session_id: None,
5488    };
5489    let subagent_tool_use = LanguageModelToolUse {
5490        id: "subagent_1".into(),
5491        name: SpawnAgentTool::NAME.into(),
5492        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5493        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5494        is_input_complete: true,
5495        thought_signature: None,
5496    };
5497    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5498        subagent_tool_use,
5499    ));
5500    model.end_last_completion_stream();
5501
5502    cx.run_until_parked();
5503
5504    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5505        thread
5506            .running_subagent_ids(cx)
5507            .get(0)
5508            .expect("subagent thread should be running")
5509            .clone()
5510    });
5511
5512    // Subagent sends a usage update that crosses the warning threshold.
5513    // This triggers Normal→Warning, stopping the subagent.
5514    model.send_last_completion_stream_text_chunk("partial work");
5515    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5516        TokenUsage {
5517            input_tokens: 850_000,
5518            output_tokens: 0,
5519            cache_creation_input_tokens: 0,
5520            cache_read_input_tokens: 0,
5521        },
5522    ));
5523
5524    cx.run_until_parked();
5525
5526    // Verify the first turn was stopped with a context window warning
5527    thread.read_with(cx, |thread, cx| {
5528        assert!(
5529            thread.running_subagent_ids(cx).is_empty(),
5530            "subagent should be stopped after context window warning"
5531        );
5532    });
5533
5534    // Parent model responds to complete first turn
5535    model.send_last_completion_stream_text_chunk("First response");
5536    model.end_last_completion_stream();
5537
5538    send.await.unwrap();
5539
5540    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5541    assert!(
5542        markdown.contains("nearing the end of its context window"),
5543        "first turn should have context window warning, got:\n{markdown}"
5544    );
5545
5546    // === Second turn: resume the same subagent (now at Warning level) ===
5547    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5548    cx.run_until_parked();
5549    model.send_last_completion_stream_text_chunk("resuming subagent");
5550    let resume_tool_input = SpawnAgentToolInput {
5551        label: "follow-up task".to_string(),
5552        message: "do the follow-up task".to_string(),
5553        session_id: Some(subagent_session_id.clone()),
5554    };
5555    let resume_tool_use = LanguageModelToolUse {
5556        id: "subagent_2".into(),
5557        name: SpawnAgentTool::NAME.into(),
5558        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5559        input: serde_json::to_value(&resume_tool_input).unwrap(),
5560        is_input_complete: true,
5561        thought_signature: None,
5562    };
5563    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5564    model.end_last_completion_stream();
5565
5566    cx.run_until_parked();
5567
5568    // Subagent responds with tokens still at warning level (no worse).
5569    // Since ratio_before_prompt was already Warning, this should NOT
5570    // trigger the context window warning again.
5571    model.send_last_completion_stream_text_chunk("follow-up task response");
5572    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5573        TokenUsage {
5574            input_tokens: 870_000,
5575            output_tokens: 0,
5576            cache_creation_input_tokens: 0,
5577            cache_read_input_tokens: 0,
5578        },
5579    ));
5580    model.end_last_completion_stream();
5581
5582    cx.run_until_parked();
5583
5584    // Parent model responds to complete second turn
5585    model.send_last_completion_stream_text_chunk("Second response");
5586    model.end_last_completion_stream();
5587
5588    send2.await.unwrap();
5589
5590    // The resumed subagent should have completed normally since the ratio
5591    // didn't transition (it was Warning before and stayed at Warning)
5592    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5593    assert!(
5594        markdown.contains("follow-up task response"),
5595        "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5596    );
5597    // The second tool call should NOT have a context window warning
5598    let second_tool_pos = markdown
5599        .find("follow-up task")
5600        .expect("should find follow-up tool call");
5601    let after_second_tool = &markdown[second_tool_pos..];
5602    assert!(
5603        !after_second_tool.contains("nearing the end of its context window"),
5604        "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5605    );
5606}
5607
/// A non-retryable completion error inside a running subagent must stop the subagent.
/// The error must then surface in the parent thread as a failed spawn-agent tool call,
/// after which the parent model gets another completion request to finish its turn.
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Open a session through the ACP connection and grab the native thread backing it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    // A single fake model drives both the parent and the subagent completions; each
    // `send_last_completion_stream_*` call targets whichever request is currently pending.
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5714
5715#[gpui::test]
5716async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5717    init_test(cx);
5718
5719    let fs = FakeFs::new(cx.executor());
5720    fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5721        .await;
5722    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5723
5724    cx.update(|cx| {
5725        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5726        settings.tool_permissions.tools.insert(
5727            EditFileTool::NAME.into(),
5728            agent_settings::ToolRules {
5729                default: Some(settings::ToolPermissionMode::Allow),
5730                always_allow: vec![],
5731                always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5732                always_confirm: vec![],
5733                invalid_patterns: vec![],
5734            },
5735        );
5736        agent_settings::AgentSettings::override_global(settings, cx);
5737    });
5738
5739    let context_server_registry =
5740        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5741    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5742    let templates = crate::Templates::new();
5743    let thread = cx.new(|cx| {
5744        crate::Thread::new(
5745            project.clone(),
5746            cx.new(|_cx| prompt_store::ProjectContext::default()),
5747            context_server_registry,
5748            templates.clone(),
5749            None,
5750            cx,
5751        )
5752    });
5753
5754    #[allow(clippy::arc_with_non_send_sync)]
5755    let tool = Arc::new(crate::EditFileTool::new(
5756        project.clone(),
5757        thread.downgrade(),
5758        language_registry,
5759        templates,
5760    ));
5761    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5762
5763    let task = cx.update(|cx| {
5764        tool.run(
5765            ToolInput::resolved(crate::EditFileToolInput {
5766                display_description: "Edit sensitive file".to_string(),
5767                path: "root/sensitive_config.txt".into(),
5768                mode: crate::EditFileMode::Edit,
5769            }),
5770            event_stream,
5771            cx,
5772        )
5773    });
5774
5775    let result = task.await;
5776    assert!(result.is_err(), "expected edit to be blocked");
5777    assert!(
5778        result.unwrap_err().to_string().contains("blocked"),
5779        "error should mention the edit was blocked"
5780    );
5781}
5782
5783#[gpui::test]
5784async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5785    init_test(cx);
5786
5787    let fs = FakeFs::new(cx.executor());
5788    fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5789        .await;
5790    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5791
5792    cx.update(|cx| {
5793        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5794        settings.tool_permissions.tools.insert(
5795            DeletePathTool::NAME.into(),
5796            agent_settings::ToolRules {
5797                default: Some(settings::ToolPermissionMode::Allow),
5798                always_allow: vec![],
5799                always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5800                always_confirm: vec![],
5801                invalid_patterns: vec![],
5802            },
5803        );
5804        agent_settings::AgentSettings::override_global(settings, cx);
5805    });
5806
5807    let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5808
5809    #[allow(clippy::arc_with_non_send_sync)]
5810    let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5811    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5812
5813    let task = cx.update(|cx| {
5814        tool.run(
5815            ToolInput::resolved(crate::DeletePathToolInput {
5816                path: "root/important_data.txt".to_string(),
5817            }),
5818            event_stream,
5819            cx,
5820        )
5821    });
5822
5823    let result = task.await;
5824    assert!(result.is_err(), "expected deletion to be blocked");
5825    assert!(
5826        result.unwrap_err().contains("blocked"),
5827        "error should mention the deletion was blocked"
5828    );
5829}
5830
5831#[gpui::test]
5832async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5833    init_test(cx);
5834
5835    let fs = FakeFs::new(cx.executor());
5836    fs.insert_tree(
5837        "/root",
5838        json!({
5839            "safe.txt": "content",
5840            "protected": {}
5841        }),
5842    )
5843    .await;
5844    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5845
5846    cx.update(|cx| {
5847        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5848        settings.tool_permissions.tools.insert(
5849            MovePathTool::NAME.into(),
5850            agent_settings::ToolRules {
5851                default: Some(settings::ToolPermissionMode::Allow),
5852                always_allow: vec![],
5853                always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5854                always_confirm: vec![],
5855                invalid_patterns: vec![],
5856            },
5857        );
5858        agent_settings::AgentSettings::override_global(settings, cx);
5859    });
5860
5861    #[allow(clippy::arc_with_non_send_sync)]
5862    let tool = Arc::new(crate::MovePathTool::new(project));
5863    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5864
5865    let task = cx.update(|cx| {
5866        tool.run(
5867            ToolInput::resolved(crate::MovePathToolInput {
5868                source_path: "root/safe.txt".to_string(),
5869                destination_path: "root/protected/safe.txt".to_string(),
5870            }),
5871            event_stream,
5872            cx,
5873        )
5874    });
5875
5876    let result = task.await;
5877    assert!(
5878        result.is_err(),
5879        "expected move to be blocked due to destination path"
5880    );
5881    assert!(
5882        result.unwrap_err().contains("blocked"),
5883        "error should mention the move was blocked"
5884    );
5885}
5886
5887#[gpui::test]
5888async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5889    init_test(cx);
5890
5891    let fs = FakeFs::new(cx.executor());
5892    fs.insert_tree(
5893        "/root",
5894        json!({
5895            "secret.txt": "secret content",
5896            "public": {}
5897        }),
5898    )
5899    .await;
5900    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5901
5902    cx.update(|cx| {
5903        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5904        settings.tool_permissions.tools.insert(
5905            MovePathTool::NAME.into(),
5906            agent_settings::ToolRules {
5907                default: Some(settings::ToolPermissionMode::Allow),
5908                always_allow: vec![],
5909                always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5910                always_confirm: vec![],
5911                invalid_patterns: vec![],
5912            },
5913        );
5914        agent_settings::AgentSettings::override_global(settings, cx);
5915    });
5916
5917    #[allow(clippy::arc_with_non_send_sync)]
5918    let tool = Arc::new(crate::MovePathTool::new(project));
5919    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5920
5921    let task = cx.update(|cx| {
5922        tool.run(
5923            ToolInput::resolved(crate::MovePathToolInput {
5924                source_path: "root/secret.txt".to_string(),
5925                destination_path: "root/public/not_secret.txt".to_string(),
5926            }),
5927            event_stream,
5928            cx,
5929        )
5930    });
5931
5932    let result = task.await;
5933    assert!(
5934        result.is_err(),
5935        "expected move to be blocked due to source path"
5936    );
5937    assert!(
5938        result.unwrap_err().contains("blocked"),
5939        "error should mention the move was blocked"
5940    );
5941}
5942
5943#[gpui::test]
5944async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5945    init_test(cx);
5946
5947    let fs = FakeFs::new(cx.executor());
5948    fs.insert_tree(
5949        "/root",
5950        json!({
5951            "confidential.txt": "confidential data",
5952            "dest": {}
5953        }),
5954    )
5955    .await;
5956    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5957
5958    cx.update(|cx| {
5959        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5960        settings.tool_permissions.tools.insert(
5961            CopyPathTool::NAME.into(),
5962            agent_settings::ToolRules {
5963                default: Some(settings::ToolPermissionMode::Allow),
5964                always_allow: vec![],
5965                always_deny: vec![
5966                    agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5967                ],
5968                always_confirm: vec![],
5969                invalid_patterns: vec![],
5970            },
5971        );
5972        agent_settings::AgentSettings::override_global(settings, cx);
5973    });
5974
5975    #[allow(clippy::arc_with_non_send_sync)]
5976    let tool = Arc::new(crate::CopyPathTool::new(project));
5977    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5978
5979    let task = cx.update(|cx| {
5980        tool.run(
5981            ToolInput::resolved(crate::CopyPathToolInput {
5982                source_path: "root/confidential.txt".to_string(),
5983                destination_path: "root/dest/copy.txt".to_string(),
5984            }),
5985            event_stream,
5986            cx,
5987        )
5988    });
5989
5990    let result = task.await;
5991    assert!(result.is_err(), "expected copy to be blocked");
5992    assert!(
5993        result.unwrap_err().contains("blocked"),
5994        "error should mention the copy was blocked"
5995    );
5996}
5997
5998#[gpui::test]
5999async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
6000    init_test(cx);
6001
6002    let fs = FakeFs::new(cx.executor());
6003    fs.insert_tree(
6004        "/root",
6005        json!({
6006            "normal.txt": "normal content",
6007            "readonly": {
6008                "config.txt": "readonly content"
6009            }
6010        }),
6011    )
6012    .await;
6013    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6014
6015    cx.update(|cx| {
6016        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6017        settings.tool_permissions.tools.insert(
6018            SaveFileTool::NAME.into(),
6019            agent_settings::ToolRules {
6020                default: Some(settings::ToolPermissionMode::Allow),
6021                always_allow: vec![],
6022                always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6023                always_confirm: vec![],
6024                invalid_patterns: vec![],
6025            },
6026        );
6027        agent_settings::AgentSettings::override_global(settings, cx);
6028    });
6029
6030    #[allow(clippy::arc_with_non_send_sync)]
6031    let tool = Arc::new(crate::SaveFileTool::new(project));
6032    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6033
6034    let task = cx.update(|cx| {
6035        tool.run(
6036            ToolInput::resolved(crate::SaveFileToolInput {
6037                paths: vec![
6038                    std::path::PathBuf::from("root/normal.txt"),
6039                    std::path::PathBuf::from("root/readonly/config.txt"),
6040                ],
6041            }),
6042            event_stream,
6043            cx,
6044        )
6045    });
6046
6047    let result = task.await;
6048    assert!(
6049        result.is_err(),
6050        "expected save to be blocked due to denied path"
6051    );
6052    assert!(
6053        result.unwrap_err().contains("blocked"),
6054        "error should mention the save was blocked"
6055    );
6056}
6057
6058#[gpui::test]
6059async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6060    init_test(cx);
6061
6062    let fs = FakeFs::new(cx.executor());
6063    fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6064        .await;
6065    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6066
6067    cx.update(|cx| {
6068        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6069        settings.tool_permissions.tools.insert(
6070            SaveFileTool::NAME.into(),
6071            agent_settings::ToolRules {
6072                default: Some(settings::ToolPermissionMode::Allow),
6073                always_allow: vec![],
6074                always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6075                always_confirm: vec![],
6076                invalid_patterns: vec![],
6077            },
6078        );
6079        agent_settings::AgentSettings::override_global(settings, cx);
6080    });
6081
6082    #[allow(clippy::arc_with_non_send_sync)]
6083    let tool = Arc::new(crate::SaveFileTool::new(project));
6084    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6085
6086    let task = cx.update(|cx| {
6087        tool.run(
6088            ToolInput::resolved(crate::SaveFileToolInput {
6089                paths: vec![std::path::PathBuf::from("root/config.secret")],
6090            }),
6091            event_stream,
6092            cx,
6093        )
6094    });
6095
6096    let result = task.await;
6097    assert!(result.is_err(), "expected save to be blocked");
6098    assert!(
6099        result.unwrap_err().contains("blocked"),
6100        "error should mention the save was blocked"
6101    );
6102}
6103
6104#[gpui::test]
6105async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6106    init_test(cx);
6107
6108    cx.update(|cx| {
6109        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6110        settings.tool_permissions.tools.insert(
6111            WebSearchTool::NAME.into(),
6112            agent_settings::ToolRules {
6113                default: Some(settings::ToolPermissionMode::Allow),
6114                always_allow: vec![],
6115                always_deny: vec![
6116                    agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6117                ],
6118                always_confirm: vec![],
6119                invalid_patterns: vec![],
6120            },
6121        );
6122        agent_settings::AgentSettings::override_global(settings, cx);
6123    });
6124
6125    #[allow(clippy::arc_with_non_send_sync)]
6126    let tool = Arc::new(crate::WebSearchTool);
6127    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6128
6129    let input: crate::WebSearchToolInput =
6130        serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6131
6132    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6133
6134    let result = task.await;
6135    assert!(result.is_err(), "expected search to be blocked");
6136    match result.unwrap_err() {
6137        crate::WebSearchToolOutput::Error { error } => {
6138            assert!(
6139                error.contains("blocked"),
6140                "error should mention the search was blocked"
6141            );
6142        }
6143        other => panic!("expected Error variant, got: {other:?}"),
6144    }
6145}
6146
6147#[gpui::test]
6148async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6149    init_test(cx);
6150
6151    let fs = FakeFs::new(cx.executor());
6152    fs.insert_tree("/root", json!({"README.md": "# Hello"}))
6153        .await;
6154    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6155
6156    cx.update(|cx| {
6157        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6158        settings.tool_permissions.tools.insert(
6159            EditFileTool::NAME.into(),
6160            agent_settings::ToolRules {
6161                default: Some(settings::ToolPermissionMode::Confirm),
6162                always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6163                always_deny: vec![],
6164                always_confirm: vec![],
6165                invalid_patterns: vec![],
6166            },
6167        );
6168        agent_settings::AgentSettings::override_global(settings, cx);
6169    });
6170
6171    let context_server_registry =
6172        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6173    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6174    let templates = crate::Templates::new();
6175    let thread = cx.new(|cx| {
6176        crate::Thread::new(
6177            project.clone(),
6178            cx.new(|_cx| prompt_store::ProjectContext::default()),
6179            context_server_registry,
6180            templates.clone(),
6181            None,
6182            cx,
6183        )
6184    });
6185
6186    #[allow(clippy::arc_with_non_send_sync)]
6187    let tool = Arc::new(crate::EditFileTool::new(
6188        project,
6189        thread.downgrade(),
6190        language_registry,
6191        templates,
6192    ));
6193    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6194
6195    let _task = cx.update(|cx| {
6196        tool.run(
6197            ToolInput::resolved(crate::EditFileToolInput {
6198                display_description: "Edit README".to_string(),
6199                path: "root/README.md".into(),
6200                mode: crate::EditFileMode::Edit,
6201            }),
6202            event_stream,
6203            cx,
6204        )
6205    });
6206
6207    cx.run_until_parked();
6208
6209    let event = rx.try_next();
6210    assert!(
6211        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6212        "expected no authorization request for allowed .md file"
6213    );
6214}
6215
6216#[gpui::test]
6217async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6218    init_test(cx);
6219
6220    let fs = FakeFs::new(cx.executor());
6221    fs.insert_tree(
6222        "/root",
6223        json!({
6224            ".zed": {
6225                "settings.json": "{}"
6226            },
6227            "README.md": "# Hello"
6228        }),
6229    )
6230    .await;
6231    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6232
6233    cx.update(|cx| {
6234        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6235        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6236        agent_settings::AgentSettings::override_global(settings, cx);
6237    });
6238
6239    let context_server_registry =
6240        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6241    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6242    let templates = crate::Templates::new();
6243    let thread = cx.new(|cx| {
6244        crate::Thread::new(
6245            project.clone(),
6246            cx.new(|_cx| prompt_store::ProjectContext::default()),
6247            context_server_registry,
6248            templates.clone(),
6249            None,
6250            cx,
6251        )
6252    });
6253
6254    #[allow(clippy::arc_with_non_send_sync)]
6255    let tool = Arc::new(crate::EditFileTool::new(
6256        project,
6257        thread.downgrade(),
6258        language_registry,
6259        templates,
6260    ));
6261
6262    // Editing a file inside .zed/ should still prompt even with global default: allow,
6263    // because local settings paths are sensitive and require confirmation regardless.
6264    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6265    let _task = cx.update(|cx| {
6266        tool.run(
6267            ToolInput::resolved(crate::EditFileToolInput {
6268                display_description: "Edit local settings".to_string(),
6269                path: "root/.zed/settings.json".into(),
6270                mode: crate::EditFileMode::Edit,
6271            }),
6272            event_stream,
6273            cx,
6274        )
6275    });
6276
6277    let _update = rx.expect_update_fields().await;
6278    let _auth = rx.expect_authorization().await;
6279}
6280
6281#[gpui::test]
6282async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6283    init_test(cx);
6284
6285    cx.update(|cx| {
6286        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6287        settings.tool_permissions.tools.insert(
6288            FetchTool::NAME.into(),
6289            agent_settings::ToolRules {
6290                default: Some(settings::ToolPermissionMode::Allow),
6291                always_allow: vec![],
6292                always_deny: vec![
6293                    agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6294                ],
6295                always_confirm: vec![],
6296                invalid_patterns: vec![],
6297            },
6298        );
6299        agent_settings::AgentSettings::override_global(settings, cx);
6300    });
6301
6302    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6303
6304    #[allow(clippy::arc_with_non_send_sync)]
6305    let tool = Arc::new(crate::FetchTool::new(http_client));
6306    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6307
6308    let input: crate::FetchToolInput =
6309        serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6310
6311    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6312
6313    let result = task.await;
6314    assert!(result.is_err(), "expected fetch to be blocked");
6315    assert!(
6316        result.unwrap_err().contains("blocked"),
6317        "error should mention the fetch was blocked"
6318    );
6319}
6320
6321#[gpui::test]
6322async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6323    init_test(cx);
6324
6325    cx.update(|cx| {
6326        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6327        settings.tool_permissions.tools.insert(
6328            FetchTool::NAME.into(),
6329            agent_settings::ToolRules {
6330                default: Some(settings::ToolPermissionMode::Confirm),
6331                always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6332                always_deny: vec![],
6333                always_confirm: vec![],
6334                invalid_patterns: vec![],
6335            },
6336        );
6337        agent_settings::AgentSettings::override_global(settings, cx);
6338    });
6339
6340    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6341
6342    #[allow(clippy::arc_with_non_send_sync)]
6343    let tool = Arc::new(crate::FetchTool::new(http_client));
6344    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6345
6346    let input: crate::FetchToolInput =
6347        serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6348
6349    let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6350
6351    cx.run_until_parked();
6352
6353    let event = rx.try_next();
6354    assert!(
6355        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6356        "expected no authorization request for allowed docs.rs URL"
6357    );
6358}
6359
6360#[gpui::test]
6361async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
6362    init_test(cx);
6363    always_allow_tools(cx);
6364
6365    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6366    let fake_model = model.as_fake();
6367
6368    // Add a tool so we can simulate tool calls
6369    thread.update(cx, |thread, _cx| {
6370        thread.add_tool(EchoTool);
6371    });
6372
6373    // Start a turn by sending a message
6374    let mut events = thread
6375        .update(cx, |thread, cx| {
6376            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
6377        })
6378        .unwrap();
6379    cx.run_until_parked();
6380
6381    // Simulate the model making a tool call
6382    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6383        LanguageModelToolUse {
6384            id: "tool_1".into(),
6385            name: "echo".into(),
6386            raw_input: r#"{"text": "hello"}"#.into(),
6387            input: json!({"text": "hello"}),
6388            is_input_complete: true,
6389            thought_signature: None,
6390        },
6391    ));
6392    fake_model
6393        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
6394
6395    // Signal that a message is queued before ending the stream
6396    thread.update(cx, |thread, _cx| {
6397        thread.set_has_queued_message(true);
6398    });
6399
6400    // Now end the stream - tool will run, and the boundary check should see the queue
6401    fake_model.end_last_completion_stream();
6402
6403    // Collect all events until the turn stops
6404    let all_events = collect_events_until_stop(&mut events, cx).await;
6405
6406    // Verify we received the tool call event
6407    let tool_call_ids: Vec<_> = all_events
6408        .iter()
6409        .filter_map(|e| match e {
6410            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
6411            _ => None,
6412        })
6413        .collect();
6414    assert_eq!(
6415        tool_call_ids,
6416        vec!["tool_1"],
6417        "Should have received a tool call event for our echo tool"
6418    );
6419
6420    // The turn should have stopped with EndTurn
6421    let stop_reasons = stop_events(all_events);
6422    assert_eq!(
6423        stop_reasons,
6424        vec![acp::StopReason::EndTurn],
6425        "Turn should have ended after tool completion due to queued message"
6426    );
6427
6428    // Verify the queued message flag is still set
6429    thread.update(cx, |thread, _cx| {
6430        assert!(
6431            thread.has_queued_message(),
6432            "Should still have queued message flag set"
6433        );
6434    });
6435
6436    // Thread should be idle now
6437    thread.update(cx, |thread, _cx| {
6438        assert!(
6439            thread.is_turn_complete(),
6440            "Thread should not be running after turn ends"
6441        );
6442    });
6443}
6444
6445#[gpui::test]
6446async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
6447    init_test(cx);
6448    always_allow_tools(cx);
6449
6450    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6451    let fake_model = model.as_fake();
6452
6453    thread.update(cx, |thread, _cx| {
6454        thread.add_tool(StreamingFailingEchoTool {
6455            receive_chunks_until_failure: 1,
6456        });
6457    });
6458
6459    let _events = thread
6460        .update(cx, |thread, cx| {
6461            thread.send(
6462                UserMessageId::new(),
6463                ["Use the streaming_failing_echo tool"],
6464                cx,
6465            )
6466        })
6467        .unwrap();
6468    cx.run_until_parked();
6469
6470    let tool_use = LanguageModelToolUse {
6471        id: "call_1".into(),
6472        name: StreamingFailingEchoTool::NAME.into(),
6473        raw_input: "hello".into(),
6474        input: json!({}),
6475        is_input_complete: false,
6476        thought_signature: None,
6477    };
6478
6479    fake_model
6480        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
6481
6482    cx.run_until_parked();
6483
6484    let completions = fake_model.pending_completions();
6485    let last_completion = completions.last().unwrap();
6486
6487    assert_eq!(
6488        last_completion.messages[1..],
6489        vec![
6490            LanguageModelRequestMessage {
6491                role: Role::User,
6492                content: vec!["Use the streaming_failing_echo tool".into()],
6493                cache: false,
6494                reasoning_details: None,
6495            },
6496            LanguageModelRequestMessage {
6497                role: Role::Assistant,
6498                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
6499                cache: false,
6500                reasoning_details: None,
6501            },
6502            LanguageModelRequestMessage {
6503                role: Role::User,
6504                content: vec![language_model::MessageContent::ToolResult(
6505                    LanguageModelToolResult {
6506                        tool_use_id: tool_use.id.clone(),
6507                        tool_name: tool_use.name,
6508                        is_error: true,
6509                        content: "failed".into(),
6510                        output: Some("failed".into()),
6511                    }
6512                )],
6513                cache: true,
6514                reasoning_details: None,
6515            },
6516        ]
6517    );
6518}
6519
6520#[gpui::test]
6521async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
6522    init_test(cx);
6523    always_allow_tools(cx);
6524
6525    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6526    let fake_model = model.as_fake();
6527
6528    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
6529        oneshot::channel();
6530
6531    thread.update(cx, |thread, _cx| {
6532        thread.add_tool(
6533            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
6534        );
6535        thread.add_tool(StreamingFailingEchoTool {
6536            receive_chunks_until_failure: 1,
6537        });
6538    });
6539
6540    let _events = thread
6541        .update(cx, |thread, cx| {
6542            thread.send(
6543                UserMessageId::new(),
6544                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
6545                cx,
6546            )
6547        })
6548        .unwrap();
6549    cx.run_until_parked();
6550
6551    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6552        LanguageModelToolUse {
6553            id: "call_1".into(),
6554            name: StreamingEchoTool::NAME.into(),
6555            raw_input: "hello".into(),
6556            input: json!({ "text": "hello" }),
6557            is_input_complete: false,
6558            thought_signature: None,
6559        },
6560    ));
6561    let first_tool_use = LanguageModelToolUse {
6562        id: "call_1".into(),
6563        name: StreamingEchoTool::NAME.into(),
6564        raw_input: "hello world".into(),
6565        input: json!({ "text": "hello world" }),
6566        is_input_complete: true,
6567        thought_signature: None,
6568    };
6569    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6570        first_tool_use.clone(),
6571    ));
6572    let second_tool_use = LanguageModelToolUse {
6573        name: StreamingFailingEchoTool::NAME.into(),
6574        raw_input: "hello".into(),
6575        input: json!({ "text": "hello" }),
6576        is_input_complete: false,
6577        thought_signature: None,
6578        id: "call_2".into(),
6579    };
6580    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6581        second_tool_use.clone(),
6582    ));
6583
6584    cx.run_until_parked();
6585
6586    complete_streaming_echo_tool_call_tx.send(()).unwrap();
6587
6588    cx.run_until_parked();
6589
6590    let completions = fake_model.pending_completions();
6591    let last_completion = completions.last().unwrap();
6592
6593    assert_eq!(
6594        last_completion.messages[1..],
6595        vec![
6596            LanguageModelRequestMessage {
6597                role: Role::User,
6598                content: vec![
6599                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
6600                ],
6601                cache: false,
6602                reasoning_details: None,
6603            },
6604            LanguageModelRequestMessage {
6605                role: Role::Assistant,
6606                content: vec![
6607                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
6608                    language_model::MessageContent::ToolUse(second_tool_use.clone())
6609                ],
6610                cache: false,
6611                reasoning_details: None,
6612            },
6613            LanguageModelRequestMessage {
6614                role: Role::User,
6615                content: vec![
6616                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
6617                        tool_use_id: second_tool_use.id.clone(),
6618                        tool_name: second_tool_use.name,
6619                        is_error: true,
6620                        content: "failed".into(),
6621                        output: Some("failed".into()),
6622                    }),
6623                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
6624                        tool_use_id: first_tool_use.id.clone(),
6625                        tool_name: first_tool_use.name,
6626                        is_error: false,
6627                        content: "hello world".into(),
6628                        output: Some("hello world".into()),
6629                    }),
6630                ],
6631                cache: true,
6632                reasoning_details: None,
6633            },
6634        ]
6635    );
6636}
6637
6638#[gpui::test]
6639async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
6640    let ThreadTest {
6641        model, thread, fs, ..
6642    } = setup(cx, TestModel::Fake).await;
6643    let fake_model_a = model.as_fake();
6644
6645    thread.update(cx, |thread, _cx| {
6646        thread.add_tool(EchoTool);
6647        thread.add_tool(DelayTool);
6648    });
6649
6650    // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
6651    fs.insert_file(
6652        paths::settings_file(),
6653        json!({
6654            "agent": {
6655                "profiles": {
6656                    "profile-a": {
6657                        "name": "Profile A",
6658                        "tools": {
6659                            EchoTool::NAME: true,
6660                            DelayTool::NAME: true,
6661                        }
6662                    },
6663                    "profile-b": {
6664                        "name": "Profile B",
6665                        "tools": {
6666                            DelayTool::NAME: true,
6667                        }
6668                    }
6669                }
6670            }
6671        })
6672        .to_string()
6673        .into_bytes(),
6674    )
6675    .await;
6676    cx.run_until_parked();
6677
6678    thread.update(cx, |thread, cx| {
6679        thread.set_profile(AgentProfileId("profile-a".into()), cx);
6680        thread.set_thinking_enabled(false, cx);
6681    });
6682
6683    // Send a message — first iteration starts with model A, profile-a, thinking off.
6684    thread
6685        .update(cx, |thread, cx| {
6686            thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
6687        })
6688        .unwrap();
6689    cx.run_until_parked();
6690
6691    // Verify first request has both tools and thinking disabled.
6692    let completions = fake_model_a.pending_completions();
6693    assert_eq!(completions.len(), 1);
6694    let first_tools = tool_names_for_completion(&completions[0]);
6695    assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
6696    assert!(!completions[0].thinking_allowed);
6697
6698    // Model A responds with an echo tool call.
6699    fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6700        LanguageModelToolUse {
6701            id: "tool_1".into(),
6702            name: "echo".into(),
6703            raw_input: r#"{"text":"hello"}"#.into(),
6704            input: json!({"text": "hello"}),
6705            is_input_complete: true,
6706            thought_signature: None,
6707        },
6708    ));
6709    fake_model_a.end_last_completion_stream();
6710
6711    // Before the next iteration runs, switch to profile-b (only DelayTool),
6712    // swap in a new model, and enable thinking.
6713    let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
6714        "test-provider",
6715        "model-b",
6716        "Model B",
6717        true,
6718    ));
6719    thread.update(cx, |thread, cx| {
6720        thread.set_profile(AgentProfileId("profile-b".into()), cx);
6721        thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
6722        thread.set_thinking_enabled(true, cx);
6723    });
6724
6725    // Run until parked — processes the echo tool call, loops back, picks up
6726    // the new model/profile/thinking, and makes a second request to model B.
6727    cx.run_until_parked();
6728
6729    // The second request should have gone to model B.
6730    let model_b_completions = fake_model_b.pending_completions();
6731    assert_eq!(
6732        model_b_completions.len(),
6733        1,
6734        "second request should go to model B"
6735    );
6736
6737    // Profile-b only has DelayTool, so echo should be gone.
6738    let second_tools = tool_names_for_completion(&model_b_completions[0]);
6739    assert_eq!(second_tools, vec![DelayTool::NAME]);
6740
6741    // Thinking should now be enabled.
6742    assert!(model_b_completions[0].thinking_allowed);
6743}