// mod.rs — agent thread tests

   1use super::*;
   2use acp_thread::{
   3    AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
   4    UserMessageId,
   5};
   6use agent_client_protocol::{self as acp};
   7use agent_settings::AgentProfileId;
   8use anyhow::Result;
   9use client::{Client, UserStore};
  10use cloud_llm_client::CompletionIntent;
  11use collections::IndexMap;
  12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  13use feature_flags::FeatureFlagAppExt as _;
  14use fs::{FakeFs, Fs};
  15use futures::{
  16    FutureExt as _, StreamExt,
  17    channel::{
  18        mpsc::{self, UnboundedReceiver},
  19        oneshot,
  20    },
  21    future::{Fuse, Shared},
  22};
  23use gpui::{
  24    App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
  25    http_client::FakeHttpClient,
  26};
  27use indoc::indoc;
  28use language_model::{
  29    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
  30    LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
  31    LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
  32    LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
  33    fake_provider::FakeLanguageModel,
  34};
  35use pretty_assertions::assert_eq;
  36use project::{
  37    Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
  38};
  39use prompt_store::ProjectContext;
  40use reqwest_client::ReqwestClient;
  41use schemars::JsonSchema;
  42use serde::{Deserialize, Serialize};
  43use serde_json::json;
  44use settings::{Settings, SettingsStore};
  45use std::{
  46    path::Path,
  47    pin::Pin,
  48    rc::Rc,
  49    sync::{
  50        Arc,
  51        atomic::{AtomicBool, AtomicUsize, Ordering},
  52    },
  53    time::Duration,
  54};
  55use util::path;
  56
  57mod edit_file_thread_test;
  58mod test_tools;
  59use test_tools::*;
  60
  61pub(crate) fn init_test(cx: &mut TestAppContext) {
  62    cx.update(|cx| {
  63        let settings_store = SettingsStore::test(cx);
  64        cx.set_global(settings_store);
  65    });
  66}
  67
// Scriptable stand-in for a terminal used by terminal-tool tests: exposes
// knobs for exit timing, kill tracking, and canned output.
pub(crate) struct FakeTerminalHandle {
    // Set to true once `kill` is called.
    killed: Arc<AtomicBool>,
    // Backs `was_stopped_by_user`; toggled via `set_stopped_by_user`.
    stopped_by_user: Arc<AtomicBool>,
    // Oneshot sender that resolves `wait_for_exit`; wrapped in
    // `RefCell<Option<…>>` so it can be consumed through `&self`.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    // Shared future that resolves when the fake process "exits".
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    // Canned response returned from `current_output`.
    output: acp::TerminalOutputResponse,
    // Terminal id reported to callers.
    id: acp::TerminalId,
}
  76
  77impl FakeTerminalHandle {
  78    pub(crate) fn new_never_exits(cx: &mut App) -> Self {
  79        let killed = Arc::new(AtomicBool::new(false));
  80        let stopped_by_user = Arc::new(AtomicBool::new(false));
  81
  82        let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
  83
  84        let wait_for_exit = cx
  85            .spawn(async move |_cx| {
  86                // Wait for the exit signal (sent when kill() is called)
  87                let _ = exit_receiver.await;
  88                acp::TerminalExitStatus::new()
  89            })
  90            .shared();
  91
  92        Self {
  93            killed,
  94            stopped_by_user,
  95            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
  96            wait_for_exit,
  97            output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
  98            id: acp::TerminalId::new("fake_terminal".to_string()),
  99        }
 100    }
 101
 102    pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
 103        let killed = Arc::new(AtomicBool::new(false));
 104        let stopped_by_user = Arc::new(AtomicBool::new(false));
 105        let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
 106
 107        let wait_for_exit = cx
 108            .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
 109            .shared();
 110
 111        Self {
 112            killed,
 113            stopped_by_user,
 114            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
 115            wait_for_exit,
 116            output: acp::TerminalOutputResponse::new("command output".to_string(), false),
 117            id: acp::TerminalId::new("fake_terminal".to_string()),
 118        }
 119    }
 120
 121    pub(crate) fn was_killed(&self) -> bool {
 122        self.killed.load(Ordering::SeqCst)
 123    }
 124
 125    pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
 126        self.stopped_by_user.store(stopped, Ordering::SeqCst);
 127    }
 128
 129    pub(crate) fn signal_exit(&self) {
 130        if let Some(sender) = self.exit_sender.borrow_mut().take() {
 131            let _ = sender.send(());
 132        }
 133    }
 134}
 135
// Canned `TerminalHandle` implementation backed by the fake's fields; every
// method succeeds.
impl crate::TerminalHandle for FakeTerminalHandle {
    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
        Ok(self.id.clone())
    }

    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
        Ok(self.output.clone())
    }

    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
        Ok(self.wait_for_exit.clone())
    }

    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
        // Record the kill, then resolve the exit future so waiters wake up.
        self.killed.store(true, Ordering::SeqCst);
        self.signal_exit();
        Ok(())
    }

    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
        Ok(self.stopped_by_user.load(Ordering::SeqCst))
    }
}
 159
// Test double for a subagent: replies to every `send` by resolving a
// pre-arranged shared task.
struct FakeSubagentHandle {
    // Identifier reported by `SubagentHandle::id`.
    session_id: acp::SessionId,
    // Shared future whose output is returned from `send`.
    send_task: Shared<Task<String>>,
}
 164
impl SubagentHandle for FakeSubagentHandle {
    fn id(&self) -> acp::SessionId {
        self.session_id.clone()
    }

    fn num_entries(&self, _cx: &App) -> usize {
        // Not needed by the current tests.
        unimplemented!()
    }

    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
        // Ignores the message and resolves with the canned task's output.
        let task = self.send_task.clone();
        cx.background_spawn(async move { Ok(task.await) })
    }
}
 179
// Test double for `crate::ThreadEnvironment` that hands out pre-configured
// terminal/subagent handles and counts terminal creations.
#[derive(Default)]
pub(crate) struct FakeThreadEnvironment {
    // Handle returned by `create_terminal`; `None` makes that call panic.
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    // Handle returned by `create_subagent`; `None` makes that call panic.
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
    // Number of `create_terminal` calls observed so far.
    terminal_creations: Arc<AtomicUsize>,
}
 186
 187impl FakeThreadEnvironment {
 188    pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
 189        Self {
 190            terminal_handle: Some(terminal_handle.into()),
 191            ..self
 192        }
 193    }
 194
 195    pub(crate) fn terminal_creation_count(&self) -> usize {
 196        self.terminal_creations.load(Ordering::SeqCst)
 197    }
 198}
 199
impl crate::ThreadEnvironment for FakeThreadEnvironment {
    // Returns the pre-configured terminal handle and bumps the creation
    // counter; panics if no handle was configured via `with_terminal`.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        _cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        self.terminal_creations.fetch_add(1, Ordering::SeqCst);
        let handle = self
            .terminal_handle
            .clone()
            .expect("Terminal handle not available on FakeThreadEnvironment");
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Returns the pre-configured subagent handle; panics if none was set.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        Ok(self
            .subagent_handle
            .clone()
            .expect("Subagent handle not available on FakeThreadEnvironment")
            as Rc<dyn SubagentHandle>)
    }
}
 224
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    // Every handle created so far, in creation order; `RefCell` because
    // `create_terminal` only receives `&self`.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
 229
 230impl MultiTerminalEnvironment {
 231    fn new() -> Self {
 232        Self {
 233            handles: std::cell::RefCell::new(Vec::new()),
 234        }
 235    }
 236
 237    fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
 238        self.handles.borrow().clone()
 239    }
 240}
 241
impl crate::ThreadEnvironment for MultiTerminalEnvironment {
    // Spawns a fresh never-exiting fake terminal per call and records it so
    // tests can inspect every handle later via `handles()`.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
        self.handles.borrow_mut().push(handle.clone());
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Subagents are not exercised by the multi-terminal tests.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        unimplemented!()
    }
}
 259
// Flips the global agent settings so every tool call is auto-approved,
// bypassing permission prompts in tests that aren't about authorization.
fn always_allow_tools(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
        agent_settings::AgentSettings::override_global(settings, cx);
    });
}
 267
#[gpui::test]
async fn test_echo(cx: &mut TestAppContext) {
    // Single round trip: user message in, streamed "Hello" back, then assert
    // the final assistant message and the stop reason.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Simulate the model streaming one text chunk and ending its turn.
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            "Hello\n"
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
 300
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    // A terminal tool call with a tiny `timeout_ms` must kill the underlying
    // handle and still surface the terminal's partial output in the result.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // 5ms timeout against a terminal that never exits on its own.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // Poll the task while pumping the executor; fail the test if the timeout
    // doesn't fire within 500ms of wall-clock time.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
 366
#[gpui::test]
#[ignore]
// NOTE(review): this test is #[ignore]d; it relies on 25ms of elapsed timer
// time to prove a negative (no kill) — confirm why it is disabled before
// re-enabling.
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    // Without a `timeout_ms`, the terminal handle must NOT be killed even
    // though the command never exits.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Task intentionally kept alive (not awaited): the command never exits.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give any (erroneous) timeout logic a chance to fire before asserting.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
 416
#[gpui::test]
async fn test_thinking(cx: &mut TestAppContext) {
    // A streamed `Thinking` event should render as a `<think>` block ahead of
    // the regular response text in the message markdown.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                [indoc! {"
                    Testing:

                    Generate a thinking step where you just think the word 'Think',
                    and have your final answer be 'Hello'
                "}],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();
    // Stream a thinking chunk, then the visible answer, then end the turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "Think".to_string(),
        signature: None,
    });
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            indoc! {"
                <think>Think</think>
                Hello
            "}
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
 465
#[gpui::test]
async fn test_system_prompt(cx: &mut TestAppContext) {
    // The system prompt should reflect the project context (shell name) and
    // include tool-related sections when at least one tool is registered.
    let ThreadTest {
        model,
        thread,
        project_context,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    project_context.update(cx, |project_context, _cx| {
        project_context.shell = "test-shell".into()
    });
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The system message is expected to be the first message in the request.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        system_prompt.contains("test-shell"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
 510
#[gpui::test]
async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
    // Counterpart to `test_system_prompt`: with no tools registered, the
    // tool-related sections must be absent from the system prompt.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        !system_prompt.contains("## Tool Use"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        !system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
 546
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    // Verifies the `cache` flag on request messages: only the most recent
    // user message (or tool result) should be marked cacheable; everything
    // earlier in the conversation is sent with `cache: false`.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Note: messages[0] is the system prompt, hence the [1..] slices below.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // After the tool runs, the tool-result message should be the cached one.
    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
 692
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test against a real model (Sonnet 4); only runs when the
    // "e2e" feature is enabled.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool calls that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    // The final assistant message should echo the delay tool's "Ding" output.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
 752
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test (real model): tool-call input should stream into the
    // thread incrementally, so the history must contain at least one
    // partially-streamed tool use before the input completes.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, input may still be mid-stream: the
                        // last field ("g") hasn't arrived yet.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
 804
 805#[gpui::test]
 806async fn test_tool_authorization(cx: &mut TestAppContext) {
 807    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 808    let fake_model = model.as_fake();
 809
 810    let mut events = thread
 811        .update(cx, |thread, cx| {
 812            thread.add_tool(ToolRequiringPermission);
 813            thread.send(UserMessageId::new(), ["abc"], cx)
 814        })
 815        .unwrap();
 816    cx.run_until_parked();
 817    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 818        LanguageModelToolUse {
 819            id: "tool_id_1".into(),
 820            name: ToolRequiringPermission::NAME.into(),
 821            raw_input: "{}".into(),
 822            input: json!({}),
 823            is_input_complete: true,
 824            thought_signature: None,
 825        },
 826    ));
 827    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 828        LanguageModelToolUse {
 829            id: "tool_id_2".into(),
 830            name: ToolRequiringPermission::NAME.into(),
 831            raw_input: "{}".into(),
 832            input: json!({}),
 833            is_input_complete: true,
 834            thought_signature: None,
 835        },
 836    ));
 837    fake_model.end_last_completion_stream();
 838    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
 839    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;
 840
 841    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
 842    tool_call_auth_1
 843        .response
 844        .send(acp::PermissionOptionId::new("allow").into())
 845        .unwrap();
 846    cx.run_until_parked();
 847
 848    // Reject the second - send "deny" option_id directly since Deny is now a button
 849    tool_call_auth_2
 850        .response
 851        .send(acp::PermissionOptionId::new("deny").into())
 852        .unwrap();
 853    cx.run_until_parked();
 854
 855    let completion = fake_model.pending_completions().pop().unwrap();
 856    let message = completion.messages.last().unwrap();
 857    assert_eq!(
 858        message.content,
 859        vec![
 860            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 861                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
 862                tool_name: ToolRequiringPermission::NAME.into(),
 863                is_error: false,
 864                content: "Allowed".into(),
 865                output: Some("Allowed".into())
 866            }),
 867            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 868                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
 869                tool_name: ToolRequiringPermission::NAME.into(),
 870                is_error: true,
 871                content: "Permission to run tool denied by user".into(),
 872                output: Some("Permission to run tool denied by user".into())
 873            })
 874        ]
 875    );
 876
 877    // Simulate yet another tool call.
 878    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 879        LanguageModelToolUse {
 880            id: "tool_id_3".into(),
 881            name: ToolRequiringPermission::NAME.into(),
 882            raw_input: "{}".into(),
 883            input: json!({}),
 884            is_input_complete: true,
 885            thought_signature: None,
 886        },
 887    ));
 888    fake_model.end_last_completion_stream();
 889
 890    // Respond by always allowing tools - send transformed option_id
 891    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
 892    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
 893    tool_call_auth_3
 894        .response
 895        .send(acp::PermissionOptionId::new("always_allow:tool_requiring_permission").into())
 896        .unwrap();
 897    cx.run_until_parked();
 898    let completion = fake_model.pending_completions().pop().unwrap();
 899    let message = completion.messages.last().unwrap();
 900    assert_eq!(
 901        message.content,
 902        vec![language_model::MessageContent::ToolResult(
 903            LanguageModelToolResult {
 904                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
 905                tool_name: ToolRequiringPermission::NAME.into(),
 906                is_error: false,
 907                content: "Allowed".into(),
 908                output: Some("Allowed".into())
 909            }
 910        )]
 911    );
 912
 913    // Simulate a final tool call, ensuring we don't trigger authorization.
 914    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 915        LanguageModelToolUse {
 916            id: "tool_id_4".into(),
 917            name: ToolRequiringPermission::NAME.into(),
 918            raw_input: "{}".into(),
 919            input: json!({}),
 920            is_input_complete: true,
 921            thought_signature: None,
 922        },
 923    ));
 924    fake_model.end_last_completion_stream();
 925    cx.run_until_parked();
 926    let completion = fake_model.pending_completions().pop().unwrap();
 927    let message = completion.messages.last().unwrap();
 928    assert_eq!(
 929        message.content,
 930        vec![language_model::MessageContent::ToolResult(
 931            LanguageModelToolResult {
 932                tool_use_id: "tool_id_4".into(),
 933                tool_name: ToolRequiringPermission::NAME.into(),
 934                is_error: false,
 935                content: "Allowed".into(),
 936                output: Some("Allowed".into())
 937            }
 938        )]
 939    );
 940}
 941
/// Verifies that when the model invokes a tool that was never registered
/// (a "hallucinated" tool), the thread surfaces a pending tool call that
/// immediately transitions to `Failed`.
#[gpui::test]
async fn test_tool_hallucination(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Simulate the model requesting a tool that was never registered on the thread.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: "nonexistent_tool".into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // The call is first reported as pending, then marked failed once the
    // thread discovers no such tool exists.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(tool_call.title, "nonexistent_tool");
    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
}
 971
 972async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
 973    let event = events
 974        .next()
 975        .await
 976        .expect("no tool call authorization event received")
 977        .unwrap();
 978    match event {
 979        ThreadEvent::ToolCall(tool_call) => tool_call,
 980        event => {
 981            panic!("Unexpected event {event:?}");
 982        }
 983    }
 984}
 985
 986async fn expect_tool_call_update_fields(
 987    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
 988) -> acp::ToolCallUpdate {
 989    let event = events
 990        .next()
 991        .await
 992        .expect("no tool call authorization event received")
 993        .unwrap();
 994    match event {
 995        ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
 996        event => {
 997            panic!("Unexpected event {event:?}");
 998        }
 999    }
1000}
1001
1002async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1003    let event = events
1004        .next()
1005        .await
1006        .expect("no plan event received")
1007        .unwrap();
1008    match event {
1009        ThreadEvent::Plan(plan) => plan,
1010        event => {
1011            panic!("Unexpected event {event:?}");
1012        }
1013    }
1014}
1015
1016async fn next_tool_call_authorization(
1017    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1018) -> ToolCallAuthorization {
1019    loop {
1020        let event = events
1021            .next()
1022            .await
1023            .expect("no tool call authorization event received")
1024            .unwrap();
1025        if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1026            let permission_kinds = tool_call_authorization
1027                .options
1028                .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1029                .map(|option| option.kind);
1030            let allow_once = tool_call_authorization
1031                .options
1032                .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1033                .map(|option| option.kind);
1034
1035            assert_eq!(
1036                permission_kinds,
1037                Some(acp::PermissionOptionKind::AllowAlways)
1038            );
1039            assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1040            return tool_call_authorization;
1041        }
1042    }
1043}
1044
#[test]
fn test_permission_options_terminal_with_pattern() {
    // A multi-token terminal command should produce a dropdown with three
    // choices: always-for-tool, always-for-command-pattern, and once.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `cargo build` commands"));
    assert!(labels.contains(&"Only this time"));
}
1066
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    // Flags after the command name must not leak into the pattern label:
    // "ls -la" is offered as "Always for `ls` commands".
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `ls` commands"));
    assert!(labels.contains(&"Only this time"));
}
1086
#[test]
fn test_permission_options_terminal_single_word_command() {
    // Even a bare single-word command gets its own pattern-scoped choice.
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `whoami` commands"));
    assert!(labels.contains(&"Only this time"));
}
1106
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Editing a file under a directory should offer a directory-scoped
    // "always" choice alongside the tool-wide one.
    let options = ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.contains(&"Always for edit file"));
    assert!(labels.contains(&"Always for `src/`"));
}
1124
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetching a URL should offer a domain-scoped "always" choice
    // alongside the tool-wide one.
    let options = ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.contains(&"Always for fetch"));
    assert!(labels.contains(&"Always for `docs.rs`"));
}
1142
#[test]
fn test_permission_options_without_pattern() {
    // A command no safe pattern can be derived for (a local script) should
    // only offer the tool-wide and one-time choices — no per-command entry.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 2);
    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));
    assert!(!labels.iter().any(|label| label.contains("commands")));
}
1164
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Writing through a symlink that escapes the project should only ever
    // be approvable one time: flat allow/deny, no "always" choices.
    let permission_options =
        ToolPermissionContext::symlink_target(EditFileTool::NAME, vec!["/outside/file.txt".into()])
            .build_permission_options();

    let options = match permission_options {
        PermissionOptions::Flat(options) => options,
        _ => panic!("Expected flat permission options for symlink target authorization"),
    };

    assert_eq!(options.len(), 2);
    // True when an option with the given id and kind was offered.
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1185
#[test]
fn test_permission_option_ids_for_terminal() {
    // Dropdown choices must carry the option IDs the UI sends back:
    // tool-level "always" IDs for the first two choices and the one-shot
    // "allow"/"deny" IDs for the last.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    // Expect 3 choices: always-tool, always-pattern, once.
    assert_eq!(choices.len(), 3);

    // Choice 0: tool-level "always", unscoped (no sub-patterns).
    let tool_wide = &choices[0];
    assert_eq!(tool_wide.allow.option_id.0.as_ref(), "always_allow:terminal");
    assert_eq!(tool_wide.deny.option_id.0.as_ref(), "always_deny:terminal");
    assert!(tool_wide.sub_patterns.is_empty());

    // Choice 1: same tool-level IDs, but scoped by a command pattern.
    let pattern_scoped = &choices[1];
    assert_eq!(
        pattern_scoped.allow.option_id.0.as_ref(),
        "always_allow:terminal"
    );
    assert_eq!(
        pattern_scoped.deny.option_id.0.as_ref(),
        "always_deny:terminal"
    );
    assert_eq!(pattern_scoped.sub_patterns, vec!["^cargo\\s+build(\\s|$)"]);

    // Choice 2: one-time allow/deny.
    let once = &choices[2];
    assert_eq!(once.allow.option_id.0.as_ref(), "allow");
    assert_eq!(once.deny.option_id.0.as_ref(), "deny");
    assert!(once.sub_patterns.is_empty());
}
1221
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    // A shell pipeline can't be covered by a single command pattern, so the
    // builder emits `DropdownWithPatterns` with one pattern per pipeline stage.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo test 2>&1 | tail".to_string()],
    )
    .build_permission_options();

    let (choices, patterns, tool_name) = match options {
        PermissionOptions::DropdownWithPatterns {
            choices,
            patterns,
            tool_name,
        } => (choices, patterns, tool_name),
        _ => panic!("Expected DropdownWithPatterns permission options for pipeline command"),
    };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Only the tool-wide and one-time choices appear at the top level.
    assert_eq!(choices.len(), 2);
    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));

    // One per-command pattern per stage: "cargo test" and "tail".
    assert_eq!(patterns.len(), 2);
    let pattern_names: Vec<&str> = patterns.iter().map(|cp| cp.display_name.as_str()).collect();
    assert!(pattern_names.contains(&"cargo test"));
    assert!(pattern_names.contains(&"tail"));

    // The regex for each stage anchors at the start of the command.
    let regex_patterns: Vec<&str> = patterns.iter().map(|cp| cp.pattern.as_str()).collect();
    assert!(regex_patterns.contains(&"^cargo\\s+test(\\s|$)"));
    assert!(regex_patterns.contains(&"^tail\\b"));
}
1261
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    // Chained (&&) and piped (|) commands each contribute a pattern, and
    // subcommand-aware matching keeps "npm install" and "npm test" distinct.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["npm install && npm test | tail".to_string()],
    )
    .build_permission_options();

    let patterns = match options {
        PermissionOptions::DropdownWithPatterns { patterns, .. } => patterns,
        _ => panic!("Expected DropdownWithPatterns for chained pipeline command"),
    };

    assert_eq!(patterns.len(), 3);
    let names: Vec<&str> = patterns.iter().map(|cp| cp.display_name.as_str()).collect();
    for expected in ["npm install", "npm test", "tail"] {
        assert!(names.contains(&expected));
    }
}
1281
/// E2E test (runs only with the `e2e` feature): asks a real model to invoke
/// the delay tool twice concurrently and checks the turn ends normally with
/// a response describing the tool outputs.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test concurrent tool calls with different delay times
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Call the delay tool twice in the same message.",
                    "Once with 100ms. Once with 300ms.",
                    "When both timers are complete, describe the outputs.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;

    // The turn should end normally after both tool calls resolve.
    let stop_reasons = stop_events(events);
    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);

    // The final agent message should mention the delay tool's output ("Ding").
    thread.update(cx, |thread, _cx| {
        let last_message = thread.last_received_or_pending_message().unwrap();
        let agent_message = last_message.as_agent_message().unwrap();
        let text = agent_message
            .content
            .iter()
            .filter_map(|content| {
                if let AgentMessageContent::Text(text) = content {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect::<String>();

        assert!(text.contains("Ding"));
    });
}
1326
/// Verifies that switching agent profiles changes which tools are offered to
/// the model: profile "test-1" exposes echo+delay, "test-2" only infinite.
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register all three tools; profiles decide which are actually offered.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Inspect the pending completion request to see which tools were offered.
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1406
/// Exercises MCP (context server) tools end-to-end against the fake model:
/// the server's `echo` tool is offered, invoked, and its response threaded
/// back as a tool result; then a native tool with the same name is added and
/// the name collision is resolved by prefixing the MCP tool with the server
/// name (`test_server_echo`).
#[gpui::test]
async fn test_mcp_tools(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "tool_permissions": { "default": "allow" },
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Start a fake context server exposing an "echo" tool whose schema
    // mirrors the native EchoTool's input schema.
    let mut mcp_tool_calls = setup_context_server(
        "test_server",
        vec![context_server::types::Tool {
            name: "echo".into(),
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    let events = thread.update(cx, |thread, cx| {
        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
    });
    cx.run_until_parked();

    // Simulate the model calling the MCP tool.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: json!({"text": "test"}).to_string(),
            input: json!({"text": "test"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The fake context server receives the call; reply with the echoed text.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: "test".into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // NOTE(review): this re-asserts on the `completion` popped before the
    // tool call rather than popping the follow-up pending completion —
    // presumably a fresh pop was intended here; verify.
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_text_chunk("Done!");
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Send again after adding the echo tool, ensuring the name collision is resolved.
    let events = thread.update(cx, |thread, cx| {
        thread.add_tool(EchoTool);
        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
    });
    cx.run_until_parked();
    // The MCP tool is now offered under a server-prefixed name.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["echo", "test_server_echo"]
    );
    // Invoke both the MCP tool (by its prefixed name) and the native tool.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_2".into(),
            name: "test_server_echo".into(),
            raw_input: json!({"text": "mcp"}).to_string(),
            input: json!({"text": "mcp"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_3".into(),
            name: "echo".into(),
            raw_input: json!({"text": "native"}).to_string(),
            input: json!({"text": "native"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The prefixed call still reaches the server under its original name.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // Ensure the tool results were inserted with the correct names.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages.last().unwrap().content,
        vec![
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_3".into(),
                tool_name: "echo".into(),
                is_error: false,
                content: "native".into(),
                output: Some("native".into()),
            },),
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_2".into(),
                tool_name: "test_server_echo".into(),
                is_error: false,
                content: "mcp".into(),
                output: Some("mcp".into()),
            },),
        ]
    );
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;
}
1572
1573#[gpui::test]
1574async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
1575    let ThreadTest {
1576        model,
1577        thread,
1578        context_server_store,
1579        fs,
1580        ..
1581    } = setup(cx, TestModel::Fake).await;
1582    let fake_model = model.as_fake();
1583
1584    // Setup settings to allow MCP tools
1585    fs.insert_file(
1586        paths::settings_file(),
1587        json!({
1588            "agent": {
1589                "always_allow_tool_actions": true,
1590                "profiles": {
1591                    "test": {
1592                        "name": "Test Profile",
1593                        "enable_all_context_servers": true,
1594                        "tools": {}
1595                    },
1596                }
1597            }
1598        })
1599        .to_string()
1600        .into_bytes(),
1601    )
1602    .await;
1603    cx.run_until_parked();
1604    thread.update(cx, |thread, cx| {
1605        thread.set_profile(AgentProfileId("test".into()), cx)
1606    });
1607
1608    // Setup a context server with a tool
1609    let mut mcp_tool_calls = setup_context_server(
1610        "github_server",
1611        vec![context_server::types::Tool {
1612            name: "issue_read".into(),
1613            description: Some("Read a GitHub issue".into()),
1614            input_schema: json!({
1615                "type": "object",
1616                "properties": {
1617                    "issue_url": { "type": "string" }
1618                }
1619            }),
1620            output_schema: None,
1621            annotations: None,
1622        }],
1623        &context_server_store,
1624        cx,
1625    );
1626
1627    // Send a message and have the model call the MCP tool
1628    let events = thread.update(cx, |thread, cx| {
1629        thread
1630            .send(UserMessageId::new(), ["Read issue #47404"], cx)
1631            .unwrap()
1632    });
1633    cx.run_until_parked();
1634
1635    // Verify the MCP tool is available to the model
1636    let completion = fake_model.pending_completions().pop().unwrap();
1637    assert_eq!(
1638        tool_names_for_completion(&completion),
1639        vec!["issue_read"],
1640        "MCP tool should be available"
1641    );
1642
1643    // Simulate the model calling the MCP tool
1644    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1645        LanguageModelToolUse {
1646            id: "tool_1".into(),
1647            name: "issue_read".into(),
1648            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
1649                .to_string(),
1650            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
1651            is_input_complete: true,
1652            thought_signature: None,
1653        },
1654    ));
1655    fake_model.end_last_completion_stream();
1656    cx.run_until_parked();
1657
1658    // The MCP server receives the tool call and responds with content
1659    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
1660    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1661    assert_eq!(tool_call_params.name, "issue_read");
1662    tool_call_response
1663        .send(context_server::types::CallToolResponse {
1664            content: vec![context_server::types::ToolResponseContent::Text {
1665                text: expected_tool_output.into(),
1666            }],
1667            is_error: None,
1668            meta: None,
1669            structured_content: None,
1670        })
1671        .unwrap();
1672    cx.run_until_parked();
1673
1674    // After tool completes, the model continues with a new completion request
1675    // that includes the tool results. We need to respond to this.
1676    let _completion = fake_model.pending_completions().pop().unwrap();
1677    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
1678    fake_model
1679        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
1680    fake_model.end_last_completion_stream();
1681    events.collect::<Vec<_>>().await;
1682
1683    // Verify the tool result is stored in the thread by checking the markdown output.
1684    // The tool result is in the first assistant message (not the last one, which is
1685    // the model's response after the tool completed).
1686    thread.update(cx, |thread, _cx| {
1687        let markdown = thread.to_markdown();
1688        assert!(
1689            markdown.contains("**Tool Result**: issue_read"),
1690            "Thread should contain tool result header"
1691        );
1692        assert!(
1693            markdown.contains(expected_tool_output),
1694            "Thread should contain tool output: {}",
1695            expected_tool_output
1696        );
1697    });
1698
1699    // Simulate app restart: disconnect the MCP server.
1700    // After restart, the MCP server won't be connected yet when the thread is replayed.
1701    context_server_store.update(cx, |store, cx| {
1702        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
1703    });
1704    cx.run_until_parked();
1705
1706    // Replay the thread (this is what happens when loading a saved thread)
1707    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));
1708
1709    let mut found_tool_call = None;
1710    let mut found_tool_call_update_with_output = None;
1711
1712    while let Some(event) = replay_events.next().await {
1713        let event = event.unwrap();
1714        match &event {
1715            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
1716                found_tool_call = Some(tc.clone());
1717            }
1718            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
1719                if update.tool_call_id.to_string() == "tool_1" =>
1720            {
1721                if update.fields.raw_output.is_some() {
1722                    found_tool_call_update_with_output = Some(update.clone());
1723                }
1724            }
1725            _ => {}
1726        }
1727    }
1728
1729    // The tool call should be found
1730    assert!(
1731        found_tool_call.is_some(),
1732        "Tool call should be emitted during replay"
1733    );
1734
1735    assert!(
1736        found_tool_call_update_with_output.is_some(),
1737        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
1738    );
1739
1740    let update = found_tool_call_update_with_output.unwrap();
1741    assert_eq!(
1742        update.fields.raw_output,
1743        Some(expected_tool_output.into()),
1744        "raw_output should contain the saved tool result"
1745    );
1746
1747    // Also verify the status is correct (completed, not failed)
1748    assert_eq!(
1749        update.fields.status,
1750        Some(acp::ToolCallStatus::Completed),
1751        "Tool call status should reflect the original completion status"
1752    );
1753}
1754
#[gpui::test]
async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
    // Verifies how tool names presented to the model are disambiguated and
    // truncated when MCP (context server) tools collide with native tools or
    // with each other, and when names exceed MAX_TOOL_NAME_LENGTH.
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up a profile with all tools enabled
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Register the native tools under the profile configured above.
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx);
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
        thread.add_tool(WordListTool);
        thread.add_tool(ToolRequiringPermission);
        thread.add_tool(InfiniteTool);
    });

    // Set up multiple context servers with some overlapping tool names
    let _server1_calls = setup_context_server(
        "xxx",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_1".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Second server: another "echo" collision, plus names just under the
    // length limit that also collide with server "zzz" below.
    let _server2_calls = setup_context_server(
        "yyy",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Also conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_2".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );
    // Third server: duplicates server "yyy"'s long names and adds a name one
    // character over the limit.
    let _server3_calls = setup_context_server(
        "zzz",
        vec![
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server with spaces in name - tests snake_case conversion for API compatibility
    let _server4_calls = setup_context_server(
        "Azure DevOps",
        vec![context_server::types::Tool {
            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    // Start a turn so the thread assembles the final tool list sent to the
    // model, then inspect the pending completion request.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Go"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    // Expected behavior (as pinned below): unique, in-bounds names pass
    // through unchanged; colliding or overlong names are disambiguated by a
    // snake_cased — and, where needed to fit, truncated — server-name prefix.
    assert_eq!(
        tool_names_for_completion(&completion),
        vec![
            "azure_dev_ops_echo",
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
            "delay",
            "echo",
            "infinite",
            "tool_requiring_permission",
            "unique_tool_1",
            "unique_tool_2",
            "word_list",
            "xxx_echo",
            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "yyy_echo",
            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        ]
    );
}
1938
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    // End-to-end test against a real model (Sonnet 4; only runs with the
    // "e2e" feature enabled). Cancelling mid-turn while one tool is still
    // running must close the event stream with StopReason::Cancelled, and the
    // thread must accept new messages afterwards.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            ThreadEvent::ToolCall(tool_call) => {
                // Tools must be invoked in the order we prompted for.
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            // Watch for the echo tool (the fast one) reaching Completed, so
            // that when we cancel, only the infinite tool is still running.
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
2024
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    // Cancelling the thread while a terminal tool is mid-run must (a) kill the
    // underlying terminal and (b) preserve the output captured so far in the
    // tool result, rather than replacing it with a generic cancel notice.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Fake terminal that never exits on its own, guaranteeing it is still
    // running when we cancel.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use recorded in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and the result stored for that tool-use id.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2121
2122#[gpui::test]
2123async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
2124    // This test verifies that tools which properly handle cancellation via
2125    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
2126    // to cancellation and report that they were cancelled.
2127    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2128    always_allow_tools(cx);
2129    let fake_model = model.as_fake();
2130
2131    let (tool, was_cancelled) = CancellationAwareTool::new();
2132
2133    let mut events = thread
2134        .update(cx, |thread, cx| {
2135            thread.add_tool(tool);
2136            thread.send(
2137                UserMessageId::new(),
2138                ["call the cancellation aware tool"],
2139                cx,
2140            )
2141        })
2142        .unwrap();
2143
2144    cx.run_until_parked();
2145
2146    // Simulate the model calling the cancellation-aware tool
2147    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2148        LanguageModelToolUse {
2149            id: "cancellation_aware_1".into(),
2150            name: "cancellation_aware".into(),
2151            raw_input: r#"{}"#.into(),
2152            input: json!({}),
2153            is_input_complete: true,
2154            thought_signature: None,
2155        },
2156    ));
2157    fake_model.end_last_completion_stream();
2158
2159    cx.run_until_parked();
2160
2161    // Wait for the tool call to be reported
2162    let mut tool_started = false;
2163    let deadline = cx.executor().num_cpus() * 100;
2164    for _ in 0..deadline {
2165        cx.run_until_parked();
2166
2167        while let Some(Some(event)) = events.next().now_or_never() {
2168            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
2169                if tool_call.title == "Cancellation Aware Tool" {
2170                    tool_started = true;
2171                    break;
2172                }
2173            }
2174        }
2175
2176        if tool_started {
2177            break;
2178        }
2179
2180        cx.background_executor
2181            .timer(Duration::from_millis(10))
2182            .await;
2183    }
2184    assert!(tool_started, "expected cancellation aware tool to start");
2185
2186    // Cancel the thread and wait for it to complete
2187    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));
2188
2189    // The cancel task should complete promptly because the tool handles cancellation
2190    let timeout = cx.background_executor.timer(Duration::from_secs(5));
2191    futures::select! {
2192        _ = cancel_task.fuse() => {}
2193        _ = timeout.fuse() => {
2194            panic!("cancel task timed out - tool did not respond to cancellation");
2195        }
2196    }
2197
2198    // Verify the tool detected cancellation via its flag
2199    assert!(
2200        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
2201        "tool should have detected cancellation via event_stream.cancelled_by_user()"
2202    );
2203
2204    // Collect remaining events
2205    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2206
2207    // Verify we got a cancellation stop event
2208    assert_eq!(
2209        stop_events(remaining_events),
2210        vec![acp::StopReason::Cancelled],
2211    );
2212
2213    // Verify we can send a new message after cancellation
2214    verify_thread_recovery(&thread, &fake_model, cx).await;
2215}
2216
2217/// Helper to verify thread can recover after cancellation by sending a simple message.
2218async fn verify_thread_recovery(
2219    thread: &Entity<Thread>,
2220    fake_model: &FakeLanguageModel,
2221    cx: &mut TestAppContext,
2222) {
2223    let events = thread
2224        .update(cx, |thread, cx| {
2225            thread.send(
2226                UserMessageId::new(),
2227                ["Testing: reply with 'Hello' then stop."],
2228                cx,
2229            )
2230        })
2231        .unwrap();
2232    cx.run_until_parked();
2233    fake_model.send_last_completion_stream_text_chunk("Hello");
2234    fake_model
2235        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2236    fake_model.end_last_completion_stream();
2237
2238    let events = events.collect::<Vec<_>>().await;
2239    thread.update(cx, |thread, _cx| {
2240        let message = thread.last_received_or_pending_message().unwrap();
2241        let agent_message = message.as_agent_message().unwrap();
2242        assert_eq!(
2243            agent_message.content,
2244            vec![AgentMessageContent::Text("Hello".to_string())]
2245        );
2246    });
2247    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2248}
2249
2250/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2251async fn wait_for_terminal_tool_started(
2252    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2253    cx: &mut TestAppContext,
2254) {
2255    let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2256    for _ in 0..deadline {
2257        cx.run_until_parked();
2258
2259        while let Some(Some(event)) = events.next().now_or_never() {
2260            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2261                update,
2262            ))) = &event
2263            {
2264                if update.fields.content.as_ref().is_some_and(|content| {
2265                    content
2266                        .iter()
2267                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2268                }) {
2269                    return;
2270                }
2271            }
2272        }
2273
2274        cx.background_executor
2275            .timer(Duration::from_millis(10))
2276            .await;
2277    }
2278    panic!("terminal tool did not start within the expected time");
2279}
2280
2281/// Collects events until a Stop event is received, driving the executor to completion.
2282async fn collect_events_until_stop(
2283    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2284    cx: &mut TestAppContext,
2285) -> Vec<Result<ThreadEvent>> {
2286    let mut collected = Vec::new();
2287    let deadline = cx.executor().num_cpus() * 200;
2288
2289    for _ in 0..deadline {
2290        cx.executor().advance_clock(Duration::from_millis(10));
2291        cx.run_until_parked();
2292
2293        while let Some(Some(event)) = events.next().now_or_never() {
2294            let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2295            collected.push(event);
2296            if is_stop {
2297                return collected;
2298            }
2299        }
2300    }
2301    panic!(
2302        "did not receive Stop event within the expected time; collected {} events",
2303        collected.len()
2304    );
2305}
2306
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    // Truncating the thread at the message that spawned a running terminal
    // tool must kill the terminal, remove the message (leaving the thread
    // empty here, since it was the only message), and keep the thread usable.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Fake terminal that never exits on its own, so it is still running when
    // we truncate.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate back to this exact message.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2373
#[gpui::test]
async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
    // Tests that cancellation properly kills all running terminal tools when multiple are active.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Environment that hands out a separate terminal handle per tool
    // invocation; `handles()` below exposes them for the kill assertions.
    let environment = Rc::new(MultiTerminalEnvironment::new());

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment.clone(),
            ));
            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling two terminal tools
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_2".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 2000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for both terminal tools to start by counting terminal content updates
    // (this inlines wait_for_terminal_tool_started because we need a count,
    // not just the first occurrence).
    let mut terminals_started = 0;
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            ))) = &event
            {
                if update.fields.content.as_ref().is_some_and(|content| {
                    content
                        .iter()
                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
                }) {
                    terminals_started += 1;
                    if terminals_started >= 2 {
                        break;
                    }
                }
            }
        }
        if terminals_started >= 2 {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(
        terminals_started >= 2,
        "expected 2 terminal tools to start, got {terminals_started}"
    );

    // Cancel the thread while both terminals are running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify both terminal handles were killed
    let handles = environment.handles();
    assert_eq!(
        handles.len(),
        2,
        "expected 2 terminal handles to be created"
    );
    assert!(
        handles[0].was_killed(),
        "expected first terminal handle to be killed on cancellation"
    );
    assert!(
        handles[1].was_killed(),
        "expected second terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );
}
2482
#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Fake terminal that never exits on its own; only the simulated stop
    // button below ends it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use recorded in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and the result stored for that tool-use id.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2577
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    // Auto-approve tool calls so no permission prompt blocks the terminal tool.
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A fake terminal that never exits on its own, so only the timeout can end it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the timeout (100ms configured above, so 200ms suffices)
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // Timeout and user-stop are distinct code paths; only the former may be reported here.
        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2676
2677#[gpui::test]
2678async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2679    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2680    let fake_model = model.as_fake();
2681
2682    let events_1 = thread
2683        .update(cx, |thread, cx| {
2684            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2685        })
2686        .unwrap();
2687    cx.run_until_parked();
2688    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2689    cx.run_until_parked();
2690
2691    let events_2 = thread
2692        .update(cx, |thread, cx| {
2693            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2694        })
2695        .unwrap();
2696    cx.run_until_parked();
2697    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2698    fake_model
2699        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2700    fake_model.end_last_completion_stream();
2701
2702    let events_1 = events_1.collect::<Vec<_>>().await;
2703    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2704    let events_2 = events_2.collect::<Vec<_>>().await;
2705    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2706}
2707
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Exactly one in-flight completion request: the turn we just started.
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // The cancelled turn reports Cancelled; the new turn completes with EndTurn.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2785
2786#[gpui::test]
2787async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2788    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2789    let fake_model = model.as_fake();
2790
2791    let events_1 = thread
2792        .update(cx, |thread, cx| {
2793            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2794        })
2795        .unwrap();
2796    cx.run_until_parked();
2797    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2798    fake_model
2799        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2800    fake_model.end_last_completion_stream();
2801    let events_1 = events_1.collect::<Vec<_>>().await;
2802
2803    let events_2 = thread
2804        .update(cx, |thread, cx| {
2805            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2806        })
2807        .unwrap();
2808    cx.run_until_parked();
2809    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2810    fake_model
2811        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2812    fake_model.end_last_completion_stream();
2813    let events_2 = events_2.collect::<Vec<_>>().await;
2814
2815    assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2816    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2817}
2818
2819#[gpui::test]
2820async fn test_refusal(cx: &mut TestAppContext) {
2821    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2822    let fake_model = model.as_fake();
2823
2824    let events = thread
2825        .update(cx, |thread, cx| {
2826            thread.send(UserMessageId::new(), ["Hello"], cx)
2827        })
2828        .unwrap();
2829    cx.run_until_parked();
2830    thread.read_with(cx, |thread, _| {
2831        assert_eq!(
2832            thread.to_markdown(),
2833            indoc! {"
2834                ## User
2835
2836                Hello
2837            "}
2838        );
2839    });
2840
2841    fake_model.send_last_completion_stream_text_chunk("Hey!");
2842    cx.run_until_parked();
2843    thread.read_with(cx, |thread, _| {
2844        assert_eq!(
2845            thread.to_markdown(),
2846            indoc! {"
2847                ## User
2848
2849                Hello
2850
2851                ## Assistant
2852
2853                Hey!
2854            "}
2855        );
2856    });
2857
2858    // If the model refuses to continue, the thread should remove all the messages after the last user message.
2859    fake_model
2860        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
2861    let events = events.collect::<Vec<_>>().await;
2862    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
2863    thread.read_with(cx, |thread, _| {
2864        assert_eq!(thread.to_markdown(), "");
2865    });
2866}
2867
#[gpui::test]
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    // Truncating at the very first message should leave the thread completely
    // empty (markdown and token usage both reset) and still accept new sends.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        // No usage is reported until the model streams a UsageUpdate.
        assert_eq!(thread.latest_token_usage(), None);
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        // used_tokens aggregates the input + output counts streamed above.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncate at the first message: everything, including usage, is discarded.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    // The user message appears synchronously, before run_until_parked.
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // Usage reflects only the post-truncation exchange.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
2989
#[gpui::test]
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    // Truncating at the second user message should restore the thread to its
    // state after the first exchange, including the earlier token usage.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Snapshot assertion of the state after the first exchange; called again
    // at the end to verify truncation restored exactly this state.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Second exchange, whose id we keep so we can truncate at it below.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncate at the second message: the second exchange (and its usage) is
    // rolled back, restoring the first-exchange snapshot.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3104
#[gpui::test]
async fn test_title_generation(cx: &mut TestAppContext) {
    // A title is generated via the summarization model for the first exchange
    // only; later messages must not trigger another title request.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Titles come from a dedicated summarization model, separate from the chat model.
    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // No title yet: the summarization stream hasn't produced text.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), None));

    // Ensure the summary model has been invoked to generate a title.
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    // Only the first line of the streamed summary becomes the title.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // The summarization model received no new request for the second message...
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    // ...and the original title is retained.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });
}
3154
#[gpui::test]
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    // A tool call that is still waiting on user permission must be excluded from
    // the next completion request, while completed tool calls are included.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // This call stays pending: permission is never granted in this test.
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // This call runs to completion (echoes its input back).
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    // NOTE(review): messages[0] is skipped here — presumably the system/context
    // message prepended by build_completion_request; confirm if this changes.
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            // Only the completed echo tool use appears; the permission-gated
            // tool use is absent from the assistant message.
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3234
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    // End-to-end exercise of the ACP connection surface: session creation, model
    // listing/selection, prompting, cancellation, and session teardown.
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(user_store.clone(), client.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = PathList::new(&[Path::new("/test")]);
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Round-trip a prompt: the user text goes in, streamed model text comes back.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting a closed session must fail with "Session not found".
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3357
#[gpui::test]
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    // Verifies the full sequence of ACP tool-call events emitted for one tool
    // invocation: initial ToolCall, input-streamed field update, InProgress,
    // then Completed carrying the raw output.
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First event: the tool call with the partial (empty) input; the underlying
    // tool name travels in the "tool_name" meta entry.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Completed input arrives as a field update carrying the full raw_input.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    // Status transition: InProgress while the tool runs...
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    // ...then Completed, with the echoed text as raw_output.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3436
3437#[gpui::test]
3438async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
3439    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3440    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
3441    let fake_model = model.as_fake();
3442
3443    let mut events = thread
3444        .update(cx, |thread, cx| {
3445            thread.send(UserMessageId::new(), ["Make a plan"], cx)
3446        })
3447        .unwrap();
3448    cx.run_until_parked();
3449
3450    let input = json!({
3451        "plan": [
3452            {
3453                "step": "Inspect the code",
3454                "status": "completed",
3455                "priority": "high"
3456            },
3457            {
3458                "step": "Implement the tool",
3459                "status": "in_progress"
3460            },
3461            {
3462                "step": "Run tests",
3463                "status": "pending",
3464                "priority": "low"
3465            }
3466        ]
3467    });
3468    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3469        LanguageModelToolUse {
3470            id: "plan_1".into(),
3471            name: UpdatePlanTool::NAME.into(),
3472            raw_input: input.to_string(),
3473            input,
3474            is_input_complete: true,
3475            thought_signature: None,
3476        },
3477    ));
3478    fake_model.end_last_completion_stream();
3479    cx.run_until_parked();
3480
3481    let tool_call = expect_tool_call(&mut events).await;
3482    assert_eq!(
3483        tool_call,
3484        acp::ToolCall::new("plan_1", "Update plan")
3485            .kind(acp::ToolKind::Think)
3486            .raw_input(json!({
3487                "plan": [
3488                    {
3489                        "step": "Inspect the code",
3490                        "status": "completed",
3491                        "priority": "high"
3492                    },
3493                    {
3494                        "step": "Implement the tool",
3495                        "status": "in_progress"
3496                    },
3497                    {
3498                        "step": "Run tests",
3499                        "status": "pending",
3500                        "priority": "low"
3501                    }
3502                ]
3503            }))
3504            .meta(acp::Meta::from_iter([(
3505                "tool_name".into(),
3506                "update_plan".into()
3507            )]))
3508    );
3509
3510    let update = expect_tool_call_update_fields(&mut events).await;
3511    assert_eq!(
3512        update,
3513        acp::ToolCallUpdate::new(
3514            "plan_1",
3515            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
3516        )
3517    );
3518
3519    let plan = expect_plan(&mut events).await;
3520    assert_eq!(
3521        plan,
3522        acp::Plan::new(vec![
3523            acp::PlanEntry::new(
3524                "Inspect the code",
3525                acp::PlanEntryPriority::High,
3526                acp::PlanEntryStatus::Completed,
3527            ),
3528            acp::PlanEntry::new(
3529                "Implement the tool",
3530                acp::PlanEntryPriority::Medium,
3531                acp::PlanEntryStatus::InProgress,
3532            ),
3533            acp::PlanEntry::new(
3534                "Run tests",
3535                acp::PlanEntryPriority::Low,
3536                acp::PlanEntryStatus::Pending,
3537            ),
3538        ])
3539    );
3540
3541    let update = expect_tool_call_update_fields(&mut events).await;
3542    assert_eq!(
3543        update,
3544        acp::ToolCallUpdate::new(
3545            "plan_1",
3546            acp::ToolCallUpdateFields::new()
3547                .status(acp::ToolCallStatus::Completed)
3548                .raw_output("Plan updated")
3549        )
3550    );
3551}
3552
3553#[gpui::test]
3554async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
3555    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3556    let fake_model = model.as_fake();
3557
3558    let mut events = thread
3559        .update(cx, |thread, cx| {
3560            thread.send(UserMessageId::new(), ["Hello!"], cx)
3561        })
3562        .unwrap();
3563    cx.run_until_parked();
3564
3565    fake_model.send_last_completion_stream_text_chunk("Hey!");
3566    fake_model.end_last_completion_stream();
3567
3568    let mut retry_events = Vec::new();
3569    while let Some(Ok(event)) = events.next().await {
3570        match event {
3571            ThreadEvent::Retry(retry_status) => {
3572                retry_events.push(retry_status);
3573            }
3574            ThreadEvent::Stop(..) => break,
3575            _ => {}
3576        }
3577    }
3578
3579    assert_eq!(retry_events.len(), 0);
3580    thread.read_with(cx, |thread, _cx| {
3581        assert_eq!(
3582            thread.to_markdown(),
3583            indoc! {"
3584                ## User
3585
3586                Hello!
3587
3588                ## Assistant
3589
3590                Hey!
3591            "}
3592        )
3593    });
3594}
3595
3596#[gpui::test]
3597async fn test_send_retry_on_error(cx: &mut TestAppContext) {
3598    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3599    let fake_model = model.as_fake();
3600
3601    let mut events = thread
3602        .update(cx, |thread, cx| {
3603            thread.send(UserMessageId::new(), ["Hello!"], cx)
3604        })
3605        .unwrap();
3606    cx.run_until_parked();
3607
3608    fake_model.send_last_completion_stream_text_chunk("Hey,");
3609    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
3610        provider: LanguageModelProviderName::new("Anthropic"),
3611        retry_after: Some(Duration::from_secs(3)),
3612    });
3613    fake_model.end_last_completion_stream();
3614
3615    cx.executor().advance_clock(Duration::from_secs(3));
3616    cx.run_until_parked();
3617
3618    fake_model.send_last_completion_stream_text_chunk("there!");
3619    fake_model.end_last_completion_stream();
3620    cx.run_until_parked();
3621
3622    let mut retry_events = Vec::new();
3623    while let Some(Ok(event)) = events.next().await {
3624        match event {
3625            ThreadEvent::Retry(retry_status) => {
3626                retry_events.push(retry_status);
3627            }
3628            ThreadEvent::Stop(..) => break,
3629            _ => {}
3630        }
3631    }
3632
3633    assert_eq!(retry_events.len(), 1);
3634    assert!(matches!(
3635        retry_events[0],
3636        acp_thread::RetryStatus { attempt: 1, .. }
3637    ));
3638    thread.read_with(cx, |thread, _cx| {
3639        assert_eq!(
3640            thread.to_markdown(),
3641            indoc! {"
3642                ## User
3643
3644                Hello!
3645
3646                ## Assistant
3647
3648                Hey,
3649
3650                [resume]
3651
3652                ## Assistant
3653
3654                there!
3655            "}
3656        )
3657    });
3658}
3659
/// When a retryable error interrupts the stream after a complete tool use,
/// the retry request must include the finished tool result so the resumed
/// conversation stays coherent.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    // Stream a complete tool use, then fail the stream with a retryable
    // (server overloaded) error.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // After the retry delay, the follow-up request should carry the user
    // message, the assistant's tool use, and the completed tool result.
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retried turn and confirm the post-retry assistant message
    // is recorded as the last message.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3739
3740#[gpui::test]
3741async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
3742    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3743    let fake_model = model.as_fake();
3744
3745    let mut events = thread
3746        .update(cx, |thread, cx| {
3747            thread.send(UserMessageId::new(), ["Hello!"], cx)
3748        })
3749        .unwrap();
3750    cx.run_until_parked();
3751
3752    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
3753        fake_model.send_last_completion_stream_error(
3754            LanguageModelCompletionError::ServerOverloaded {
3755                provider: LanguageModelProviderName::new("Anthropic"),
3756                retry_after: Some(Duration::from_secs(3)),
3757            },
3758        );
3759        fake_model.end_last_completion_stream();
3760        cx.executor().advance_clock(Duration::from_secs(3));
3761        cx.run_until_parked();
3762    }
3763
3764    let mut errors = Vec::new();
3765    let mut retry_events = Vec::new();
3766    while let Some(event) = events.next().await {
3767        match event {
3768            Ok(ThreadEvent::Retry(retry_status)) => {
3769                retry_events.push(retry_status);
3770            }
3771            Ok(ThreadEvent::Stop(..)) => break,
3772            Err(error) => errors.push(error),
3773            _ => {}
3774        }
3775    }
3776
3777    assert_eq!(
3778        retry_events.len(),
3779        crate::thread::MAX_RETRY_ATTEMPTS as usize
3780    );
3781    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
3782        assert_eq!(retry_events[i].attempt, i + 1);
3783    }
3784    assert_eq!(errors.len(), 1);
3785    let error = errors[0]
3786        .downcast_ref::<LanguageModelCompletionError>()
3787        .unwrap();
3788    assert!(matches!(
3789        error,
3790        LanguageModelCompletionError::ServerOverloaded { .. }
3791    ));
3792}
3793
/// Regression test: if the model's stream errors out while a streaming tool
/// is still waiting for `is_input_complete = true`, the tool must terminate
/// with an error result (instead of deadlocking the turn) and that result
/// must appear in the retry request.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3900
3901/// Filters out the stop events for asserting against in tests
3902fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3903    result_events
3904        .into_iter()
3905        .filter_map(|event| match event.unwrap() {
3906            ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3907            _ => None,
3908        })
3909        .collect()
3910}
3911
/// Fixture bundle produced by [`setup`] and shared by the tests in this
/// module.
struct ThreadTest {
    /// The model the thread completes against (fake or real, per `TestModel`).
    model: Arc<dyn LanguageModel>,
    /// The thread under test.
    thread: Entity<Thread>,
    /// Project context handed to the thread at construction.
    project_context: Entity<ProjectContext>,
    /// Store used by tests that register fake MCP context servers.
    context_server_store: Entity<ContextServerStore>,
    /// In-memory filesystem backing the project and the settings file.
    fs: Arc<FakeFs>,
}
3919
/// Which language model a test runs against: a real Claude Sonnet 4 model
/// (requires authentication) or the in-process fake driven explicitly by the
/// test.
enum TestModel {
    Sonnet4,
    Fake,
}
3924
3925impl TestModel {
3926    fn id(&self) -> LanguageModelId {
3927        match self {
3928            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
3929            TestModel::Fake => unreachable!(),
3930        }
3931    }
3932}
3933
/// Builds the shared test fixture: a fake filesystem whose settings file
/// enables every test tool under a dedicated profile, a test project, the
/// requested model (fake or authenticated real), and a `Thread` wired to all
/// of them.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    cx.executor().allow_parking();

    // Write an agent settings file enabling all of this module's test tools
    // under the "test-profile" profile.
    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                            UpdatePlanTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        match model {
            TestModel::Fake => {}
            TestModel::Sonnet4 => {
                // Real-model runs need a production client, a user store, and
                // the language-model registries initialized.
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(user_store.clone(), client.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        // Keep the global SettingsStore in sync with the settings file above.
        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Resolve the model: the fake is constructed directly; a real model is
    // looked up in the registry and its provider authenticated first.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
4039
/// Runs once before tests (via `ctor`); hooks up `env_logger` only when the
/// caller opted in with `RUST_LOG`, keeping test output quiet by default.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    if std::env::var("RUST_LOG").is_err() {
        return;
    }
    env_logger::init();
}
4047
4048fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
4049    let fs = fs.clone();
4050    cx.spawn({
4051        async move |cx| {
4052            let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
4053                cx.background_executor(),
4054                fs,
4055                paths::settings_file().clone(),
4056            );
4057            let _watcher_task = watcher_task;
4058
4059            while let Some(new_settings_content) = new_settings_content_rx.next().await {
4060                cx.update(|cx| {
4061                    SettingsStore::update_global(cx, |settings, cx| {
4062                        settings.set_user_settings(&new_settings_content, cx)
4063                    })
4064                })
4065                .ok();
4066            }
4067        }
4068    })
4069    .detach();
4070}
4071
4072fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4073    completion
4074        .tools
4075        .iter()
4076        .map(|tool| tool.name.clone())
4077        .collect()
4078}
4079
/// Registers a fake stdio MCP context server named `name` exposing `tools`,
/// starts it in the store, and returns a receiver that yields each incoming
/// tool call together with a oneshot sender the test uses to reply.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in project settings so the store will manage it.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    // Fake transport answering the MCP handshake: Initialize advertises tool
    // support, ListTools returns the provided tools, and CallTool forwards
    // each request to the test via the channel and awaits its reply.
    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    // Start the server and wait for startup to settle before returning.
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    cx.run_until_parked();
    mcp_tool_calls_rx
}
4159
/// `tokens_before_message` should return `None` for the first message and,
/// for each later message, the input-token count reported by the request
/// that preceded it — values that remain stable as further messages arrive.
#[gpui::test]
async fn test_tokens_before_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First message
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["First message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Before any response, tokens_before_message should return None for first message
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should have no tokens before it"
        );
    });

    // Complete first message with usage
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First message still has no tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it after response"
        );
    });

    // Second message
    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Second message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Second message should have first message's input tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should have 100 tokens before it (from first request)"
        );
    });

    // Complete second message
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250, // Total for this request (includes previous context)
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Third message
    let message_3_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_3_id.clone(), ["Third message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Third message should have second message's input tokens (250) before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_3_id),
            Some(250),
            "Third message should have 250 tokens before it (from second request)"
        );
        // Second message should still have 100
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should still have 100 tokens before it"
        );
        // First message still has none
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it"
        );
    });
}
4266
4267#[gpui::test]
4268async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
4269    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4270    let fake_model = model.as_fake();
4271
4272    // Set up three messages with responses
4273    let message_1_id = UserMessageId::new();
4274    thread
4275        .update(cx, |thread, cx| {
4276            thread.send(message_1_id.clone(), ["Message 1"], cx)
4277        })
4278        .unwrap();
4279    cx.run_until_parked();
4280    fake_model.send_last_completion_stream_text_chunk("Response 1");
4281    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4282        language_model::TokenUsage {
4283            input_tokens: 100,
4284            output_tokens: 50,
4285            cache_creation_input_tokens: 0,
4286            cache_read_input_tokens: 0,
4287        },
4288    ));
4289    fake_model.end_last_completion_stream();
4290    cx.run_until_parked();
4291
4292    let message_2_id = UserMessageId::new();
4293    thread
4294        .update(cx, |thread, cx| {
4295            thread.send(message_2_id.clone(), ["Message 2"], cx)
4296        })
4297        .unwrap();
4298    cx.run_until_parked();
4299    fake_model.send_last_completion_stream_text_chunk("Response 2");
4300    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4301        language_model::TokenUsage {
4302            input_tokens: 250,
4303            output_tokens: 75,
4304            cache_creation_input_tokens: 0,
4305            cache_read_input_tokens: 0,
4306        },
4307    ));
4308    fake_model.end_last_completion_stream();
4309    cx.run_until_parked();
4310
4311    // Verify initial state
4312    thread.read_with(cx, |thread, _| {
4313        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
4314    });
4315
4316    // Truncate at message 2 (removes message 2 and everything after)
4317    thread
4318        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
4319        .unwrap();
4320    cx.run_until_parked();
4321
4322    // After truncation, message_2_id no longer exists, so lookup should return None
4323    thread.read_with(cx, |thread, _| {
4324        assert_eq!(
4325            thread.tokens_before_message(&message_2_id),
4326            None,
4327            "After truncation, message 2 no longer exists"
4328        );
4329        // Message 1 still exists but has no tokens before it
4330        assert_eq!(
4331            thread.tokens_before_message(&message_1_id),
4332            None,
4333            "First message still has no tokens before it"
4334        );
4335    });
4336}
4337
#[gpui::test]
async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
    // Exercises the terminal tool's permission rules across four scenarios:
    //   1. an `always_deny` pattern blocks a matching command outright;
    //   2. an `always_allow` pattern skips confirmation and overrides a
    //      tool-level `default: Deny`;
    //   3. an `always_confirm` pattern still prompts even when the global
    //      default is `Allow`;
    //   4. a tool-specific `default: Deny` wins over a global `default: Allow`.
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({})).await;
    let project = Project::test(fs, ["/root".as_ref()], cx).await;

    // Test 1: Deny rule blocks command
    {
        // Terminal never exits on its own: the command must be rejected
        // before it would ever run.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
        }));

        // Install a deny pattern for `rm -rf` on the terminal tool.
        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Confirm),
                    always_allow: vec![],
                    always_deny: vec![
                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
                    ],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "rm -rf /".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by deny rule"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("blocked"),
            "error should mention the command was blocked"
        );
    }

    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
    {
        // Terminal exits immediately with status 0 so the allowed command
        // can run to completion.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        // Allow-list `echo ...` while defaulting everything else to Deny.
        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![
                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
                    ],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // Seeing terminal content in the first update means the command
        // started executing without any authorization round-trip.
        let update = rx.expect_update_fields().await;
        assert!(
            update.content.iter().any(|blocks| {
                blocks
                    .iter()
                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
            }),
            "expected terminal content (allow rule should skip confirmation and override default deny)"
        );

        let result = task.await;
        assert!(
            result.is_ok(),
            "expected command to succeed without confirmation"
        );
    }

    // Test 3: global default: allow does NOT override always_confirm patterns
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        // Global default is Allow, but `sudo` commands must still confirm.
        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Allow),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![
                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
                    ],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let _task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "sudo rm file".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // With global default: allow, confirm patterns are still respected
        // The expect_authorization() call will panic if no authorization is requested,
        // which validates that the confirm pattern still triggers confirmation
        let _auth = rx.expect_authorization().await;

        drop(_task);
    }

    // Test 4: tool-specific default: deny is respected even with global default: allow
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        // No patterns at all: only the tool-level default Deny applies.
        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // tool-specific default: deny is respected even with global default: allow
        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by tool-specific deny default"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("disabled"),
            "error should mention the tool is disabled, got: {err_msg}"
        );
    }
}
4555
4556#[gpui::test]
4557async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
4558    init_test(cx);
4559    cx.update(|cx| {
4560        LanguageModelRegistry::test(cx);
4561    });
4562    cx.update(|cx| {
4563        cx.update_flags(true, vec!["subagents".to_string()]);
4564    });
4565
4566    let fs = FakeFs::new(cx.executor());
4567    fs.insert_tree(
4568        "/",
4569        json!({
4570            "a": {
4571                "b.md": "Lorem"
4572            }
4573        }),
4574    )
4575    .await;
4576    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4577    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4578    let agent = cx.update(|cx| {
4579        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4580    });
4581    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4582
4583    let acp_thread = cx
4584        .update(|cx| {
4585            connection
4586                .clone()
4587                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4588        })
4589        .await
4590        .unwrap();
4591    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4592    let thread = agent.read_with(cx, |agent, _| {
4593        agent.sessions.get(&session_id).unwrap().thread.clone()
4594    });
4595    let model = Arc::new(FakeLanguageModel::default());
4596
4597    // Ensure empty threads are not saved, even if they get mutated.
4598    thread.update(cx, |thread, cx| {
4599        thread.set_model(model.clone(), cx);
4600    });
4601    cx.run_until_parked();
4602
4603    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4604    cx.run_until_parked();
4605    model.send_last_completion_stream_text_chunk("spawning subagent");
4606    let subagent_tool_input = SpawnAgentToolInput {
4607        label: "label".to_string(),
4608        message: "subagent task prompt".to_string(),
4609        session_id: None,
4610    };
4611    let subagent_tool_use = LanguageModelToolUse {
4612        id: "subagent_1".into(),
4613        name: SpawnAgentTool::NAME.into(),
4614        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4615        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4616        is_input_complete: true,
4617        thought_signature: None,
4618    };
4619    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4620        subagent_tool_use,
4621    ));
4622    model.end_last_completion_stream();
4623
4624    cx.run_until_parked();
4625
4626    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4627        thread
4628            .running_subagent_ids(cx)
4629            .get(0)
4630            .expect("subagent thread should be running")
4631            .clone()
4632    });
4633
4634    let subagent_thread = agent.read_with(cx, |agent, _cx| {
4635        agent
4636            .sessions
4637            .get(&subagent_session_id)
4638            .expect("subagent session should exist")
4639            .acp_thread
4640            .clone()
4641    });
4642
4643    model.send_last_completion_stream_text_chunk("subagent task response");
4644    model.end_last_completion_stream();
4645
4646    cx.run_until_parked();
4647
4648    assert_eq!(
4649        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4650        indoc! {"
4651            ## User
4652
4653            subagent task prompt
4654
4655            ## Assistant
4656
4657            subagent task response
4658
4659        "}
4660    );
4661
4662    model.send_last_completion_stream_text_chunk("Response");
4663    model.end_last_completion_stream();
4664
4665    send.await.unwrap();
4666
4667    assert_eq!(
4668        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4669        indoc! {r#"
4670            ## User
4671
4672            Prompt
4673
4674            ## Assistant
4675
4676            spawning subagent
4677
4678            **Tool Call: label**
4679            Status: Completed
4680
4681            subagent task response
4682
4683            ## Assistant
4684
4685            Response
4686
4687        "#},
4688    );
4689}
4690
4691#[gpui::test]
4692async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
4693    init_test(cx);
4694    cx.update(|cx| {
4695        LanguageModelRegistry::test(cx);
4696    });
4697    cx.update(|cx| {
4698        cx.update_flags(true, vec!["subagents".to_string()]);
4699    });
4700
4701    let fs = FakeFs::new(cx.executor());
4702    fs.insert_tree(
4703        "/",
4704        json!({
4705            "a": {
4706                "b.md": "Lorem"
4707            }
4708        }),
4709    )
4710    .await;
4711    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4712    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4713    let agent = cx.update(|cx| {
4714        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4715    });
4716    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4717
4718    let acp_thread = cx
4719        .update(|cx| {
4720            connection
4721                .clone()
4722                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4723        })
4724        .await
4725        .unwrap();
4726    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4727    let thread = agent.read_with(cx, |agent, _| {
4728        agent.sessions.get(&session_id).unwrap().thread.clone()
4729    });
4730    let model = Arc::new(FakeLanguageModel::default());
4731
4732    // Ensure empty threads are not saved, even if they get mutated.
4733    thread.update(cx, |thread, cx| {
4734        thread.set_model(model.clone(), cx);
4735    });
4736    cx.run_until_parked();
4737
4738    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4739    cx.run_until_parked();
4740    model.send_last_completion_stream_text_chunk("spawning subagent");
4741    let subagent_tool_input = SpawnAgentToolInput {
4742        label: "label".to_string(),
4743        message: "subagent task prompt".to_string(),
4744        session_id: None,
4745    };
4746    let subagent_tool_use = LanguageModelToolUse {
4747        id: "subagent_1".into(),
4748        name: SpawnAgentTool::NAME.into(),
4749        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4750        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4751        is_input_complete: true,
4752        thought_signature: None,
4753    };
4754    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4755        subagent_tool_use,
4756    ));
4757    model.end_last_completion_stream();
4758
4759    cx.run_until_parked();
4760
4761    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4762        thread
4763            .running_subagent_ids(cx)
4764            .get(0)
4765            .expect("subagent thread should be running")
4766            .clone()
4767    });
4768
4769    let subagent_thread = agent.read_with(cx, |agent, _cx| {
4770        agent
4771            .sessions
4772            .get(&subagent_session_id)
4773            .expect("subagent session should exist")
4774            .acp_thread
4775            .clone()
4776    });
4777
4778    model.send_last_completion_stream_text_chunk("subagent task response 1");
4779    model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
4780        text: "thinking more about the subagent task".into(),
4781        signature: None,
4782    });
4783    model.send_last_completion_stream_text_chunk("subagent task response 2");
4784    model.end_last_completion_stream();
4785
4786    cx.run_until_parked();
4787
4788    assert_eq!(
4789        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4790        indoc! {"
4791            ## User
4792
4793            subagent task prompt
4794
4795            ## Assistant
4796
4797            subagent task response 1
4798
4799            <thinking>
4800            thinking more about the subagent task
4801            </thinking>
4802
4803            subagent task response 2
4804
4805        "}
4806    );
4807
4808    model.send_last_completion_stream_text_chunk("Response");
4809    model.end_last_completion_stream();
4810
4811    send.await.unwrap();
4812
4813    assert_eq!(
4814        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4815        indoc! {r#"
4816            ## User
4817
4818            Prompt
4819
4820            ## Assistant
4821
4822            spawning subagent
4823
4824            **Tool Call: label**
4825            Status: Completed
4826
4827            subagent task response 1
4828
4829            subagent task response 2
4830
4831            ## Assistant
4832
4833            Response
4834
4835        "#},
4836    );
4837}
4838
4839#[gpui::test]
4840async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
4841    init_test(cx);
4842    cx.update(|cx| {
4843        LanguageModelRegistry::test(cx);
4844    });
4845    cx.update(|cx| {
4846        cx.update_flags(true, vec!["subagents".to_string()]);
4847    });
4848
4849    let fs = FakeFs::new(cx.executor());
4850    fs.insert_tree(
4851        "/",
4852        json!({
4853            "a": {
4854                "b.md": "Lorem"
4855            }
4856        }),
4857    )
4858    .await;
4859    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4860    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4861    let agent = cx.update(|cx| {
4862        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4863    });
4864    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4865
4866    let acp_thread = cx
4867        .update(|cx| {
4868            connection
4869                .clone()
4870                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4871        })
4872        .await
4873        .unwrap();
4874    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4875    let thread = agent.read_with(cx, |agent, _| {
4876        agent.sessions.get(&session_id).unwrap().thread.clone()
4877    });
4878    let model = Arc::new(FakeLanguageModel::default());
4879
4880    // Ensure empty threads are not saved, even if they get mutated.
4881    thread.update(cx, |thread, cx| {
4882        thread.set_model(model.clone(), cx);
4883    });
4884    cx.run_until_parked();
4885
4886    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4887    cx.run_until_parked();
4888    model.send_last_completion_stream_text_chunk("spawning subagent");
4889    let subagent_tool_input = SpawnAgentToolInput {
4890        label: "label".to_string(),
4891        message: "subagent task prompt".to_string(),
4892        session_id: None,
4893    };
4894    let subagent_tool_use = LanguageModelToolUse {
4895        id: "subagent_1".into(),
4896        name: SpawnAgentTool::NAME.into(),
4897        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4898        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4899        is_input_complete: true,
4900        thought_signature: None,
4901    };
4902    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4903        subagent_tool_use,
4904    ));
4905    model.end_last_completion_stream();
4906
4907    cx.run_until_parked();
4908
4909    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4910        thread
4911            .running_subagent_ids(cx)
4912            .get(0)
4913            .expect("subagent thread should be running")
4914            .clone()
4915    });
4916    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4917        agent
4918            .sessions
4919            .get(&subagent_session_id)
4920            .expect("subagent session should exist")
4921            .acp_thread
4922            .clone()
4923    });
4924
4925    // model.send_last_completion_stream_text_chunk("subagent task response");
4926    // model.end_last_completion_stream();
4927
4928    // cx.run_until_parked();
4929
4930    acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4931
4932    cx.run_until_parked();
4933
4934    send.await.unwrap();
4935
4936    acp_thread.read_with(cx, |thread, cx| {
4937        assert_eq!(thread.status(), ThreadStatus::Idle);
4938        assert_eq!(
4939            thread.to_markdown(cx),
4940            indoc! {"
4941                ## User
4942
4943                Prompt
4944
4945                ## Assistant
4946
4947                spawning subagent
4948
4949                **Tool Call: label**
4950                Status: Canceled
4951
4952            "}
4953        );
4954    });
4955    subagent_acp_thread.read_with(cx, |thread, cx| {
4956        assert_eq!(thread.status(), ThreadStatus::Idle);
4957        assert_eq!(
4958            thread.to_markdown(cx),
4959            indoc! {"
4960                ## User
4961
4962                subagent task prompt
4963
4964            "}
4965        );
4966    });
4967}
4968
4969#[gpui::test]
4970async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
4971    init_test(cx);
4972    cx.update(|cx| {
4973        LanguageModelRegistry::test(cx);
4974    });
4975    cx.update(|cx| {
4976        cx.update_flags(true, vec!["subagents".to_string()]);
4977    });
4978
4979    let fs = FakeFs::new(cx.executor());
4980    fs.insert_tree(
4981        "/",
4982        json!({
4983            "a": {
4984                "b.md": "Lorem"
4985            }
4986        }),
4987    )
4988    .await;
4989    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4990    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4991    let agent = cx.update(|cx| {
4992        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4993    });
4994    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4995
4996    let acp_thread = cx
4997        .update(|cx| {
4998            connection
4999                .clone()
5000                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5001        })
5002        .await
5003        .unwrap();
5004    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5005    let thread = agent.read_with(cx, |agent, _| {
5006        agent.sessions.get(&session_id).unwrap().thread.clone()
5007    });
5008    let model = Arc::new(FakeLanguageModel::default());
5009
5010    thread.update(cx, |thread, cx| {
5011        thread.set_model(model.clone(), cx);
5012    });
5013    cx.run_until_parked();
5014
5015    // === First turn: create subagent ===
5016    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5017    cx.run_until_parked();
5018    model.send_last_completion_stream_text_chunk("spawning subagent");
5019    let subagent_tool_input = SpawnAgentToolInput {
5020        label: "initial task".to_string(),
5021        message: "do the first task".to_string(),
5022        session_id: None,
5023    };
5024    let subagent_tool_use = LanguageModelToolUse {
5025        id: "subagent_1".into(),
5026        name: SpawnAgentTool::NAME.into(),
5027        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5028        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5029        is_input_complete: true,
5030        thought_signature: None,
5031    };
5032    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5033        subagent_tool_use,
5034    ));
5035    model.end_last_completion_stream();
5036
5037    cx.run_until_parked();
5038
5039    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5040        thread
5041            .running_subagent_ids(cx)
5042            .get(0)
5043            .expect("subagent thread should be running")
5044            .clone()
5045    });
5046
5047    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
5048        agent
5049            .sessions
5050            .get(&subagent_session_id)
5051            .expect("subagent session should exist")
5052            .acp_thread
5053            .clone()
5054    });
5055
5056    // Subagent responds
5057    model.send_last_completion_stream_text_chunk("first task response");
5058    model.end_last_completion_stream();
5059
5060    cx.run_until_parked();
5061
5062    // Parent model responds to complete first turn
5063    model.send_last_completion_stream_text_chunk("First response");
5064    model.end_last_completion_stream();
5065
5066    send.await.unwrap();
5067
5068    // Verify subagent is no longer running
5069    thread.read_with(cx, |thread, cx| {
5070        assert!(
5071            thread.running_subagent_ids(cx).is_empty(),
5072            "subagent should not be running after completion"
5073        );
5074    });
5075
5076    // === Second turn: resume subagent with session_id ===
5077    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5078    cx.run_until_parked();
5079    model.send_last_completion_stream_text_chunk("resuming subagent");
5080    let resume_tool_input = SpawnAgentToolInput {
5081        label: "follow-up task".to_string(),
5082        message: "do the follow-up task".to_string(),
5083        session_id: Some(subagent_session_id.clone()),
5084    };
5085    let resume_tool_use = LanguageModelToolUse {
5086        id: "subagent_2".into(),
5087        name: SpawnAgentTool::NAME.into(),
5088        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5089        input: serde_json::to_value(&resume_tool_input).unwrap(),
5090        is_input_complete: true,
5091        thought_signature: None,
5092    };
5093    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5094    model.end_last_completion_stream();
5095
5096    cx.run_until_parked();
5097
5098    // Subagent should be running again with the same session
5099    thread.read_with(cx, |thread, cx| {
5100        let running = thread.running_subagent_ids(cx);
5101        assert_eq!(running.len(), 1, "subagent should be running");
5102        assert_eq!(running[0], subagent_session_id, "should be same session");
5103    });
5104
5105    // Subagent responds to follow-up
5106    model.send_last_completion_stream_text_chunk("follow-up task response");
5107    model.end_last_completion_stream();
5108
5109    cx.run_until_parked();
5110
5111    // Parent model responds to complete second turn
5112    model.send_last_completion_stream_text_chunk("Second response");
5113    model.end_last_completion_stream();
5114
5115    send2.await.unwrap();
5116
5117    // Verify subagent is no longer running
5118    thread.read_with(cx, |thread, cx| {
5119        assert!(
5120            thread.running_subagent_ids(cx).is_empty(),
5121            "subagent should not be running after resume completion"
5122        );
5123    });
5124
5125    // Verify the subagent's acp thread has both conversation turns
5126    assert_eq!(
5127        subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
5128        indoc! {"
5129            ## User
5130
5131            do the first task
5132
5133            ## Assistant
5134
5135            first task response
5136
5137            ## User
5138
5139            do the follow-up task
5140
5141            ## Assistant
5142
5143            follow-up task response
5144
5145        "}
5146    );
5147}
5148
5149#[gpui::test]
5150async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5151    init_test(cx);
5152
5153    cx.update(|cx| {
5154        cx.update_flags(true, vec!["subagents".to_string()]);
5155    });
5156
5157    let fs = FakeFs::new(cx.executor());
5158    fs.insert_tree(path!("/test"), json!({})).await;
5159    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5160    let project_context = cx.new(|_cx| ProjectContext::default());
5161    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5162    let context_server_registry =
5163        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5164    let model = Arc::new(FakeLanguageModel::default());
5165
5166    let parent_thread = cx.new(|cx| {
5167        Thread::new(
5168            project.clone(),
5169            project_context,
5170            context_server_registry,
5171            Templates::new(),
5172            Some(model.clone()),
5173            cx,
5174        )
5175    });
5176
5177    let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5178    subagent_thread.read_with(cx, |subagent_thread, cx| {
5179        assert!(subagent_thread.is_subagent());
5180        assert_eq!(subagent_thread.depth(), 1);
5181        assert_eq!(
5182            subagent_thread.model().map(|model| model.id()),
5183            Some(model.id())
5184        );
5185        assert_eq!(
5186            subagent_thread.parent_thread_id(),
5187            Some(parent_thread.read(cx).id().clone())
5188        );
5189    });
5190}
5191
5192#[gpui::test]
5193async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5194    init_test(cx);
5195
5196    cx.update(|cx| {
5197        cx.update_flags(true, vec!["subagents".to_string()]);
5198    });
5199
5200    let fs = FakeFs::new(cx.executor());
5201    fs.insert_tree(path!("/test"), json!({})).await;
5202    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5203    let project_context = cx.new(|_cx| ProjectContext::default());
5204    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5205    let context_server_registry =
5206        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5207    let model = Arc::new(FakeLanguageModel::default());
5208    let environment = Rc::new(cx.update(|cx| {
5209        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5210    }));
5211
5212    let deep_parent_thread = cx.new(|cx| {
5213        let mut thread = Thread::new(
5214            project.clone(),
5215            project_context,
5216            context_server_registry,
5217            Templates::new(),
5218            Some(model.clone()),
5219            cx,
5220        );
5221        thread.set_subagent_context(SubagentContext {
5222            parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5223            depth: MAX_SUBAGENT_DEPTH - 1,
5224        });
5225        thread
5226    });
5227    let deep_subagent_thread = cx.new(|cx| {
5228        let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5229        thread.add_default_tools(environment, cx);
5230        thread
5231    });
5232
5233    deep_subagent_thread.read_with(cx, |thread, _| {
5234        assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5235        assert!(
5236            !thread.has_registered_tool(SpawnAgentTool::NAME),
5237            "subagent tool should not be present at max depth"
5238        );
5239    });
5240}
5241
5242#[gpui::test]
5243async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5244    init_test(cx);
5245
5246    cx.update(|cx| {
5247        cx.update_flags(true, vec!["subagents".to_string()]);
5248    });
5249
5250    let fs = FakeFs::new(cx.executor());
5251    fs.insert_tree(path!("/test"), json!({})).await;
5252    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5253    let project_context = cx.new(|_cx| ProjectContext::default());
5254    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5255    let context_server_registry =
5256        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5257    let model = Arc::new(FakeLanguageModel::default());
5258
5259    let parent = cx.new(|cx| {
5260        Thread::new(
5261            project.clone(),
5262            project_context.clone(),
5263            context_server_registry.clone(),
5264            Templates::new(),
5265            Some(model.clone()),
5266            cx,
5267        )
5268    });
5269
5270    let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5271
5272    parent.update(cx, |thread, _cx| {
5273        thread.register_running_subagent(subagent.downgrade());
5274    });
5275
5276    subagent
5277        .update(cx, |thread, cx| {
5278            thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5279        })
5280        .unwrap();
5281    cx.run_until_parked();
5282
5283    subagent.read_with(cx, |thread, _| {
5284        assert!(!thread.is_turn_complete(), "subagent should be running");
5285    });
5286
5287    parent.update(cx, |thread, cx| {
5288        thread.cancel(cx).detach();
5289    });
5290
5291    subagent.read_with(cx, |thread, _| {
5292        assert!(
5293            thread.is_turn_complete(),
5294            "subagent should be cancelled when parent cancels"
5295        );
5296    });
5297}
5298
// Verifies that a running subagent is stopped when its token usage crosses
// the context-window warning threshold, that the parent's spawn-agent tool
// call then fails with a warning message, and that the subagent session is
// retained so it can be resumed later.
//
// NOTE(review): parent and subagent appear to share the same
// `FakeLanguageModel`; `send_last_completion_stream_*` seems to target the
// most recently started completion — confirm against the fake model's impl.
#[gpui::test]
async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Create an ACP session and grab the agent-internal thread backing it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // Have the parent model emit a spawn-agent tool call to start a subagent.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    // Send a usage update that crosses the warning threshold (80% of 1,000,000)
    model.send_last_completion_stream_text_chunk("partial work");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        TokenUsage {
            input_tokens: 850_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should be stopped after context window warning"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after warning");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the warning error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("nearing the end of its context window"),
        "tool output should contain context window warning message, got:\n{markdown}"
    );
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status, got:\n{markdown}"
    );

    // Verify the subagent session still exists (can be resumed)
    agent.read_with(cx, |agent, _cx| {
        assert!(
            agent.sessions.contains_key(&subagent_session_id),
            "subagent session should still exist for potential resume"
        );
    });
}
5424
// Verifies that the context-window warning only fires on a level *transition*:
// the first turn moves Normal→Warning and stops the subagent; resuming the
// same subagent while already at Warning (and staying there) must complete
// normally without re-triggering the warning.
#[gpui::test]
async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Create an ACP session and grab the agent-internal thread backing it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // === First turn: create subagent, trigger context window warning ===
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "initial task".to_string(),
        message: "do the first task".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Capture the subagent's session id so the second turn can resume it.
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    // Subagent sends a usage update that crosses the warning threshold.
    // This triggers Normal→Warning, stopping the subagent.
    model.send_last_completion_stream_text_chunk("partial work");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        TokenUsage {
            input_tokens: 850_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));

    cx.run_until_parked();

    // Verify the first turn was stopped with a context window warning
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should be stopped after context window warning"
        );
    });

    // Parent model responds to complete first turn
    model.send_last_completion_stream_text_chunk("First response");
    model.end_last_completion_stream();

    send.await.unwrap();

    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("nearing the end of its context window"),
        "first turn should have context window warning, got:\n{markdown}"
    );

    // === Second turn: resume the same subagent (now at Warning level) ===
    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("resuming subagent");
    // Passing the captured session id resumes the existing subagent session.
    let resume_tool_input = SpawnAgentToolInput {
        label: "follow-up task".to_string(),
        message: "do the follow-up task".to_string(),
        session_id: Some(subagent_session_id.clone()),
    };
    let resume_tool_use = LanguageModelToolUse {
        id: "subagent_2".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
        input: serde_json::to_value(&resume_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Subagent responds with tokens still at warning level (no worse).
    // Since ratio_before_prompt was already Warning, this should NOT
    // trigger the context window warning again.
    model.send_last_completion_stream_text_chunk("follow-up task response");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        TokenUsage {
            input_tokens: 870_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete second turn
    model.send_last_completion_stream_text_chunk("Second response");
    model.end_last_completion_stream();

    send2.await.unwrap();

    // The resumed subagent should have completed normally since the ratio
    // didn't transition (it was Warning before and stayed at Warning)
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("follow-up task response"),
        "resumed subagent should complete normally when already at warning, got:\n{markdown}"
    );
    // The second tool call should NOT have a context window warning
    let second_tool_pos = markdown
        .find("follow-up task")
        .expect("should find follow-up tool call");
    let after_second_tool = &markdown[second_tool_pos..];
    assert!(
        !after_second_tool.contains("nearing the end of its context window"),
        "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
    );
}
5598
// Verifies that a non-retryable completion error inside a subagent stops the
// subagent and surfaces as a failed tool call on the parent thread, while the
// parent's own turn continues and completes.
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Create an ACP session and grab the agent-internal thread backing it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // Have the parent model emit a spawn-agent tool call to start a subagent.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5705
5706#[gpui::test]
5707async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5708    init_test(cx);
5709
5710    let fs = FakeFs::new(cx.executor());
5711    fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5712        .await;
5713    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5714
5715    cx.update(|cx| {
5716        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5717        settings.tool_permissions.tools.insert(
5718            EditFileTool::NAME.into(),
5719            agent_settings::ToolRules {
5720                default: Some(settings::ToolPermissionMode::Allow),
5721                always_allow: vec![],
5722                always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5723                always_confirm: vec![],
5724                invalid_patterns: vec![],
5725            },
5726        );
5727        agent_settings::AgentSettings::override_global(settings, cx);
5728    });
5729
5730    let context_server_registry =
5731        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5732    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5733    let templates = crate::Templates::new();
5734    let thread = cx.new(|cx| {
5735        crate::Thread::new(
5736            project.clone(),
5737            cx.new(|_cx| prompt_store::ProjectContext::default()),
5738            context_server_registry,
5739            templates.clone(),
5740            None,
5741            cx,
5742        )
5743    });
5744
5745    #[allow(clippy::arc_with_non_send_sync)]
5746    let tool = Arc::new(crate::EditFileTool::new(
5747        project.clone(),
5748        thread.downgrade(),
5749        language_registry,
5750        templates,
5751    ));
5752    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5753
5754    let task = cx.update(|cx| {
5755        tool.run(
5756            ToolInput::resolved(crate::EditFileToolInput {
5757                display_description: "Edit sensitive file".to_string(),
5758                path: "root/sensitive_config.txt".into(),
5759                mode: crate::EditFileMode::Edit,
5760            }),
5761            event_stream,
5762            cx,
5763        )
5764    });
5765
5766    let result = task.await;
5767    assert!(result.is_err(), "expected edit to be blocked");
5768    assert!(
5769        result.unwrap_err().to_string().contains("blocked"),
5770        "error should mention the edit was blocked"
5771    );
5772}
5773
5774#[gpui::test]
5775async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5776    init_test(cx);
5777
5778    let fs = FakeFs::new(cx.executor());
5779    fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5780        .await;
5781    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5782
5783    cx.update(|cx| {
5784        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5785        settings.tool_permissions.tools.insert(
5786            DeletePathTool::NAME.into(),
5787            agent_settings::ToolRules {
5788                default: Some(settings::ToolPermissionMode::Allow),
5789                always_allow: vec![],
5790                always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5791                always_confirm: vec![],
5792                invalid_patterns: vec![],
5793            },
5794        );
5795        agent_settings::AgentSettings::override_global(settings, cx);
5796    });
5797
5798    let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5799
5800    #[allow(clippy::arc_with_non_send_sync)]
5801    let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5802    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5803
5804    let task = cx.update(|cx| {
5805        tool.run(
5806            ToolInput::resolved(crate::DeletePathToolInput {
5807                path: "root/important_data.txt".to_string(),
5808            }),
5809            event_stream,
5810            cx,
5811        )
5812    });
5813
5814    let result = task.await;
5815    assert!(result.is_err(), "expected deletion to be blocked");
5816    assert!(
5817        result.unwrap_err().contains("blocked"),
5818        "error should mention the deletion was blocked"
5819    );
5820}
5821
5822#[gpui::test]
5823async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5824    init_test(cx);
5825
5826    let fs = FakeFs::new(cx.executor());
5827    fs.insert_tree(
5828        "/root",
5829        json!({
5830            "safe.txt": "content",
5831            "protected": {}
5832        }),
5833    )
5834    .await;
5835    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5836
5837    cx.update(|cx| {
5838        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5839        settings.tool_permissions.tools.insert(
5840            MovePathTool::NAME.into(),
5841            agent_settings::ToolRules {
5842                default: Some(settings::ToolPermissionMode::Allow),
5843                always_allow: vec![],
5844                always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5845                always_confirm: vec![],
5846                invalid_patterns: vec![],
5847            },
5848        );
5849        agent_settings::AgentSettings::override_global(settings, cx);
5850    });
5851
5852    #[allow(clippy::arc_with_non_send_sync)]
5853    let tool = Arc::new(crate::MovePathTool::new(project));
5854    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5855
5856    let task = cx.update(|cx| {
5857        tool.run(
5858            ToolInput::resolved(crate::MovePathToolInput {
5859                source_path: "root/safe.txt".to_string(),
5860                destination_path: "root/protected/safe.txt".to_string(),
5861            }),
5862            event_stream,
5863            cx,
5864        )
5865    });
5866
5867    let result = task.await;
5868    assert!(
5869        result.is_err(),
5870        "expected move to be blocked due to destination path"
5871    );
5872    assert!(
5873        result.unwrap_err().contains("blocked"),
5874        "error should mention the move was blocked"
5875    );
5876}
5877
5878#[gpui::test]
5879async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5880    init_test(cx);
5881
5882    let fs = FakeFs::new(cx.executor());
5883    fs.insert_tree(
5884        "/root",
5885        json!({
5886            "secret.txt": "secret content",
5887            "public": {}
5888        }),
5889    )
5890    .await;
5891    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5892
5893    cx.update(|cx| {
5894        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5895        settings.tool_permissions.tools.insert(
5896            MovePathTool::NAME.into(),
5897            agent_settings::ToolRules {
5898                default: Some(settings::ToolPermissionMode::Allow),
5899                always_allow: vec![],
5900                always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5901                always_confirm: vec![],
5902                invalid_patterns: vec![],
5903            },
5904        );
5905        agent_settings::AgentSettings::override_global(settings, cx);
5906    });
5907
5908    #[allow(clippy::arc_with_non_send_sync)]
5909    let tool = Arc::new(crate::MovePathTool::new(project));
5910    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5911
5912    let task = cx.update(|cx| {
5913        tool.run(
5914            ToolInput::resolved(crate::MovePathToolInput {
5915                source_path: "root/secret.txt".to_string(),
5916                destination_path: "root/public/not_secret.txt".to_string(),
5917            }),
5918            event_stream,
5919            cx,
5920        )
5921    });
5922
5923    let result = task.await;
5924    assert!(
5925        result.is_err(),
5926        "expected move to be blocked due to source path"
5927    );
5928    assert!(
5929        result.unwrap_err().contains("blocked"),
5930        "error should mention the move was blocked"
5931    );
5932}
5933
5934#[gpui::test]
5935async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5936    init_test(cx);
5937
5938    let fs = FakeFs::new(cx.executor());
5939    fs.insert_tree(
5940        "/root",
5941        json!({
5942            "confidential.txt": "confidential data",
5943            "dest": {}
5944        }),
5945    )
5946    .await;
5947    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5948
5949    cx.update(|cx| {
5950        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5951        settings.tool_permissions.tools.insert(
5952            CopyPathTool::NAME.into(),
5953            agent_settings::ToolRules {
5954                default: Some(settings::ToolPermissionMode::Allow),
5955                always_allow: vec![],
5956                always_deny: vec![
5957                    agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5958                ],
5959                always_confirm: vec![],
5960                invalid_patterns: vec![],
5961            },
5962        );
5963        agent_settings::AgentSettings::override_global(settings, cx);
5964    });
5965
5966    #[allow(clippy::arc_with_non_send_sync)]
5967    let tool = Arc::new(crate::CopyPathTool::new(project));
5968    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5969
5970    let task = cx.update(|cx| {
5971        tool.run(
5972            ToolInput::resolved(crate::CopyPathToolInput {
5973                source_path: "root/confidential.txt".to_string(),
5974                destination_path: "root/dest/copy.txt".to_string(),
5975            }),
5976            event_stream,
5977            cx,
5978        )
5979    });
5980
5981    let result = task.await;
5982    assert!(result.is_err(), "expected copy to be blocked");
5983    assert!(
5984        result.unwrap_err().contains("blocked"),
5985        "error should mention the copy was blocked"
5986    );
5987}
5988
5989#[gpui::test]
5990async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
5991    init_test(cx);
5992
5993    let fs = FakeFs::new(cx.executor());
5994    fs.insert_tree(
5995        "/root",
5996        json!({
5997            "normal.txt": "normal content",
5998            "readonly": {
5999                "config.txt": "readonly content"
6000            }
6001        }),
6002    )
6003    .await;
6004    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6005
6006    cx.update(|cx| {
6007        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6008        settings.tool_permissions.tools.insert(
6009            SaveFileTool::NAME.into(),
6010            agent_settings::ToolRules {
6011                default: Some(settings::ToolPermissionMode::Allow),
6012                always_allow: vec![],
6013                always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6014                always_confirm: vec![],
6015                invalid_patterns: vec![],
6016            },
6017        );
6018        agent_settings::AgentSettings::override_global(settings, cx);
6019    });
6020
6021    #[allow(clippy::arc_with_non_send_sync)]
6022    let tool = Arc::new(crate::SaveFileTool::new(project));
6023    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6024
6025    let task = cx.update(|cx| {
6026        tool.run(
6027            ToolInput::resolved(crate::SaveFileToolInput {
6028                paths: vec![
6029                    std::path::PathBuf::from("root/normal.txt"),
6030                    std::path::PathBuf::from("root/readonly/config.txt"),
6031                ],
6032            }),
6033            event_stream,
6034            cx,
6035        )
6036    });
6037
6038    let result = task.await;
6039    assert!(
6040        result.is_err(),
6041        "expected save to be blocked due to denied path"
6042    );
6043    assert!(
6044        result.unwrap_err().contains("blocked"),
6045        "error should mention the save was blocked"
6046    );
6047}
6048
6049#[gpui::test]
6050async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6051    init_test(cx);
6052
6053    let fs = FakeFs::new(cx.executor());
6054    fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6055        .await;
6056    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6057
6058    cx.update(|cx| {
6059        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6060        settings.tool_permissions.tools.insert(
6061            SaveFileTool::NAME.into(),
6062            agent_settings::ToolRules {
6063                default: Some(settings::ToolPermissionMode::Allow),
6064                always_allow: vec![],
6065                always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6066                always_confirm: vec![],
6067                invalid_patterns: vec![],
6068            },
6069        );
6070        agent_settings::AgentSettings::override_global(settings, cx);
6071    });
6072
6073    #[allow(clippy::arc_with_non_send_sync)]
6074    let tool = Arc::new(crate::SaveFileTool::new(project));
6075    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6076
6077    let task = cx.update(|cx| {
6078        tool.run(
6079            ToolInput::resolved(crate::SaveFileToolInput {
6080                paths: vec![std::path::PathBuf::from("root/config.secret")],
6081            }),
6082            event_stream,
6083            cx,
6084        )
6085    });
6086
6087    let result = task.await;
6088    assert!(result.is_err(), "expected save to be blocked");
6089    assert!(
6090        result.unwrap_err().contains("blocked"),
6091        "error should mention the save was blocked"
6092    );
6093}
6094
6095#[gpui::test]
6096async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6097    init_test(cx);
6098
6099    cx.update(|cx| {
6100        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6101        settings.tool_permissions.tools.insert(
6102            WebSearchTool::NAME.into(),
6103            agent_settings::ToolRules {
6104                default: Some(settings::ToolPermissionMode::Allow),
6105                always_allow: vec![],
6106                always_deny: vec![
6107                    agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6108                ],
6109                always_confirm: vec![],
6110                invalid_patterns: vec![],
6111            },
6112        );
6113        agent_settings::AgentSettings::override_global(settings, cx);
6114    });
6115
6116    #[allow(clippy::arc_with_non_send_sync)]
6117    let tool = Arc::new(crate::WebSearchTool);
6118    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6119
6120    let input: crate::WebSearchToolInput =
6121        serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6122
6123    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6124
6125    let result = task.await;
6126    assert!(result.is_err(), "expected search to be blocked");
6127    match result.unwrap_err() {
6128        crate::WebSearchToolOutput::Error { error } => {
6129            assert!(
6130                error.contains("blocked"),
6131                "error should mention the search was blocked"
6132            );
6133        }
6134        other => panic!("expected Error variant, got: {other:?}"),
6135    }
6136}
6137
6138#[gpui::test]
6139async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6140    init_test(cx);
6141
6142    let fs = FakeFs::new(cx.executor());
6143    fs.insert_tree("/root", json!({"README.md": "# Hello"}))
6144        .await;
6145    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6146
6147    cx.update(|cx| {
6148        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6149        settings.tool_permissions.tools.insert(
6150            EditFileTool::NAME.into(),
6151            agent_settings::ToolRules {
6152                default: Some(settings::ToolPermissionMode::Confirm),
6153                always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6154                always_deny: vec![],
6155                always_confirm: vec![],
6156                invalid_patterns: vec![],
6157            },
6158        );
6159        agent_settings::AgentSettings::override_global(settings, cx);
6160    });
6161
6162    let context_server_registry =
6163        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6164    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6165    let templates = crate::Templates::new();
6166    let thread = cx.new(|cx| {
6167        crate::Thread::new(
6168            project.clone(),
6169            cx.new(|_cx| prompt_store::ProjectContext::default()),
6170            context_server_registry,
6171            templates.clone(),
6172            None,
6173            cx,
6174        )
6175    });
6176
6177    #[allow(clippy::arc_with_non_send_sync)]
6178    let tool = Arc::new(crate::EditFileTool::new(
6179        project,
6180        thread.downgrade(),
6181        language_registry,
6182        templates,
6183    ));
6184    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6185
6186    let _task = cx.update(|cx| {
6187        tool.run(
6188            ToolInput::resolved(crate::EditFileToolInput {
6189                display_description: "Edit README".to_string(),
6190                path: "root/README.md".into(),
6191                mode: crate::EditFileMode::Edit,
6192            }),
6193            event_stream,
6194            cx,
6195        )
6196    });
6197
6198    cx.run_until_parked();
6199
6200    let event = rx.try_next();
6201    assert!(
6202        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6203        "expected no authorization request for allowed .md file"
6204    );
6205}
6206
6207#[gpui::test]
6208async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6209    init_test(cx);
6210
6211    let fs = FakeFs::new(cx.executor());
6212    fs.insert_tree(
6213        "/root",
6214        json!({
6215            ".zed": {
6216                "settings.json": "{}"
6217            },
6218            "README.md": "# Hello"
6219        }),
6220    )
6221    .await;
6222    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6223
6224    cx.update(|cx| {
6225        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6226        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6227        agent_settings::AgentSettings::override_global(settings, cx);
6228    });
6229
6230    let context_server_registry =
6231        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6232    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6233    let templates = crate::Templates::new();
6234    let thread = cx.new(|cx| {
6235        crate::Thread::new(
6236            project.clone(),
6237            cx.new(|_cx| prompt_store::ProjectContext::default()),
6238            context_server_registry,
6239            templates.clone(),
6240            None,
6241            cx,
6242        )
6243    });
6244
6245    #[allow(clippy::arc_with_non_send_sync)]
6246    let tool = Arc::new(crate::EditFileTool::new(
6247        project,
6248        thread.downgrade(),
6249        language_registry,
6250        templates,
6251    ));
6252
6253    // Editing a file inside .zed/ should still prompt even with global default: allow,
6254    // because local settings paths are sensitive and require confirmation regardless.
6255    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6256    let _task = cx.update(|cx| {
6257        tool.run(
6258            ToolInput::resolved(crate::EditFileToolInput {
6259                display_description: "Edit local settings".to_string(),
6260                path: "root/.zed/settings.json".into(),
6261                mode: crate::EditFileMode::Edit,
6262            }),
6263            event_stream,
6264            cx,
6265        )
6266    });
6267
6268    let _update = rx.expect_update_fields().await;
6269    let _auth = rx.expect_authorization().await;
6270}
6271
6272#[gpui::test]
6273async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6274    init_test(cx);
6275
6276    cx.update(|cx| {
6277        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6278        settings.tool_permissions.tools.insert(
6279            FetchTool::NAME.into(),
6280            agent_settings::ToolRules {
6281                default: Some(settings::ToolPermissionMode::Allow),
6282                always_allow: vec![],
6283                always_deny: vec![
6284                    agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6285                ],
6286                always_confirm: vec![],
6287                invalid_patterns: vec![],
6288            },
6289        );
6290        agent_settings::AgentSettings::override_global(settings, cx);
6291    });
6292
6293    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6294
6295    #[allow(clippy::arc_with_non_send_sync)]
6296    let tool = Arc::new(crate::FetchTool::new(http_client));
6297    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6298
6299    let input: crate::FetchToolInput =
6300        serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6301
6302    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6303
6304    let result = task.await;
6305    assert!(result.is_err(), "expected fetch to be blocked");
6306    assert!(
6307        result.unwrap_err().contains("blocked"),
6308        "error should mention the fetch was blocked"
6309    );
6310}
6311
6312#[gpui::test]
6313async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6314    init_test(cx);
6315
6316    cx.update(|cx| {
6317        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6318        settings.tool_permissions.tools.insert(
6319            FetchTool::NAME.into(),
6320            agent_settings::ToolRules {
6321                default: Some(settings::ToolPermissionMode::Confirm),
6322                always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6323                always_deny: vec![],
6324                always_confirm: vec![],
6325                invalid_patterns: vec![],
6326            },
6327        );
6328        agent_settings::AgentSettings::override_global(settings, cx);
6329    });
6330
6331    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6332
6333    #[allow(clippy::arc_with_non_send_sync)]
6334    let tool = Arc::new(crate::FetchTool::new(http_client));
6335    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6336
6337    let input: crate::FetchToolInput =
6338        serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6339
6340    let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6341
6342    cx.run_until_parked();
6343
6344    let event = rx.try_next();
6345    assert!(
6346        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6347        "expected no authorization request for allowed docs.rs URL"
6348    );
6349}
6350
// End-to-end check of the "queued message" turn boundary: when a message is
// queued mid-turn, the agent should finish the in-flight tool call and then
// end the turn instead of looping back into another model request.
#[gpui::test]
async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Add a tool so we can simulate tool calls
    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
    });

    // Start a turn by sending a message
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate the model making a tool call
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text": "hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));

    // Signal that a message is queued before ending the stream
    thread.update(cx, |thread, _cx| {
        thread.set_has_queued_message(true);
    });

    // Now end the stream - tool will run, and the boundary check should see the queue
    fake_model.end_last_completion_stream();

    // Collect all events until the turn stops
    let all_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we received the tool call event
    let tool_call_ids: Vec<_> = all_events
        .iter()
        .filter_map(|e| match e {
            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
            _ => None,
        })
        .collect();
    assert_eq!(
        tool_call_ids,
        vec!["tool_1"],
        "Should have received a tool call event for our echo tool"
    );

    // The turn should have stopped with EndTurn
    let stop_reasons = stop_events(all_events);
    assert_eq!(
        stop_reasons,
        vec![acp::StopReason::EndTurn],
        "Turn should have ended after tool completion due to queued message"
    );

    // Verify the queued message flag is still set
    // NOTE(review): this implies the thread leaves the flag for its caller to
    // clear rather than consuming it itself — confirm against the Thread impl.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.has_queued_message(),
            "Should still have queued message flag set"
        );
    });

    // Thread should be idle now
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be running after turn ends"
        );
    });
}
6435
// A streaming tool that fails while its input is still being streamed
// (is_input_complete: false) should break the stream loop right away: the
// follow-up request is issued even though the completion stream was never
// ended, and it carries the tool use plus an is_error: true tool result.
#[gpui::test]
async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Tool that errors after receiving its first input chunk.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Partial tool use: input is deliberately left incomplete so the tool's
    // failure — not stream completion — is what ends the loop.
    let tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({}),
        is_input_complete: false,
        thought_signature: None,
    };

    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));

    // Note: the stream is intentionally NOT ended here; the follow-up request
    // below must appear regardless.
    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // messages[0] is skipped (presumably the system/context message — not
    // asserted here); the rest pins down the exact follow-up conversation.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_failing_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6510
// When two tools run concurrently and the streaming one fails, the turn must
// still wait for the other in-flight tool before reporting back to the model:
// the follow-up request carries BOTH results (the failure and the success).
#[gpui::test]
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // The echo tool blocks until this oneshot fires, keeping it "in flight"
    // while the failing tool errors out.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // First tool call streamed in two chunks: a partial input followed by the
    // completed one. Only the final (is_input_complete: true) version is
    // expected in the assistant message asserted below.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // Second tool call: still streaming (is_input_complete: false) when the
    // tool fails after its first chunk.
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    cx.run_until_parked();

    // Unblock the echo tool; only now should the follow-up request go out.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // messages[0] is skipped (presumably the system/context message — not
    // asserted here). Note the tool-result order: the failed call_2 result
    // precedes the successful call_1 result.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6628
// Mid-turn refresh: changes to the model, the active profile, and the
// thinking toggle made while a turn is in flight should be picked up by the
// NEXT request of the same turn.
//
// NOTE(review): unlike the sibling tests, this one never calls init_test(cx)
// or always_allow_tools(cx) before setup() — confirm setup() performs the
// required initialization on its own.
#[gpui::test]
async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model_a = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
    });

    // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "profile-a": {
                        "name": "Profile A",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "profile-b": {
                        "name": "Profile B",
                        "tools": {
                            DelayTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Initial turn configuration: profile-a, thinking disabled.
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-a".into()), cx);
        thread.set_thinking_enabled(false, cx);
    });

    // Send a message — first iteration starts with model A, profile-a, thinking off.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Verify first request has both tools and thinking disabled.
    let completions = fake_model_a.pending_completions();
    assert_eq!(completions.len(), 1);
    let first_tools = tool_names_for_completion(&completions[0]);
    assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
    assert!(!completions[0].thinking_allowed);

    // Model A responds with an echo tool call.
    fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text":"hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model_a.end_last_completion_stream();

    // Before the next iteration runs, switch to profile-b (only DelayTool),
    // swap in a new model, and enable thinking.
    let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "test-provider",
        "model-b",
        "Model B",
        true,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-b".into()), cx);
        thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
        thread.set_thinking_enabled(true, cx);
    });

    // Run until parked — processes the echo tool call, loops back, picks up
    // the new model/profile/thinking, and makes a second request to model B.
    cx.run_until_parked();

    // The second request should have gone to model B.
    let model_b_completions = fake_model_b.pending_completions();
    assert_eq!(
        model_b_completions.len(),
        1,
        "second request should go to model B"
    );

    // Profile-b only has DelayTool, so echo should be gone.
    let second_tools = tool_names_for_completion(&model_b_completions[0]);
    assert_eq!(second_tools, vec![DelayTool::NAME]);

    // Thinking should now be enabled.
    assert!(model_b_completions[0].thinking_allowed);
}