mod.rs

   1use super::*;
   2use acp_thread::{
   3    AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
   4    UserMessageId,
   5};
   6use agent_client_protocol::{self as acp};
   7use agent_settings::AgentProfileId;
   8use anyhow::Result;
   9use client::{Client, UserStore};
  10use cloud_llm_client::CompletionIntent;
  11use collections::IndexMap;
  12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  13use feature_flags::FeatureFlagAppExt as _;
  14use fs::{FakeFs, Fs};
  15use futures::{
  16    FutureExt as _, StreamExt,
  17    channel::{
  18        mpsc::{self, UnboundedReceiver},
  19        oneshot,
  20    },
  21    future::{Fuse, Shared},
  22};
  23use gpui::{
  24    App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
  25    http_client::FakeHttpClient,
  26};
  27use indoc::indoc;
  28use language_model::{
  29    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
  30    LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
  31    LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
  32    LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
  33    fake_provider::FakeLanguageModel,
  34};
  35use pretty_assertions::assert_eq;
  36use project::{
  37    Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
  38};
  39use prompt_store::ProjectContext;
  40use reqwest_client::ReqwestClient;
  41use schemars::JsonSchema;
  42use serde::{Deserialize, Serialize};
  43use serde_json::json;
  44use settings::{Settings, SettingsStore};
  45use std::{
  46    path::Path,
  47    pin::Pin,
  48    rc::Rc,
  49    sync::{
  50        Arc,
  51        atomic::{AtomicBool, AtomicUsize, Ordering},
  52    },
  53    time::Duration,
  54};
  55use util::path;
  56
  57mod edit_file_thread_test;
  58mod test_tools;
  59use test_tools::*;
  60
  61pub(crate) fn init_test(cx: &mut TestAppContext) {
  62    cx.update(|cx| {
  63        let settings_store = SettingsStore::test(cx);
  64        cx.set_global(settings_store);
  65    });
  66}
  67
  68pub(crate) struct FakeTerminalHandle {
  69    killed: Arc<AtomicBool>,
  70    stopped_by_user: Arc<AtomicBool>,
  71    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
  72    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
  73    output: acp::TerminalOutputResponse,
  74    id: acp::TerminalId,
  75}
  76
  77impl FakeTerminalHandle {
  78    pub(crate) fn new_never_exits(cx: &mut App) -> Self {
  79        let killed = Arc::new(AtomicBool::new(false));
  80        let stopped_by_user = Arc::new(AtomicBool::new(false));
  81
  82        let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
  83
  84        let wait_for_exit = cx
  85            .spawn(async move |_cx| {
  86                // Wait for the exit signal (sent when kill() is called)
  87                let _ = exit_receiver.await;
  88                acp::TerminalExitStatus::new()
  89            })
  90            .shared();
  91
  92        Self {
  93            killed,
  94            stopped_by_user,
  95            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
  96            wait_for_exit,
  97            output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
  98            id: acp::TerminalId::new("fake_terminal".to_string()),
  99        }
 100    }
 101
 102    pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
 103        let killed = Arc::new(AtomicBool::new(false));
 104        let stopped_by_user = Arc::new(AtomicBool::new(false));
 105        let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
 106
 107        let wait_for_exit = cx
 108            .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
 109            .shared();
 110
 111        Self {
 112            killed,
 113            stopped_by_user,
 114            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
 115            wait_for_exit,
 116            output: acp::TerminalOutputResponse::new("command output".to_string(), false),
 117            id: acp::TerminalId::new("fake_terminal".to_string()),
 118        }
 119    }
 120
 121    pub(crate) fn was_killed(&self) -> bool {
 122        self.killed.load(Ordering::SeqCst)
 123    }
 124
 125    pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
 126        self.stopped_by_user.store(stopped, Ordering::SeqCst);
 127    }
 128
 129    pub(crate) fn signal_exit(&self) {
 130        if let Some(sender) = self.exit_sender.borrow_mut().take() {
 131            let _ = sender.send(());
 132        }
 133    }
 134}
 135
 136impl crate::TerminalHandle for FakeTerminalHandle {
 137    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
 138        Ok(self.id.clone())
 139    }
 140
 141    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
 142        Ok(self.output.clone())
 143    }
 144
 145    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
 146        Ok(self.wait_for_exit.clone())
 147    }
 148
 149    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
 150        self.killed.store(true, Ordering::SeqCst);
 151        self.signal_exit();
 152        Ok(())
 153    }
 154
 155    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
 156        Ok(self.stopped_by_user.load(Ordering::SeqCst))
 157    }
 158}
 159
 160struct FakeSubagentHandle {
 161    session_id: acp::SessionId,
 162    send_task: Shared<Task<String>>,
 163}
 164
 165impl SubagentHandle for FakeSubagentHandle {
 166    fn id(&self) -> acp::SessionId {
 167        self.session_id.clone()
 168    }
 169
 170    fn num_entries(&self, _cx: &App) -> usize {
 171        unimplemented!()
 172    }
 173
 174    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
 175        let task = self.send_task.clone();
 176        cx.background_spawn(async move { Ok(task.await) })
 177    }
 178}
 179
 180#[derive(Default)]
 181pub(crate) struct FakeThreadEnvironment {
 182    terminal_handle: Option<Rc<FakeTerminalHandle>>,
 183    subagent_handle: Option<Rc<FakeSubagentHandle>>,
 184    terminal_creations: Arc<AtomicUsize>,
 185}
 186
 187impl FakeThreadEnvironment {
 188    pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
 189        Self {
 190            terminal_handle: Some(terminal_handle.into()),
 191            ..self
 192        }
 193    }
 194
 195    pub(crate) fn terminal_creation_count(&self) -> usize {
 196        self.terminal_creations.load(Ordering::SeqCst)
 197    }
 198}
 199
 200impl crate::ThreadEnvironment for FakeThreadEnvironment {
 201    fn create_terminal(
 202        &self,
 203        _command: String,
 204        _cwd: Option<std::path::PathBuf>,
 205        _output_byte_limit: Option<u64>,
 206        _cx: &mut AsyncApp,
 207    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
 208        self.terminal_creations.fetch_add(1, Ordering::SeqCst);
 209        let handle = self
 210            .terminal_handle
 211            .clone()
 212            .expect("Terminal handle not available on FakeThreadEnvironment");
 213        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
 214    }
 215
 216    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
 217        Ok(self
 218            .subagent_handle
 219            .clone()
 220            .expect("Subagent handle not available on FakeThreadEnvironment")
 221            as Rc<dyn SubagentHandle>)
 222    }
 223}
 224
 225/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
 226struct MultiTerminalEnvironment {
 227    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
 228}
 229
 230impl MultiTerminalEnvironment {
 231    fn new() -> Self {
 232        Self {
 233            handles: std::cell::RefCell::new(Vec::new()),
 234        }
 235    }
 236
 237    fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
 238        self.handles.borrow().clone()
 239    }
 240}
 241
 242impl crate::ThreadEnvironment for MultiTerminalEnvironment {
 243    fn create_terminal(
 244        &self,
 245        _command: String,
 246        _cwd: Option<std::path::PathBuf>,
 247        _output_byte_limit: Option<u64>,
 248        cx: &mut AsyncApp,
 249    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
 250        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
 251        self.handles.borrow_mut().push(handle.clone());
 252        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
 253    }
 254
 255    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
 256        unimplemented!()
 257    }
 258}
 259
 260fn always_allow_tools(cx: &mut TestAppContext) {
 261    cx.update(|cx| {
 262        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
 263        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
 264        agent_settings::AgentSettings::override_global(settings, cx);
 265    });
 266}
 267
 268#[gpui::test]
 269async fn test_echo(cx: &mut TestAppContext) {
 270    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 271    let fake_model = model.as_fake();
 272
 273    let events = thread
 274        .update(cx, |thread, cx| {
 275            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
 276        })
 277        .unwrap();
 278    cx.run_until_parked();
 279    fake_model.send_last_completion_stream_text_chunk("Hello");
 280    fake_model
 281        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
 282    fake_model.end_last_completion_stream();
 283
 284    let events = events.collect().await;
 285    thread.update(cx, |thread, _cx| {
 286        assert_eq!(
 287            thread.last_received_or_pending_message().unwrap().role(),
 288            Role::Assistant
 289        );
 290        assert_eq!(
 291            thread
 292                .last_received_or_pending_message()
 293                .unwrap()
 294                .to_markdown(),
 295            "Hello\n"
 296        )
 297    });
 298    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 299}
 300
 301#[gpui::test]
 302async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
 303    init_test(cx);
 304    always_allow_tools(cx);
 305
 306    let fs = FakeFs::new(cx.executor());
 307    let project = Project::test(fs, [], cx).await;
 308
 309    let environment = Rc::new(cx.update(|cx| {
 310        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
 311    }));
 312    let handle = environment.terminal_handle.clone().unwrap();
 313
 314    #[allow(clippy::arc_with_non_send_sync)]
 315    let tool = Arc::new(crate::TerminalTool::new(project, environment));
 316    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
 317
 318    let task = cx.update(|cx| {
 319        tool.run(
 320            ToolInput::resolved(crate::TerminalToolInput {
 321                command: "sleep 1000".to_string(),
 322                cd: ".".to_string(),
 323                timeout_ms: Some(5),
 324            }),
 325            event_stream,
 326            cx,
 327        )
 328    });
 329
 330    let update = rx.expect_update_fields().await;
 331    assert!(
 332        update.content.iter().any(|blocks| {
 333            blocks
 334                .iter()
 335                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
 336        }),
 337        "expected tool call update to include terminal content"
 338    );
 339
 340    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());
 341
 342    let deadline = std::time::Instant::now() + Duration::from_millis(500);
 343    loop {
 344        if let Some(result) = task_future.as_mut().now_or_never() {
 345            let result = result.expect("terminal tool task should complete");
 346
 347            assert!(
 348                handle.was_killed(),
 349                "expected terminal handle to be killed on timeout"
 350            );
 351            assert!(
 352                result.contains("partial output"),
 353                "expected result to include terminal output, got: {result}"
 354            );
 355            return;
 356        }
 357
 358        if std::time::Instant::now() >= deadline {
 359            panic!("timed out waiting for terminal tool task to complete");
 360        }
 361
 362        cx.run_until_parked();
 363        cx.background_executor.timer(Duration::from_millis(1)).await;
 364    }
 365}
 366
 367#[gpui::test]
 368#[ignore]
 369async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
 370    init_test(cx);
 371    always_allow_tools(cx);
 372
 373    let fs = FakeFs::new(cx.executor());
 374    let project = Project::test(fs, [], cx).await;
 375
 376    let environment = Rc::new(cx.update(|cx| {
 377        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
 378    }));
 379    let handle = environment.terminal_handle.clone().unwrap();
 380
 381    #[allow(clippy::arc_with_non_send_sync)]
 382    let tool = Arc::new(crate::TerminalTool::new(project, environment));
 383    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
 384
 385    let _task = cx.update(|cx| {
 386        tool.run(
 387            ToolInput::resolved(crate::TerminalToolInput {
 388                command: "sleep 1000".to_string(),
 389                cd: ".".to_string(),
 390                timeout_ms: None,
 391            }),
 392            event_stream,
 393            cx,
 394        )
 395    });
 396
 397    let update = rx.expect_update_fields().await;
 398    assert!(
 399        update.content.iter().any(|blocks| {
 400            blocks
 401                .iter()
 402                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
 403        }),
 404        "expected tool call update to include terminal content"
 405    );
 406
 407    cx.background_executor
 408        .timer(Duration::from_millis(25))
 409        .await;
 410
 411    assert!(
 412        !handle.was_killed(),
 413        "did not expect terminal handle to be killed without a timeout"
 414    );
 415}
 416
 417#[gpui::test]
 418async fn test_thinking(cx: &mut TestAppContext) {
 419    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 420    let fake_model = model.as_fake();
 421
 422    let events = thread
 423        .update(cx, |thread, cx| {
 424            thread.send(
 425                UserMessageId::new(),
 426                [indoc! {"
 427                    Testing:
 428
 429                    Generate a thinking step where you just think the word 'Think',
 430                    and have your final answer be 'Hello'
 431                "}],
 432                cx,
 433            )
 434        })
 435        .unwrap();
 436    cx.run_until_parked();
 437    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
 438        text: "Think".to_string(),
 439        signature: None,
 440    });
 441    fake_model.send_last_completion_stream_text_chunk("Hello");
 442    fake_model
 443        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
 444    fake_model.end_last_completion_stream();
 445
 446    let events = events.collect().await;
 447    thread.update(cx, |thread, _cx| {
 448        assert_eq!(
 449            thread.last_received_or_pending_message().unwrap().role(),
 450            Role::Assistant
 451        );
 452        assert_eq!(
 453            thread
 454                .last_received_or_pending_message()
 455                .unwrap()
 456                .to_markdown(),
 457            indoc! {"
 458                <think>Think</think>
 459                Hello
 460            "}
 461        )
 462    });
 463    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 464}
 465
 466#[gpui::test]
 467async fn test_system_prompt(cx: &mut TestAppContext) {
 468    let ThreadTest {
 469        model,
 470        thread,
 471        project_context,
 472        ..
 473    } = setup(cx, TestModel::Fake).await;
 474    let fake_model = model.as_fake();
 475
 476    project_context.update(cx, |project_context, _cx| {
 477        project_context.shell = "test-shell".into()
 478    });
 479    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
 480    thread
 481        .update(cx, |thread, cx| {
 482            thread.send(UserMessageId::new(), ["abc"], cx)
 483        })
 484        .unwrap();
 485    cx.run_until_parked();
 486    let mut pending_completions = fake_model.pending_completions();
 487    assert_eq!(
 488        pending_completions.len(),
 489        1,
 490        "unexpected pending completions: {:?}",
 491        pending_completions
 492    );
 493
 494    let pending_completion = pending_completions.pop().unwrap();
 495    assert_eq!(pending_completion.messages[0].role, Role::System);
 496
 497    let system_message = &pending_completion.messages[0];
 498    let system_prompt = system_message.content[0].to_str().unwrap();
 499    assert!(
 500        system_prompt.contains("test-shell"),
 501        "unexpected system message: {:?}",
 502        system_message
 503    );
 504    assert!(
 505        system_prompt.contains("## Fixing Diagnostics"),
 506        "unexpected system message: {:?}",
 507        system_message
 508    );
 509}
 510
 511#[gpui::test]
 512async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
 513    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 514    let fake_model = model.as_fake();
 515
 516    thread
 517        .update(cx, |thread, cx| {
 518            thread.send(UserMessageId::new(), ["abc"], cx)
 519        })
 520        .unwrap();
 521    cx.run_until_parked();
 522    let mut pending_completions = fake_model.pending_completions();
 523    assert_eq!(
 524        pending_completions.len(),
 525        1,
 526        "unexpected pending completions: {:?}",
 527        pending_completions
 528    );
 529
 530    let pending_completion = pending_completions.pop().unwrap();
 531    assert_eq!(pending_completion.messages[0].role, Role::System);
 532
 533    let system_message = &pending_completion.messages[0];
 534    let system_prompt = system_message.content[0].to_str().unwrap();
 535    assert!(
 536        !system_prompt.contains("## Tool Use"),
 537        "unexpected system message: {:?}",
 538        system_message
 539    );
 540    assert!(
 541        !system_prompt.contains("## Fixing Diagnostics"),
 542        "unexpected system message: {:?}",
 543        system_message
 544    );
 545}
 546
 547#[gpui::test]
 548async fn test_prompt_caching(cx: &mut TestAppContext) {
 549    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 550    let fake_model = model.as_fake();
 551
 552    // Send initial user message and verify it's cached
 553    thread
 554        .update(cx, |thread, cx| {
 555            thread.send(UserMessageId::new(), ["Message 1"], cx)
 556        })
 557        .unwrap();
 558    cx.run_until_parked();
 559
 560    let completion = fake_model.pending_completions().pop().unwrap();
 561    assert_eq!(
 562        completion.messages[1..],
 563        vec![LanguageModelRequestMessage {
 564            role: Role::User,
 565            content: vec!["Message 1".into()],
 566            cache: true,
 567            reasoning_details: None,
 568        }]
 569    );
 570    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
 571        "Response to Message 1".into(),
 572    ));
 573    fake_model.end_last_completion_stream();
 574    cx.run_until_parked();
 575
 576    // Send another user message and verify only the latest is cached
 577    thread
 578        .update(cx, |thread, cx| {
 579            thread.send(UserMessageId::new(), ["Message 2"], cx)
 580        })
 581        .unwrap();
 582    cx.run_until_parked();
 583
 584    let completion = fake_model.pending_completions().pop().unwrap();
 585    assert_eq!(
 586        completion.messages[1..],
 587        vec![
 588            LanguageModelRequestMessage {
 589                role: Role::User,
 590                content: vec!["Message 1".into()],
 591                cache: false,
 592                reasoning_details: None,
 593            },
 594            LanguageModelRequestMessage {
 595                role: Role::Assistant,
 596                content: vec!["Response to Message 1".into()],
 597                cache: false,
 598                reasoning_details: None,
 599            },
 600            LanguageModelRequestMessage {
 601                role: Role::User,
 602                content: vec!["Message 2".into()],
 603                cache: true,
 604                reasoning_details: None,
 605            }
 606        ]
 607    );
 608    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
 609        "Response to Message 2".into(),
 610    ));
 611    fake_model.end_last_completion_stream();
 612    cx.run_until_parked();
 613
 614    // Simulate a tool call and verify that the latest tool result is cached
 615    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
 616    thread
 617        .update(cx, |thread, cx| {
 618            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
 619        })
 620        .unwrap();
 621    cx.run_until_parked();
 622
 623    let tool_use = LanguageModelToolUse {
 624        id: "tool_1".into(),
 625        name: EchoTool::NAME.into(),
 626        raw_input: json!({"text": "test"}).to_string(),
 627        input: json!({"text": "test"}),
 628        is_input_complete: true,
 629        thought_signature: None,
 630    };
 631    fake_model
 632        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
 633    fake_model.end_last_completion_stream();
 634    cx.run_until_parked();
 635
 636    let completion = fake_model.pending_completions().pop().unwrap();
 637    let tool_result = LanguageModelToolResult {
 638        tool_use_id: "tool_1".into(),
 639        tool_name: EchoTool::NAME.into(),
 640        is_error: false,
 641        content: "test".into(),
 642        output: Some("test".into()),
 643    };
 644    assert_eq!(
 645        completion.messages[1..],
 646        vec![
 647            LanguageModelRequestMessage {
 648                role: Role::User,
 649                content: vec!["Message 1".into()],
 650                cache: false,
 651                reasoning_details: None,
 652            },
 653            LanguageModelRequestMessage {
 654                role: Role::Assistant,
 655                content: vec!["Response to Message 1".into()],
 656                cache: false,
 657                reasoning_details: None,
 658            },
 659            LanguageModelRequestMessage {
 660                role: Role::User,
 661                content: vec!["Message 2".into()],
 662                cache: false,
 663                reasoning_details: None,
 664            },
 665            LanguageModelRequestMessage {
 666                role: Role::Assistant,
 667                content: vec!["Response to Message 2".into()],
 668                cache: false,
 669                reasoning_details: None,
 670            },
 671            LanguageModelRequestMessage {
 672                role: Role::User,
 673                content: vec!["Use the echo tool".into()],
 674                cache: false,
 675                reasoning_details: None,
 676            },
 677            LanguageModelRequestMessage {
 678                role: Role::Assistant,
 679                content: vec![MessageContent::ToolUse(tool_use)],
 680                cache: false,
 681                reasoning_details: None,
 682            },
 683            LanguageModelRequestMessage {
 684                role: Role::User,
 685                content: vec![MessageContent::ToolResult(tool_result)],
 686                cache: true,
 687                reasoning_details: None,
 688            }
 689        ]
 690    );
 691}
 692
 693#[gpui::test]
 694#[cfg_attr(not(feature = "e2e"), ignore)]
 695async fn test_basic_tool_calls(cx: &mut TestAppContext) {
 696    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
 697
 698    // Test a tool call that's likely to complete *before* streaming stops.
 699    let events = thread
 700        .update(cx, |thread, cx| {
 701            thread.add_tool(EchoTool);
 702            thread.send(
 703                UserMessageId::new(),
 704                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
 705                cx,
 706            )
 707        })
 708        .unwrap()
 709        .collect()
 710        .await;
 711    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 712
 713    // Test a tool calls that's likely to complete *after* streaming stops.
 714    let events = thread
 715        .update(cx, |thread, cx| {
 716            thread.remove_tool(&EchoTool::NAME);
 717            thread.add_tool(DelayTool);
 718            thread.send(
 719                UserMessageId::new(),
 720                [
 721                    "Now call the delay tool with 200ms.",
 722                    "When the timer goes off, then you echo the output of the tool.",
 723                ],
 724                cx,
 725            )
 726        })
 727        .unwrap()
 728        .collect()
 729        .await;
 730    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
 731    thread.update(cx, |thread, _cx| {
 732        assert!(
 733            thread
 734                .last_received_or_pending_message()
 735                .unwrap()
 736                .as_agent_message()
 737                .unwrap()
 738                .content
 739                .iter()
 740                .any(|content| {
 741                    if let AgentMessageContent::Text(text) = content {
 742                        text.contains("Ding")
 743                    } else {
 744                        false
 745                    }
 746                }),
 747            "{}",
 748            thread.to_markdown()
 749        );
 750    });
 751}
 752
 753#[gpui::test]
 754#[cfg_attr(not(feature = "e2e"), ignore)]
 755async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
 756    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
 757
 758    // Test a tool call that's likely to complete *before* streaming stops.
 759    let mut events = thread
 760        .update(cx, |thread, cx| {
 761            thread.add_tool(WordListTool);
 762            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
 763        })
 764        .unwrap();
 765
 766    let mut saw_partial_tool_use = false;
 767    while let Some(event) = events.next().await {
 768        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
 769            thread.update(cx, |thread, _cx| {
 770                // Look for a tool use in the thread's last message
 771                let message = thread.last_received_or_pending_message().unwrap();
 772                let agent_message = message.as_agent_message().unwrap();
 773                let last_content = agent_message.content.last().unwrap();
 774                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
 775                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
 776                    if tool_call.status == acp::ToolCallStatus::Pending {
 777                        if !last_tool_use.is_input_complete
 778                            && last_tool_use.input.get("g").is_none()
 779                        {
 780                            saw_partial_tool_use = true;
 781                        }
 782                    } else {
 783                        last_tool_use
 784                            .input
 785                            .get("a")
 786                            .expect("'a' has streamed because input is now complete");
 787                        last_tool_use
 788                            .input
 789                            .get("g")
 790                            .expect("'g' has streamed because input is now complete");
 791                    }
 792                } else {
 793                    panic!("last content should be a tool use");
 794                }
 795            });
 796        }
 797    }
 798
 799    assert!(
 800        saw_partial_tool_use,
 801        "should see at least one partially streamed tool use in the history"
 802    );
 803}
 804
 805#[gpui::test]
 806async fn test_tool_authorization(cx: &mut TestAppContext) {
 807    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 808    let fake_model = model.as_fake();
 809
 810    let mut events = thread
 811        .update(cx, |thread, cx| {
 812            thread.add_tool(ToolRequiringPermission);
 813            thread.send(UserMessageId::new(), ["abc"], cx)
 814        })
 815        .unwrap();
 816    cx.run_until_parked();
 817    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 818        LanguageModelToolUse {
 819            id: "tool_id_1".into(),
 820            name: ToolRequiringPermission::NAME.into(),
 821            raw_input: "{}".into(),
 822            input: json!({}),
 823            is_input_complete: true,
 824            thought_signature: None,
 825        },
 826    ));
 827    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 828        LanguageModelToolUse {
 829            id: "tool_id_2".into(),
 830            name: ToolRequiringPermission::NAME.into(),
 831            raw_input: "{}".into(),
 832            input: json!({}),
 833            is_input_complete: true,
 834            thought_signature: None,
 835        },
 836    ));
 837    fake_model.end_last_completion_stream();
 838    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
 839    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;
 840
 841    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
 842    tool_call_auth_1
 843        .response
 844        .send(acp_thread::SelectedPermissionOutcome::new(
 845            acp::PermissionOptionId::new("allow"),
 846            acp::PermissionOptionKind::AllowOnce,
 847        ))
 848        .unwrap();
 849    cx.run_until_parked();
 850
 851    // Reject the second - send "deny" option_id directly since Deny is now a button
 852    tool_call_auth_2
 853        .response
 854        .send(acp_thread::SelectedPermissionOutcome::new(
 855            acp::PermissionOptionId::new("deny"),
 856            acp::PermissionOptionKind::RejectOnce,
 857        ))
 858        .unwrap();
 859    cx.run_until_parked();
 860
 861    let completion = fake_model.pending_completions().pop().unwrap();
 862    let message = completion.messages.last().unwrap();
 863    assert_eq!(
 864        message.content,
 865        vec![
 866            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 867                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
 868                tool_name: ToolRequiringPermission::NAME.into(),
 869                is_error: false,
 870                content: "Allowed".into(),
 871                output: Some("Allowed".into())
 872            }),
 873            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 874                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
 875                tool_name: ToolRequiringPermission::NAME.into(),
 876                is_error: true,
 877                content: "Permission to run tool denied by user".into(),
 878                output: Some("Permission to run tool denied by user".into())
 879            })
 880        ]
 881    );
 882
 883    // Simulate yet another tool call.
 884    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 885        LanguageModelToolUse {
 886            id: "tool_id_3".into(),
 887            name: ToolRequiringPermission::NAME.into(),
 888            raw_input: "{}".into(),
 889            input: json!({}),
 890            is_input_complete: true,
 891            thought_signature: None,
 892        },
 893    ));
 894    fake_model.end_last_completion_stream();
 895
 896    // Respond by always allowing tools - send transformed option_id
 897    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
 898    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
 899    tool_call_auth_3
 900        .response
 901        .send(acp_thread::SelectedPermissionOutcome::new(
 902            acp::PermissionOptionId::new("always_allow:tool_requiring_permission"),
 903            acp::PermissionOptionKind::AllowAlways,
 904        ))
 905        .unwrap();
 906    cx.run_until_parked();
 907    let completion = fake_model.pending_completions().pop().unwrap();
 908    let message = completion.messages.last().unwrap();
 909    assert_eq!(
 910        message.content,
 911        vec![language_model::MessageContent::ToolResult(
 912            LanguageModelToolResult {
 913                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
 914                tool_name: ToolRequiringPermission::NAME.into(),
 915                is_error: false,
 916                content: "Allowed".into(),
 917                output: Some("Allowed".into())
 918            }
 919        )]
 920    );
 921
 922    // Simulate a final tool call, ensuring we don't trigger authorization.
 923    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 924        LanguageModelToolUse {
 925            id: "tool_id_4".into(),
 926            name: ToolRequiringPermission::NAME.into(),
 927            raw_input: "{}".into(),
 928            input: json!({}),
 929            is_input_complete: true,
 930            thought_signature: None,
 931        },
 932    ));
 933    fake_model.end_last_completion_stream();
 934    cx.run_until_parked();
 935    let completion = fake_model.pending_completions().pop().unwrap();
 936    let message = completion.messages.last().unwrap();
 937    assert_eq!(
 938        message.content,
 939        vec![language_model::MessageContent::ToolResult(
 940            LanguageModelToolResult {
 941                tool_use_id: "tool_id_4".into(),
 942                tool_name: ToolRequiringPermission::NAME.into(),
 943                is_error: false,
 944                content: "Allowed".into(),
 945                output: Some("Allowed".into())
 946            }
 947        )]
 948    );
 949}
 950
 951#[gpui::test]
 952async fn test_tool_hallucination(cx: &mut TestAppContext) {
 953    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 954    let fake_model = model.as_fake();
 955
 956    let mut events = thread
 957        .update(cx, |thread, cx| {
 958            thread.send(UserMessageId::new(), ["abc"], cx)
 959        })
 960        .unwrap();
 961    cx.run_until_parked();
 962    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 963        LanguageModelToolUse {
 964            id: "tool_id_1".into(),
 965            name: "nonexistent_tool".into(),
 966            raw_input: "{}".into(),
 967            input: json!({}),
 968            is_input_complete: true,
 969            thought_signature: None,
 970        },
 971    ));
 972    fake_model.end_last_completion_stream();
 973
 974    let tool_call = expect_tool_call(&mut events).await;
 975    assert_eq!(tool_call.title, "nonexistent_tool");
 976    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
 977    let update = expect_tool_call_update_fields(&mut events).await;
 978    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
 979}
 980
 981async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
 982    let event = events
 983        .next()
 984        .await
 985        .expect("no tool call authorization event received")
 986        .unwrap();
 987    match event {
 988        ThreadEvent::ToolCall(tool_call) => tool_call,
 989        event => {
 990            panic!("Unexpected event {event:?}");
 991        }
 992    }
 993}
 994
 995async fn expect_tool_call_update_fields(
 996    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
 997) -> acp::ToolCallUpdate {
 998    let event = events
 999        .next()
1000        .await
1001        .expect("no tool call authorization event received")
1002        .unwrap();
1003    match event {
1004        ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
1005        event => {
1006            panic!("Unexpected event {event:?}");
1007        }
1008    }
1009}
1010
1011async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1012    let event = events
1013        .next()
1014        .await
1015        .expect("no plan event received")
1016        .unwrap();
1017    match event {
1018        ThreadEvent::Plan(plan) => plan,
1019        event => {
1020            panic!("Unexpected event {event:?}");
1021        }
1022    }
1023}
1024
1025async fn next_tool_call_authorization(
1026    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1027) -> ToolCallAuthorization {
1028    loop {
1029        let event = events
1030            .next()
1031            .await
1032            .expect("no tool call authorization event received")
1033            .unwrap();
1034        if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1035            let permission_kinds = tool_call_authorization
1036                .options
1037                .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1038                .map(|option| option.kind);
1039            let allow_once = tool_call_authorization
1040                .options
1041                .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1042                .map(|option| option.kind);
1043
1044            assert_eq!(
1045                permission_kinds,
1046                Some(acp::PermissionOptionKind::AllowAlways)
1047            );
1048            assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1049            return tool_call_authorization;
1050        }
1051    }
1052}
1053
/// `cargo build --release` on the terminal tool should produce a dropdown with
/// three choices: the tool-wide one, the `cargo build` pattern one, and the
/// one-time one.
#[test]
fn test_permission_options_terminal_with_pattern() {
    let commands = vec!["cargo build --release".to_string()];
    let choices = match ToolPermissionContext::new(TerminalTool::NAME, commands)
        .build_permission_options()
    {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);

    // True when any choice's "allow" option carries exactly this label.
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `cargo build` commands"));
    assert!(has_label("Only this time"));
}
1075
/// For `ls -la` the second token is a flag, so the pattern choice is expected
/// to be labelled with `ls` alone.
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    let commands = vec!["ls -la".to_string()];
    let choices = match ToolPermissionContext::new(TerminalTool::NAME, commands)
        .build_permission_options()
    {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);

    // True when any choice's "allow" option carries exactly this label.
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `ls` commands"));
    assert!(has_label("Only this time"));
}
1095
/// A single-word command (`whoami`) should still get its own pattern choice
/// alongside the tool-wide and one-time choices.
#[test]
fn test_permission_options_terminal_single_word_command() {
    let commands = vec!["whoami".to_string()];
    let choices = match ToolPermissionContext::new(TerminalTool::NAME, commands)
        .build_permission_options()
    {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);

    // True when any choice's "allow" option carries exactly this label.
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `whoami` commands"));
    assert!(has_label("Only this time"));
}
1115
/// Editing `src/main.rs` should offer a tool-wide choice and a choice scoped
/// to the `src/` directory.
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    let paths_to_edit = vec!["src/main.rs".to_string()];
    let choices = match ToolPermissionContext::new(EditFileTool::NAME, paths_to_edit)
        .build_permission_options()
    {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    // True when any choice's "allow" option carries exactly this label.
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for edit file"));
    assert!(has_label("Always for `src/`"));
}
1133
/// Fetching `https://docs.rs/gpui` should offer a tool-wide choice and a
/// choice scoped to the `docs.rs` domain.
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    let urls = vec!["https://docs.rs/gpui".to_string()];
    let choices = match ToolPermissionContext::new(FetchTool::NAME, urls)
        .build_permission_options()
    {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    // True when any choice's "allow" option carries exactly this label.
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for fetch"));
    assert!(has_label("Always for `docs.rs`"));
}
1151
/// `./deploy.sh --production` is expected to yield only the tool-wide and
/// one-time choices, with no label mentioning "commands".
#[test]
fn test_permission_options_without_pattern() {
    let commands = vec!["./deploy.sh --production".to_string()];
    let choices = match ToolPermissionContext::new(TerminalTool::NAME, commands)
        .build_permission_options()
    {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 2);

    // True when any choice's "allow" option carries exactly this label.
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Only this time"));

    // No choice may be a per-command pattern choice.
    assert!(choices.iter().all(|choice| {
        let label: &str = choice.allow.name.as_ref();
        !label.contains("commands")
    }));
}
1173
/// Symlink-target authorization should produce flat options consisting of
/// exactly a one-time "allow" and a one-time "deny".
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    let options = match ToolPermissionContext::symlink_target(
        EditFileTool::NAME,
        vec!["/outside/file.txt".into()],
    )
    .build_permission_options()
    {
        PermissionOptions::Flat(options) => options,
        _ => panic!("Expected flat permission options for symlink target authorization"),
    };

    assert_eq!(options.len(), 2);

    // True when some option has both the given id and the given kind.
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1194
/// Pins the option ids and sub-patterns of the three dropdown choices built
/// for `cargo build --release` on the terminal tool.
#[test]
fn test_permission_option_ids_for_terminal() {
    let commands = vec!["cargo build --release".to_string()];
    let choices = match ToolPermissionContext::new(TerminalTool::NAME, commands)
        .build_permission_options()
    {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    // Expected order: always-for-tool, always-for-pattern, once.
    assert_eq!(choices.len(), 3);
    let always_tool = &choices[0];
    let always_pattern = &choices[1];
    let once = &choices[2];

    // The tool-wide choice uses the tool-level ids and has no sub-patterns.
    assert_eq!(
        always_tool.allow.option_id.0.as_ref(),
        "always_allow:terminal"
    );
    assert_eq!(
        always_tool.deny.option_id.0.as_ref(),
        "always_deny:terminal"
    );
    assert!(always_tool.sub_patterns.is_empty());

    // The pattern choice shares the same ids but carries the command regex.
    assert_eq!(
        always_pattern.allow.option_id.0.as_ref(),
        "always_allow:terminal"
    );
    assert_eq!(
        always_pattern.deny.option_id.0.as_ref(),
        "always_deny:terminal"
    );
    assert_eq!(
        always_pattern.sub_patterns,
        vec!["^cargo\\s+build(\\s|$)"]
    );

    // The last choice is the one-time allow/deny pair.
    assert_eq!(once.allow.option_id.0.as_ref(), "allow");
    assert_eq!(once.deny.option_id.0.as_ref(), "deny");
    assert!(once.sub_patterns.is_empty());
}
1230
/// A piped terminal command (`cargo test 2>&1 | tail`) should yield the
/// `DropdownWithPatterns` variant with one pattern per command in the pipeline.
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    let commands = vec!["cargo test 2>&1 | tail".to_string()];
    let (choices, patterns, tool_name) =
        match ToolPermissionContext::new(TerminalTool::NAME, commands).build_permission_options() {
            PermissionOptions::DropdownWithPatterns {
                choices,
                patterns,
                tool_name,
            } => (choices, patterns, tool_name),
            _ => panic!("Expected DropdownWithPatterns permission options for pipeline command"),
        };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Only the tool-wide and one-time choices are expected in the dropdown itself.
    assert_eq!(choices.len(), 2);
    let has_choice_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_choice_label("Always for terminal"));
    assert!(has_choice_label("Only this time"));

    // One per-command pattern each for "cargo test" and "tail".
    assert_eq!(patterns.len(), 2);
    let has_display_name =
        |wanted: &str| patterns.iter().any(|cp| cp.display_name.as_str() == wanted);
    assert!(has_display_name("cargo test"));
    assert!(has_display_name("tail"));

    // The regex behind each pattern is pinned as well.
    let has_regex = |wanted: &str| patterns.iter().any(|cp| cp.pattern.as_str() == wanted);
    assert!(has_regex("^cargo\\s+test(\\s|$)"));
    assert!(has_regex("^tail\\b"));
}
1270
/// A command mixing `&&` and `|` (`npm install && npm test | tail`) should
/// produce three distinct per-command patterns.
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    let commands = vec!["npm install && npm test | tail".to_string()];
    let patterns =
        match ToolPermissionContext::new(TerminalTool::NAME, commands).build_permission_options() {
            PermissionOptions::DropdownWithPatterns { patterns, .. } => patterns,
            _ => panic!("Expected DropdownWithPatterns for chained pipeline command"),
        };

    // "npm install" and "npm test" count separately because patterns include the subcommand.
    assert_eq!(patterns.len(), 3);
    let has_display_name =
        |wanted: &str| patterns.iter().any(|cp| cp.display_name.as_str() == wanted);
    assert!(has_display_name("npm install"));
    assert!(has_display_name("npm test"));
    assert!(has_display_name("tail"));
}
1290
1291#[gpui::test]
1292#[cfg_attr(not(feature = "e2e"), ignore)]
1293async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
1294    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1295
1296    // Test concurrent tool calls with different delay times
1297    let events = thread
1298        .update(cx, |thread, cx| {
1299            thread.add_tool(DelayTool);
1300            thread.send(
1301                UserMessageId::new(),
1302                [
1303                    "Call the delay tool twice in the same message.",
1304                    "Once with 100ms. Once with 300ms.",
1305                    "When both timers are complete, describe the outputs.",
1306                ],
1307                cx,
1308            )
1309        })
1310        .unwrap()
1311        .collect()
1312        .await;
1313
1314    let stop_reasons = stop_events(events);
1315    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);
1316
1317    thread.update(cx, |thread, _cx| {
1318        let last_message = thread.last_received_or_pending_message().unwrap();
1319        let agent_message = last_message.as_agent_message().unwrap();
1320        let text = agent_message
1321            .content
1322            .iter()
1323            .filter_map(|content| {
1324                if let AgentMessageContent::Text(text) = content {
1325                    Some(text.as_str())
1326                } else {
1327                    None
1328                }
1329            })
1330            .collect::<String>();
1331
1332        assert!(text.contains("Ding"));
1333    });
1334}
1335
1336#[gpui::test]
1337async fn test_profiles(cx: &mut TestAppContext) {
1338    let ThreadTest {
1339        model, thread, fs, ..
1340    } = setup(cx, TestModel::Fake).await;
1341    let fake_model = model.as_fake();
1342
1343    thread.update(cx, |thread, _cx| {
1344        thread.add_tool(DelayTool);
1345        thread.add_tool(EchoTool);
1346        thread.add_tool(InfiniteTool);
1347    });
1348
1349    // Override profiles and wait for settings to be loaded.
1350    fs.insert_file(
1351        paths::settings_file(),
1352        json!({
1353            "agent": {
1354                "profiles": {
1355                    "test-1": {
1356                        "name": "Test Profile 1",
1357                        "tools": {
1358                            EchoTool::NAME: true,
1359                            DelayTool::NAME: true,
1360                        }
1361                    },
1362                    "test-2": {
1363                        "name": "Test Profile 2",
1364                        "tools": {
1365                            InfiniteTool::NAME: true,
1366                        }
1367                    }
1368                }
1369            }
1370        })
1371        .to_string()
1372        .into_bytes(),
1373    )
1374    .await;
1375    cx.run_until_parked();
1376
1377    // Test that test-1 profile (default) has echo and delay tools
1378    thread
1379        .update(cx, |thread, cx| {
1380            thread.set_profile(AgentProfileId("test-1".into()), cx);
1381            thread.send(UserMessageId::new(), ["test"], cx)
1382        })
1383        .unwrap();
1384    cx.run_until_parked();
1385
1386    let mut pending_completions = fake_model.pending_completions();
1387    assert_eq!(pending_completions.len(), 1);
1388    let completion = pending_completions.pop().unwrap();
1389    let tool_names: Vec<String> = completion
1390        .tools
1391        .iter()
1392        .map(|tool| tool.name.clone())
1393        .collect();
1394    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
1395    fake_model.end_last_completion_stream();
1396
1397    // Switch to test-2 profile, and verify that it has only the infinite tool.
1398    thread
1399        .update(cx, |thread, cx| {
1400            thread.set_profile(AgentProfileId("test-2".into()), cx);
1401            thread.send(UserMessageId::new(), ["test2"], cx)
1402        })
1403        .unwrap();
1404    cx.run_until_parked();
1405    let mut pending_completions = fake_model.pending_completions();
1406    assert_eq!(pending_completions.len(), 1);
1407    let completion = pending_completions.pop().unwrap();
1408    let tool_names: Vec<String> = completion
1409        .tools
1410        .iter()
1411        .map(|tool| tool.name.clone())
1412        .collect();
1413    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
1414}
1415
1416#[gpui::test]
1417async fn test_mcp_tools(cx: &mut TestAppContext) {
1418    let ThreadTest {
1419        model,
1420        thread,
1421        context_server_store,
1422        fs,
1423        ..
1424    } = setup(cx, TestModel::Fake).await;
1425    let fake_model = model.as_fake();
1426
1427    // Override profiles and wait for settings to be loaded.
1428    fs.insert_file(
1429        paths::settings_file(),
1430        json!({
1431            "agent": {
1432                "tool_permissions": { "default": "allow" },
1433                "profiles": {
1434                    "test": {
1435                        "name": "Test Profile",
1436                        "enable_all_context_servers": true,
1437                        "tools": {
1438                            EchoTool::NAME: true,
1439                        }
1440                    },
1441                }
1442            }
1443        })
1444        .to_string()
1445        .into_bytes(),
1446    )
1447    .await;
1448    cx.run_until_parked();
1449    thread.update(cx, |thread, cx| {
1450        thread.set_profile(AgentProfileId("test".into()), cx)
1451    });
1452
1453    let mut mcp_tool_calls = setup_context_server(
1454        "test_server",
1455        vec![context_server::types::Tool {
1456            name: "echo".into(),
1457            description: None,
1458            input_schema: serde_json::to_value(EchoTool::input_schema(
1459                LanguageModelToolSchemaFormat::JsonSchema,
1460            ))
1461            .unwrap(),
1462            output_schema: None,
1463            annotations: None,
1464        }],
1465        &context_server_store,
1466        cx,
1467    );
1468
1469    let events = thread.update(cx, |thread, cx| {
1470        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
1471    });
1472    cx.run_until_parked();
1473
1474    // Simulate the model calling the MCP tool.
1475    let completion = fake_model.pending_completions().pop().unwrap();
1476    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1477    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1478        LanguageModelToolUse {
1479            id: "tool_1".into(),
1480            name: "echo".into(),
1481            raw_input: json!({"text": "test"}).to_string(),
1482            input: json!({"text": "test"}),
1483            is_input_complete: true,
1484            thought_signature: None,
1485        },
1486    ));
1487    fake_model.end_last_completion_stream();
1488    cx.run_until_parked();
1489
1490    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1491    assert_eq!(tool_call_params.name, "echo");
1492    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
1493    tool_call_response
1494        .send(context_server::types::CallToolResponse {
1495            content: vec![context_server::types::ToolResponseContent::Text {
1496                text: "test".into(),
1497            }],
1498            is_error: None,
1499            meta: None,
1500            structured_content: None,
1501        })
1502        .unwrap();
1503    cx.run_until_parked();
1504
1505    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1506    fake_model.send_last_completion_stream_text_chunk("Done!");
1507    fake_model.end_last_completion_stream();
1508    events.collect::<Vec<_>>().await;
1509
1510    // Send again after adding the echo tool, ensuring the name collision is resolved.
1511    let events = thread.update(cx, |thread, cx| {
1512        thread.add_tool(EchoTool);
1513        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
1514    });
1515    cx.run_until_parked();
1516    let completion = fake_model.pending_completions().pop().unwrap();
1517    assert_eq!(
1518        tool_names_for_completion(&completion),
1519        vec!["echo", "test_server_echo"]
1520    );
1521    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1522        LanguageModelToolUse {
1523            id: "tool_2".into(),
1524            name: "test_server_echo".into(),
1525            raw_input: json!({"text": "mcp"}).to_string(),
1526            input: json!({"text": "mcp"}),
1527            is_input_complete: true,
1528            thought_signature: None,
1529        },
1530    ));
1531    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1532        LanguageModelToolUse {
1533            id: "tool_3".into(),
1534            name: "echo".into(),
1535            raw_input: json!({"text": "native"}).to_string(),
1536            input: json!({"text": "native"}),
1537            is_input_complete: true,
1538            thought_signature: None,
1539        },
1540    ));
1541    fake_model.end_last_completion_stream();
1542    cx.run_until_parked();
1543
1544    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1545    assert_eq!(tool_call_params.name, "echo");
1546    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
1547    tool_call_response
1548        .send(context_server::types::CallToolResponse {
1549            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
1550            is_error: None,
1551            meta: None,
1552            structured_content: None,
1553        })
1554        .unwrap();
1555    cx.run_until_parked();
1556
1557    // Ensure the tool results were inserted with the correct names.
1558    let completion = fake_model.pending_completions().pop().unwrap();
1559    assert_eq!(
1560        completion.messages.last().unwrap().content,
1561        vec![
1562            MessageContent::ToolResult(LanguageModelToolResult {
1563                tool_use_id: "tool_3".into(),
1564                tool_name: "echo".into(),
1565                is_error: false,
1566                content: "native".into(),
1567                output: Some("native".into()),
1568            },),
1569            MessageContent::ToolResult(LanguageModelToolResult {
1570                tool_use_id: "tool_2".into(),
1571                tool_name: "test_server_echo".into(),
1572                is_error: false,
1573                content: "mcp".into(),
1574                output: Some("mcp".into()),
1575            },),
1576        ]
1577    );
1578    fake_model.end_last_completion_stream();
1579    events.collect::<Vec<_>>().await;
1580}
1581
1582#[gpui::test]
1583async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
1584    let ThreadTest {
1585        model,
1586        thread,
1587        context_server_store,
1588        fs,
1589        ..
1590    } = setup(cx, TestModel::Fake).await;
1591    let fake_model = model.as_fake();
1592
1593    // Setup settings to allow MCP tools
1594    fs.insert_file(
1595        paths::settings_file(),
1596        json!({
1597            "agent": {
1598                "always_allow_tool_actions": true,
1599                "profiles": {
1600                    "test": {
1601                        "name": "Test Profile",
1602                        "enable_all_context_servers": true,
1603                        "tools": {}
1604                    },
1605                }
1606            }
1607        })
1608        .to_string()
1609        .into_bytes(),
1610    )
1611    .await;
1612    cx.run_until_parked();
1613    thread.update(cx, |thread, cx| {
1614        thread.set_profile(AgentProfileId("test".into()), cx)
1615    });
1616
1617    // Setup a context server with a tool
1618    let mut mcp_tool_calls = setup_context_server(
1619        "github_server",
1620        vec![context_server::types::Tool {
1621            name: "issue_read".into(),
1622            description: Some("Read a GitHub issue".into()),
1623            input_schema: json!({
1624                "type": "object",
1625                "properties": {
1626                    "issue_url": { "type": "string" }
1627                }
1628            }),
1629            output_schema: None,
1630            annotations: None,
1631        }],
1632        &context_server_store,
1633        cx,
1634    );
1635
1636    // Send a message and have the model call the MCP tool
1637    let events = thread.update(cx, |thread, cx| {
1638        thread
1639            .send(UserMessageId::new(), ["Read issue #47404"], cx)
1640            .unwrap()
1641    });
1642    cx.run_until_parked();
1643
1644    // Verify the MCP tool is available to the model
1645    let completion = fake_model.pending_completions().pop().unwrap();
1646    assert_eq!(
1647        tool_names_for_completion(&completion),
1648        vec!["issue_read"],
1649        "MCP tool should be available"
1650    );
1651
1652    // Simulate the model calling the MCP tool
1653    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1654        LanguageModelToolUse {
1655            id: "tool_1".into(),
1656            name: "issue_read".into(),
1657            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
1658                .to_string(),
1659            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
1660            is_input_complete: true,
1661            thought_signature: None,
1662        },
1663    ));
1664    fake_model.end_last_completion_stream();
1665    cx.run_until_parked();
1666
1667    // The MCP server receives the tool call and responds with content
1668    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
1669    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1670    assert_eq!(tool_call_params.name, "issue_read");
1671    tool_call_response
1672        .send(context_server::types::CallToolResponse {
1673            content: vec![context_server::types::ToolResponseContent::Text {
1674                text: expected_tool_output.into(),
1675            }],
1676            is_error: None,
1677            meta: None,
1678            structured_content: None,
1679        })
1680        .unwrap();
1681    cx.run_until_parked();
1682
1683    // After tool completes, the model continues with a new completion request
1684    // that includes the tool results. We need to respond to this.
1685    let _completion = fake_model.pending_completions().pop().unwrap();
1686    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
1687    fake_model
1688        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
1689    fake_model.end_last_completion_stream();
1690    events.collect::<Vec<_>>().await;
1691
1692    // Verify the tool result is stored in the thread by checking the markdown output.
1693    // The tool result is in the first assistant message (not the last one, which is
1694    // the model's response after the tool completed).
1695    thread.update(cx, |thread, _cx| {
1696        let markdown = thread.to_markdown();
1697        assert!(
1698            markdown.contains("**Tool Result**: issue_read"),
1699            "Thread should contain tool result header"
1700        );
1701        assert!(
1702            markdown.contains(expected_tool_output),
1703            "Thread should contain tool output: {}",
1704            expected_tool_output
1705        );
1706    });
1707
1708    // Simulate app restart: disconnect the MCP server.
1709    // After restart, the MCP server won't be connected yet when the thread is replayed.
1710    context_server_store.update(cx, |store, cx| {
1711        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
1712    });
1713    cx.run_until_parked();
1714
1715    // Replay the thread (this is what happens when loading a saved thread)
1716    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));
1717
1718    let mut found_tool_call = None;
1719    let mut found_tool_call_update_with_output = None;
1720
1721    while let Some(event) = replay_events.next().await {
1722        let event = event.unwrap();
1723        match &event {
1724            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
1725                found_tool_call = Some(tc.clone());
1726            }
1727            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
1728                if update.tool_call_id.to_string() == "tool_1" =>
1729            {
1730                if update.fields.raw_output.is_some() {
1731                    found_tool_call_update_with_output = Some(update.clone());
1732                }
1733            }
1734            _ => {}
1735        }
1736    }
1737
1738    // The tool call should be found
1739    assert!(
1740        found_tool_call.is_some(),
1741        "Tool call should be emitted during replay"
1742    );
1743
1744    assert!(
1745        found_tool_call_update_with_output.is_some(),
1746        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
1747    );
1748
1749    let update = found_tool_call_update_with_output.unwrap();
1750    assert_eq!(
1751        update.fields.raw_output,
1752        Some(expected_tool_output.into()),
1753        "raw_output should contain the saved tool result"
1754    );
1755
1756    // Also verify the status is correct (completed, not failed)
1757    assert_eq!(
1758        update.fields.status,
1759        Some(acp::ToolCallStatus::Completed),
1760        "Tool call status should reflect the original completion status"
1761    );
1762}
1763
1764#[gpui::test]
1765async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
1766    let ThreadTest {
1767        model,
1768        thread,
1769        context_server_store,
1770        fs,
1771        ..
1772    } = setup(cx, TestModel::Fake).await;
1773    let fake_model = model.as_fake();
1774
1775    // Set up a profile with all tools enabled
1776    fs.insert_file(
1777        paths::settings_file(),
1778        json!({
1779            "agent": {
1780                "profiles": {
1781                    "test": {
1782                        "name": "Test Profile",
1783                        "enable_all_context_servers": true,
1784                        "tools": {
1785                            EchoTool::NAME: true,
1786                            DelayTool::NAME: true,
1787                            WordListTool::NAME: true,
1788                            ToolRequiringPermission::NAME: true,
1789                            InfiniteTool::NAME: true,
1790                        }
1791                    },
1792                }
1793            }
1794        })
1795        .to_string()
1796        .into_bytes(),
1797    )
1798    .await;
1799    cx.run_until_parked();
1800
1801    thread.update(cx, |thread, cx| {
1802        thread.set_profile(AgentProfileId("test".into()), cx);
1803        thread.add_tool(EchoTool);
1804        thread.add_tool(DelayTool);
1805        thread.add_tool(WordListTool);
1806        thread.add_tool(ToolRequiringPermission);
1807        thread.add_tool(InfiniteTool);
1808    });
1809
1810    // Set up multiple context servers with some overlapping tool names
1811    let _server1_calls = setup_context_server(
1812        "xxx",
1813        vec![
1814            context_server::types::Tool {
1815                name: "echo".into(), // Conflicts with native EchoTool
1816                description: None,
1817                input_schema: serde_json::to_value(EchoTool::input_schema(
1818                    LanguageModelToolSchemaFormat::JsonSchema,
1819                ))
1820                .unwrap(),
1821                output_schema: None,
1822                annotations: None,
1823            },
1824            context_server::types::Tool {
1825                name: "unique_tool_1".into(),
1826                description: None,
1827                input_schema: json!({"type": "object", "properties": {}}),
1828                output_schema: None,
1829                annotations: None,
1830            },
1831        ],
1832        &context_server_store,
1833        cx,
1834    );
1835
1836    let _server2_calls = setup_context_server(
1837        "yyy",
1838        vec![
1839            context_server::types::Tool {
1840                name: "echo".into(), // Also conflicts with native EchoTool
1841                description: None,
1842                input_schema: serde_json::to_value(EchoTool::input_schema(
1843                    LanguageModelToolSchemaFormat::JsonSchema,
1844                ))
1845                .unwrap(),
1846                output_schema: None,
1847                annotations: None,
1848            },
1849            context_server::types::Tool {
1850                name: "unique_tool_2".into(),
1851                description: None,
1852                input_schema: json!({"type": "object", "properties": {}}),
1853                output_schema: None,
1854                annotations: None,
1855            },
1856            context_server::types::Tool {
1857                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1858                description: None,
1859                input_schema: json!({"type": "object", "properties": {}}),
1860                output_schema: None,
1861                annotations: None,
1862            },
1863            context_server::types::Tool {
1864                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1865                description: None,
1866                input_schema: json!({"type": "object", "properties": {}}),
1867                output_schema: None,
1868                annotations: None,
1869            },
1870        ],
1871        &context_server_store,
1872        cx,
1873    );
1874    let _server3_calls = setup_context_server(
1875        "zzz",
1876        vec![
1877            context_server::types::Tool {
1878                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1879                description: None,
1880                input_schema: json!({"type": "object", "properties": {}}),
1881                output_schema: None,
1882                annotations: None,
1883            },
1884            context_server::types::Tool {
1885                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1886                description: None,
1887                input_schema: json!({"type": "object", "properties": {}}),
1888                output_schema: None,
1889                annotations: None,
1890            },
1891            context_server::types::Tool {
1892                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
1893                description: None,
1894                input_schema: json!({"type": "object", "properties": {}}),
1895                output_schema: None,
1896                annotations: None,
1897            },
1898        ],
1899        &context_server_store,
1900        cx,
1901    );
1902
1903    // Server with spaces in name - tests snake_case conversion for API compatibility
1904    let _server4_calls = setup_context_server(
1905        "Azure DevOps",
1906        vec![context_server::types::Tool {
1907            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
1908            description: None,
1909            input_schema: serde_json::to_value(EchoTool::input_schema(
1910                LanguageModelToolSchemaFormat::JsonSchema,
1911            ))
1912            .unwrap(),
1913            output_schema: None,
1914            annotations: None,
1915        }],
1916        &context_server_store,
1917        cx,
1918    );
1919
1920    thread
1921        .update(cx, |thread, cx| {
1922            thread.send(UserMessageId::new(), ["Go"], cx)
1923        })
1924        .unwrap();
1925    cx.run_until_parked();
1926    let completion = fake_model.pending_completions().pop().unwrap();
1927    assert_eq!(
1928        tool_names_for_completion(&completion),
1929        vec![
1930            "azure_dev_ops_echo",
1931            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1932            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
1933            "delay",
1934            "echo",
1935            "infinite",
1936            "tool_requiring_permission",
1937            "unique_tool_1",
1938            "unique_tool_2",
1939            "word_list",
1940            "xxx_echo",
1941            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1942            "yyy_echo",
1943            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1944        ]
1945    );
1946}
1947
1948#[gpui::test]
1949#[cfg_attr(not(feature = "e2e"), ignore)]
1950async fn test_cancellation(cx: &mut TestAppContext) {
1951    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1952
1953    let mut events = thread
1954        .update(cx, |thread, cx| {
1955            thread.add_tool(InfiniteTool);
1956            thread.add_tool(EchoTool);
1957            thread.send(
1958                UserMessageId::new(),
1959                ["Call the echo tool, then call the infinite tool, then explain their output"],
1960                cx,
1961            )
1962        })
1963        .unwrap();
1964
1965    // Wait until both tools are called.
1966    let mut expected_tools = vec!["Echo", "Infinite Tool"];
1967    let mut echo_id = None;
1968    let mut echo_completed = false;
1969    while let Some(event) = events.next().await {
1970        match event.unwrap() {
1971            ThreadEvent::ToolCall(tool_call) => {
1972                assert_eq!(tool_call.title, expected_tools.remove(0));
1973                if tool_call.title == "Echo" {
1974                    echo_id = Some(tool_call.tool_call_id);
1975                }
1976            }
1977            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
1978                acp::ToolCallUpdate {
1979                    tool_call_id,
1980                    fields:
1981                        acp::ToolCallUpdateFields {
1982                            status: Some(acp::ToolCallStatus::Completed),
1983                            ..
1984                        },
1985                    ..
1986                },
1987            )) if Some(&tool_call_id) == echo_id.as_ref() => {
1988                echo_completed = true;
1989            }
1990            _ => {}
1991        }
1992
1993        if expected_tools.is_empty() && echo_completed {
1994            break;
1995        }
1996    }
1997
1998    // Cancel the current send and ensure that the event stream is closed, even
1999    // if one of the tools is still running.
2000    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2001    let events = events.collect::<Vec<_>>().await;
2002    let last_event = events.last();
2003    assert!(
2004        matches!(
2005            last_event,
2006            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
2007        ),
2008        "unexpected event {last_event:?}"
2009    );
2010
2011    // Ensure we can still send a new message after cancellation.
2012    let events = thread
2013        .update(cx, |thread, cx| {
2014            thread.send(
2015                UserMessageId::new(),
2016                ["Testing: reply with 'Hello' then stop."],
2017                cx,
2018            )
2019        })
2020        .unwrap()
2021        .collect::<Vec<_>>()
2022        .await;
2023    thread.update(cx, |thread, _cx| {
2024        let message = thread.last_received_or_pending_message().unwrap();
2025        let agent_message = message.as_agent_message().unwrap();
2026        assert_eq!(
2027            agent_message.content,
2028            vec![AgentMessageContent::Text("Hello".to_string())]
2029        );
2030    });
2031    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2032}
2033
2034#[gpui::test]
2035async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
2036    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2037    always_allow_tools(cx);
2038    let fake_model = model.as_fake();
2039
2040    let environment = Rc::new(cx.update(|cx| {
2041        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
2042    }));
2043    let handle = environment.terminal_handle.clone().unwrap();
2044
2045    let mut events = thread
2046        .update(cx, |thread, cx| {
2047            thread.add_tool(crate::TerminalTool::new(
2048                thread.project().clone(),
2049                environment,
2050            ));
2051            thread.send(UserMessageId::new(), ["run a command"], cx)
2052        })
2053        .unwrap();
2054
2055    cx.run_until_parked();
2056
2057    // Simulate the model calling the terminal tool
2058    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2059        LanguageModelToolUse {
2060            id: "terminal_tool_1".into(),
2061            name: TerminalTool::NAME.into(),
2062            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
2063            input: json!({"command": "sleep 1000", "cd": "."}),
2064            is_input_complete: true,
2065            thought_signature: None,
2066        },
2067    ));
2068    fake_model.end_last_completion_stream();
2069
2070    // Wait for the terminal tool to start running
2071    wait_for_terminal_tool_started(&mut events, cx).await;
2072
2073    // Cancel the thread while the terminal is running
2074    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();
2075
2076    // Collect remaining events, driving the executor to let cancellation complete
2077    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2078
2079    // Verify the terminal was killed
2080    assert!(
2081        handle.was_killed(),
2082        "expected terminal handle to be killed on cancellation"
2083    );
2084
2085    // Verify we got a cancellation stop event
2086    assert_eq!(
2087        stop_events(remaining_events),
2088        vec![acp::StopReason::Cancelled],
2089    );
2090
2091    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
2092    thread.update(cx, |thread, _cx| {
2093        let message = thread.last_received_or_pending_message().unwrap();
2094        let agent_message = message.as_agent_message().unwrap();
2095
2096        let tool_use = agent_message
2097            .content
2098            .iter()
2099            .find_map(|content| match content {
2100                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
2101                _ => None,
2102            })
2103            .expect("expected tool use in agent message");
2104
2105        let tool_result = agent_message
2106            .tool_results
2107            .get(&tool_use.id)
2108            .expect("expected tool result");
2109
2110        let result_text = match &tool_result.content {
2111            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
2112            _ => panic!("expected text content in tool result"),
2113        };
2114
2115        // "partial output" comes from FakeTerminalHandle's output field
2116        assert!(
2117            result_text.contains("partial output"),
2118            "expected tool result to contain terminal output, got: {result_text}"
2119        );
2120        // Match the actual format from process_content in terminal_tool.rs
2121        assert!(
2122            result_text.contains("The user stopped this command"),
2123            "expected tool result to indicate user stopped, got: {result_text}"
2124        );
2125    });
2126
2127    // Verify we can send a new message after cancellation
2128    verify_thread_recovery(&thread, &fake_model, cx).await;
2129}
2130
2131#[gpui::test]
2132async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
2133    // This test verifies that tools which properly handle cancellation via
2134    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
2135    // to cancellation and report that they were cancelled.
2136    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2137    always_allow_tools(cx);
2138    let fake_model = model.as_fake();
2139
2140    let (tool, was_cancelled) = CancellationAwareTool::new();
2141
2142    let mut events = thread
2143        .update(cx, |thread, cx| {
2144            thread.add_tool(tool);
2145            thread.send(
2146                UserMessageId::new(),
2147                ["call the cancellation aware tool"],
2148                cx,
2149            )
2150        })
2151        .unwrap();
2152
2153    cx.run_until_parked();
2154
2155    // Simulate the model calling the cancellation-aware tool
2156    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2157        LanguageModelToolUse {
2158            id: "cancellation_aware_1".into(),
2159            name: "cancellation_aware".into(),
2160            raw_input: r#"{}"#.into(),
2161            input: json!({}),
2162            is_input_complete: true,
2163            thought_signature: None,
2164        },
2165    ));
2166    fake_model.end_last_completion_stream();
2167
2168    cx.run_until_parked();
2169
2170    // Wait for the tool call to be reported
2171    let mut tool_started = false;
2172    let deadline = cx.executor().num_cpus() * 100;
2173    for _ in 0..deadline {
2174        cx.run_until_parked();
2175
2176        while let Some(Some(event)) = events.next().now_or_never() {
2177            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
2178                if tool_call.title == "Cancellation Aware Tool" {
2179                    tool_started = true;
2180                    break;
2181                }
2182            }
2183        }
2184
2185        if tool_started {
2186            break;
2187        }
2188
2189        cx.background_executor
2190            .timer(Duration::from_millis(10))
2191            .await;
2192    }
2193    assert!(tool_started, "expected cancellation aware tool to start");
2194
2195    // Cancel the thread and wait for it to complete
2196    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));
2197
2198    // The cancel task should complete promptly because the tool handles cancellation
2199    let timeout = cx.background_executor.timer(Duration::from_secs(5));
2200    futures::select! {
2201        _ = cancel_task.fuse() => {}
2202        _ = timeout.fuse() => {
2203            panic!("cancel task timed out - tool did not respond to cancellation");
2204        }
2205    }
2206
2207    // Verify the tool detected cancellation via its flag
2208    assert!(
2209        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
2210        "tool should have detected cancellation via event_stream.cancelled_by_user()"
2211    );
2212
2213    // Collect remaining events
2214    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2215
2216    // Verify we got a cancellation stop event
2217    assert_eq!(
2218        stop_events(remaining_events),
2219        vec![acp::StopReason::Cancelled],
2220    );
2221
2222    // Verify we can send a new message after cancellation
2223    verify_thread_recovery(&thread, &fake_model, cx).await;
2224}
2225
2226/// Helper to verify thread can recover after cancellation by sending a simple message.
2227async fn verify_thread_recovery(
2228    thread: &Entity<Thread>,
2229    fake_model: &FakeLanguageModel,
2230    cx: &mut TestAppContext,
2231) {
2232    let events = thread
2233        .update(cx, |thread, cx| {
2234            thread.send(
2235                UserMessageId::new(),
2236                ["Testing: reply with 'Hello' then stop."],
2237                cx,
2238            )
2239        })
2240        .unwrap();
2241    cx.run_until_parked();
2242    fake_model.send_last_completion_stream_text_chunk("Hello");
2243    fake_model
2244        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2245    fake_model.end_last_completion_stream();
2246
2247    let events = events.collect::<Vec<_>>().await;
2248    thread.update(cx, |thread, _cx| {
2249        let message = thread.last_received_or_pending_message().unwrap();
2250        let agent_message = message.as_agent_message().unwrap();
2251        assert_eq!(
2252            agent_message.content,
2253            vec![AgentMessageContent::Text("Hello".to_string())]
2254        );
2255    });
2256    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2257}
2258
2259/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2260async fn wait_for_terminal_tool_started(
2261    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2262    cx: &mut TestAppContext,
2263) {
2264    let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2265    for _ in 0..deadline {
2266        cx.run_until_parked();
2267
2268        while let Some(Some(event)) = events.next().now_or_never() {
2269            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2270                update,
2271            ))) = &event
2272            {
2273                if update.fields.content.as_ref().is_some_and(|content| {
2274                    content
2275                        .iter()
2276                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2277                }) {
2278                    return;
2279                }
2280            }
2281        }
2282
2283        cx.background_executor
2284            .timer(Duration::from_millis(10))
2285            .await;
2286    }
2287    panic!("terminal tool did not start within the expected time");
2288}
2289
2290/// Collects events until a Stop event is received, driving the executor to completion.
2291async fn collect_events_until_stop(
2292    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2293    cx: &mut TestAppContext,
2294) -> Vec<Result<ThreadEvent>> {
2295    let mut collected = Vec::new();
2296    let deadline = cx.executor().num_cpus() * 200;
2297
2298    for _ in 0..deadline {
2299        cx.executor().advance_clock(Duration::from_millis(10));
2300        cx.run_until_parked();
2301
2302        while let Some(Some(event)) = events.next().now_or_never() {
2303            let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2304            collected.push(event);
2305            if is_stop {
2306                return collected;
2307            }
2308        }
2309    }
2310    panic!(
2311        "did not receive Stop event within the expected time; collected {} events",
2312        collected.len()
2313    );
2314}
2315
2316#[gpui::test]
2317async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
2318    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2319    always_allow_tools(cx);
2320    let fake_model = model.as_fake();
2321
2322    let environment = Rc::new(cx.update(|cx| {
2323        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
2324    }));
2325    let handle = environment.terminal_handle.clone().unwrap();
2326
2327    let message_id = UserMessageId::new();
2328    let mut events = thread
2329        .update(cx, |thread, cx| {
2330            thread.add_tool(crate::TerminalTool::new(
2331                thread.project().clone(),
2332                environment,
2333            ));
2334            thread.send(message_id.clone(), ["run a command"], cx)
2335        })
2336        .unwrap();
2337
2338    cx.run_until_parked();
2339
2340    // Simulate the model calling the terminal tool
2341    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2342        LanguageModelToolUse {
2343            id: "terminal_tool_1".into(),
2344            name: TerminalTool::NAME.into(),
2345            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
2346            input: json!({"command": "sleep 1000", "cd": "."}),
2347            is_input_complete: true,
2348            thought_signature: None,
2349        },
2350    ));
2351    fake_model.end_last_completion_stream();
2352
2353    // Wait for the terminal tool to start running
2354    wait_for_terminal_tool_started(&mut events, cx).await;
2355
2356    // Truncate the thread while the terminal is running
2357    thread
2358        .update(cx, |thread, cx| thread.truncate(message_id, cx))
2359        .unwrap();
2360
2361    // Drive the executor to let cancellation complete
2362    let _ = collect_events_until_stop(&mut events, cx).await;
2363
2364    // Verify the terminal was killed
2365    assert!(
2366        handle.was_killed(),
2367        "expected terminal handle to be killed on truncate"
2368    );
2369
2370    // Verify the thread is empty after truncation
2371    thread.update(cx, |thread, _cx| {
2372        assert_eq!(
2373            thread.to_markdown(),
2374            "",
2375            "expected thread to be empty after truncating the only message"
2376        );
2377    });
2378
2379    // Verify we can send a new message after truncation
2380    verify_thread_recovery(&thread, &fake_model, cx).await;
2381}
2382
2383#[gpui::test]
2384async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
2385    // Tests that cancellation properly kills all running terminal tools when multiple are active.
2386    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2387    always_allow_tools(cx);
2388    let fake_model = model.as_fake();
2389
2390    let environment = Rc::new(MultiTerminalEnvironment::new());
2391
2392    let mut events = thread
2393        .update(cx, |thread, cx| {
2394            thread.add_tool(crate::TerminalTool::new(
2395                thread.project().clone(),
2396                environment.clone(),
2397            ));
2398            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
2399        })
2400        .unwrap();
2401
2402    cx.run_until_parked();
2403
2404    // Simulate the model calling two terminal tools
2405    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2406        LanguageModelToolUse {
2407            id: "terminal_tool_1".into(),
2408            name: TerminalTool::NAME.into(),
2409            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
2410            input: json!({"command": "sleep 1000", "cd": "."}),
2411            is_input_complete: true,
2412            thought_signature: None,
2413        },
2414    ));
2415    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2416        LanguageModelToolUse {
2417            id: "terminal_tool_2".into(),
2418            name: TerminalTool::NAME.into(),
2419            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
2420            input: json!({"command": "sleep 2000", "cd": "."}),
2421            is_input_complete: true,
2422            thought_signature: None,
2423        },
2424    ));
2425    fake_model.end_last_completion_stream();
2426
2427    // Wait for both terminal tools to start by counting terminal content updates
2428    let mut terminals_started = 0;
2429    let deadline = cx.executor().num_cpus() * 100;
2430    for _ in 0..deadline {
2431        cx.run_until_parked();
2432
2433        while let Some(Some(event)) = events.next().now_or_never() {
2434            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2435                update,
2436            ))) = &event
2437            {
2438                if update.fields.content.as_ref().is_some_and(|content| {
2439                    content
2440                        .iter()
2441                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2442                }) {
2443                    terminals_started += 1;
2444                    if terminals_started >= 2 {
2445                        break;
2446                    }
2447                }
2448            }
2449        }
2450        if terminals_started >= 2 {
2451            break;
2452        }
2453
2454        cx.background_executor
2455            .timer(Duration::from_millis(10))
2456            .await;
2457    }
2458    assert!(
2459        terminals_started >= 2,
2460        "expected 2 terminal tools to start, got {terminals_started}"
2461    );
2462
2463    // Cancel the thread while both terminals are running
2464    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();
2465
2466    // Collect remaining events
2467    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2468
2469    // Verify both terminal handles were killed
2470    let handles = environment.handles();
2471    assert_eq!(
2472        handles.len(),
2473        2,
2474        "expected 2 terminal handles to be created"
2475    );
2476    assert!(
2477        handles[0].was_killed(),
2478        "expected first terminal handle to be killed on cancellation"
2479    );
2480    assert!(
2481        handles[1].was_killed(),
2482        "expected second terminal handle to be killed on cancellation"
2483    );
2484
2485    // Verify we got a cancellation stop event
2486    assert_eq!(
2487        stop_events(remaining_events),
2488        vec![acp::StopReason::Cancelled],
2489    );
2490}
2491
2492#[gpui::test]
2493async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
2494    // Tests that clicking the stop button on the terminal card (as opposed to the main
2495    // cancel button) properly reports user stopped via the was_stopped_by_user path.
2496    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2497    always_allow_tools(cx);
2498    let fake_model = model.as_fake();
2499
2500    let environment = Rc::new(cx.update(|cx| {
2501        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
2502    }));
2503    let handle = environment.terminal_handle.clone().unwrap();
2504
2505    let mut events = thread
2506        .update(cx, |thread, cx| {
2507            thread.add_tool(crate::TerminalTool::new(
2508                thread.project().clone(),
2509                environment,
2510            ));
2511            thread.send(UserMessageId::new(), ["run a command"], cx)
2512        })
2513        .unwrap();
2514
2515    cx.run_until_parked();
2516
2517    // Simulate the model calling the terminal tool
2518    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2519        LanguageModelToolUse {
2520            id: "terminal_tool_1".into(),
2521            name: TerminalTool::NAME.into(),
2522            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
2523            input: json!({"command": "sleep 1000", "cd": "."}),
2524            is_input_complete: true,
2525            thought_signature: None,
2526        },
2527    ));
2528    fake_model.end_last_completion_stream();
2529
2530    // Wait for the terminal tool to start running
2531    wait_for_terminal_tool_started(&mut events, cx).await;
2532
2533    // Simulate user clicking stop on the terminal card itself.
2534    // This sets the flag and signals exit (simulating what the real UI would do).
2535    handle.set_stopped_by_user(true);
2536    handle.killed.store(true, Ordering::SeqCst);
2537    handle.signal_exit();
2538
2539    // Wait for the tool to complete
2540    cx.run_until_parked();
2541
2542    // The thread continues after tool completion - simulate the model ending its turn
2543    fake_model
2544        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2545    fake_model.end_last_completion_stream();
2546
2547    // Collect remaining events
2548    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2549
2550    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
2551    assert_eq!(
2552        stop_events(remaining_events),
2553        vec![acp::StopReason::EndTurn],
2554    );
2555
2556    // Verify the tool result indicates user stopped
2557    thread.update(cx, |thread, _cx| {
2558        let message = thread.last_received_or_pending_message().unwrap();
2559        let agent_message = message.as_agent_message().unwrap();
2560
2561        let tool_use = agent_message
2562            .content
2563            .iter()
2564            .find_map(|content| match content {
2565                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
2566                _ => None,
2567            })
2568            .expect("expected tool use in agent message");
2569
2570        let tool_result = agent_message
2571            .tool_results
2572            .get(&tool_use.id)
2573            .expect("expected tool result");
2574
2575        let result_text = match &tool_result.content {
2576            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
2577            _ => panic!("expected text content in tool result"),
2578        };
2579
2580        assert!(
2581            result_text.contains("The user stopped this command"),
2582            "expected tool result to indicate user stopped, got: {result_text}"
2583        );
2584    });
2585}
2586
2587#[gpui::test]
2588async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
2589    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
2590    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2591    always_allow_tools(cx);
2592    let fake_model = model.as_fake();
2593
2594    let environment = Rc::new(cx.update(|cx| {
2595        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
2596    }));
2597    let handle = environment.terminal_handle.clone().unwrap();
2598
2599    let mut events = thread
2600        .update(cx, |thread, cx| {
2601            thread.add_tool(crate::TerminalTool::new(
2602                thread.project().clone(),
2603                environment,
2604            ));
2605            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
2606        })
2607        .unwrap();
2608
2609    cx.run_until_parked();
2610
2611    // Simulate the model calling the terminal tool with a short timeout
2612    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2613        LanguageModelToolUse {
2614            id: "terminal_tool_1".into(),
2615            name: TerminalTool::NAME.into(),
2616            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
2617            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
2618            is_input_complete: true,
2619            thought_signature: None,
2620        },
2621    ));
2622    fake_model.end_last_completion_stream();
2623
2624    // Wait for the terminal tool to start running
2625    wait_for_terminal_tool_started(&mut events, cx).await;
2626
2627    // Advance clock past the timeout
2628    cx.executor().advance_clock(Duration::from_millis(200));
2629    cx.run_until_parked();
2630
2631    // The thread continues after tool completion - simulate the model ending its turn
2632    fake_model
2633        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2634    fake_model.end_last_completion_stream();
2635
2636    // Collect remaining events
2637    let remaining_events = collect_events_until_stop(&mut events, cx).await;
2638
2639    // Verify the terminal was killed due to timeout
2640    assert!(
2641        handle.was_killed(),
2642        "expected terminal handle to be killed on timeout"
2643    );
2644
2645    // Verify we got an EndTurn (the tool completed, just with timeout)
2646    assert_eq!(
2647        stop_events(remaining_events),
2648        vec![acp::StopReason::EndTurn],
2649    );
2650
2651    // Verify the tool result indicates timeout, not user stopped
2652    thread.update(cx, |thread, _cx| {
2653        let message = thread.last_received_or_pending_message().unwrap();
2654        let agent_message = message.as_agent_message().unwrap();
2655
2656        let tool_use = agent_message
2657            .content
2658            .iter()
2659            .find_map(|content| match content {
2660                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
2661                _ => None,
2662            })
2663            .expect("expected tool use in agent message");
2664
2665        let tool_result = agent_message
2666            .tool_results
2667            .get(&tool_use.id)
2668            .expect("expected tool result");
2669
2670        let result_text = match &tool_result.content {
2671            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
2672            _ => panic!("expected text content in tool result"),
2673        };
2674
2675        assert!(
2676            result_text.contains("timed out"),
2677            "expected tool result to indicate timeout, got: {result_text}"
2678        );
2679        assert!(
2680            !result_text.contains("The user stopped"),
2681            "tool result should not mention user stopped when it timed out, got: {result_text}"
2682        );
2683    });
2684}
2685
2686#[gpui::test]
2687async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2688    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2689    let fake_model = model.as_fake();
2690
2691    let events_1 = thread
2692        .update(cx, |thread, cx| {
2693            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2694        })
2695        .unwrap();
2696    cx.run_until_parked();
2697    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2698    cx.run_until_parked();
2699
2700    let events_2 = thread
2701        .update(cx, |thread, cx| {
2702            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2703        })
2704        .unwrap();
2705    cx.run_until_parked();
2706    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2707    fake_model
2708        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2709    fake_model.end_last_completion_stream();
2710
2711    let events_1 = events_1.collect::<Vec<_>>().await;
2712    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2713    let events_2 = events_2.collect::<Vec<_>>().await;
2714    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2715}
2716
2717#[gpui::test]
2718async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
2719    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
2720    // the retry loop waits on a timer. If the user switches models and sends a new message
2721    // during that delay, the old turn should exit immediately instead of retrying with the
2722    // stale model.
2723    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2724    let model_a = model.as_fake();
2725
2726    // Start a turn with model_a.
2727    let events_1 = thread
2728        .update(cx, |thread, cx| {
2729            thread.send(UserMessageId::new(), ["Hello"], cx)
2730        })
2731        .unwrap();
2732    cx.run_until_parked();
2733    assert_eq!(model_a.completion_count(), 1);
2734
2735    // Model returns a retryable upstream 500. The turn enters the retry delay.
2736    model_a.send_last_completion_stream_error(
2737        LanguageModelCompletionError::UpstreamProviderError {
2738            message: "Internal server error".to_string(),
2739            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
2740            retry_after: None,
2741        },
2742    );
2743    model_a.end_last_completion_stream();
2744    cx.run_until_parked();
2745
2746    // The old completion was consumed; model_a has no pending requests yet because the
2747    // retry timer hasn't fired.
2748    assert_eq!(model_a.completion_count(), 0);
2749
2750    // Switch to model_b and send a new message. This cancels the old turn.
2751    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
2752        "fake", "model-b", "Model B", false,
2753    ));
2754    thread.update(cx, |thread, cx| {
2755        thread.set_model(model_b.clone(), cx);
2756    });
2757    let events_2 = thread
2758        .update(cx, |thread, cx| {
2759            thread.send(UserMessageId::new(), ["Continue"], cx)
2760        })
2761        .unwrap();
2762    cx.run_until_parked();
2763
2764    // model_b should have received its completion request.
2765    assert_eq!(model_b.as_fake().completion_count(), 1);
2766
2767    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
2768    cx.executor().advance_clock(Duration::from_secs(10));
2769    cx.run_until_parked();
2770
2771    // model_a must NOT have received another completion request — the cancelled turn
2772    // should have exited during the retry delay rather than retrying with the old model.
2773    assert_eq!(
2774        model_a.completion_count(),
2775        0,
2776        "old model should not receive a retry request after cancellation"
2777    );
2778
2779    // Complete model_b's turn.
2780    model_b
2781        .as_fake()
2782        .send_last_completion_stream_text_chunk("Done!");
2783    model_b
2784        .as_fake()
2785        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2786    model_b.as_fake().end_last_completion_stream();
2787
2788    let events_1 = events_1.collect::<Vec<_>>().await;
2789    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2790
2791    let events_2 = events_2.collect::<Vec<_>>().await;
2792    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2793}
2794
2795#[gpui::test]
2796async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2797    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2798    let fake_model = model.as_fake();
2799
2800    let events_1 = thread
2801        .update(cx, |thread, cx| {
2802            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2803        })
2804        .unwrap();
2805    cx.run_until_parked();
2806    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2807    fake_model
2808        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2809    fake_model.end_last_completion_stream();
2810    let events_1 = events_1.collect::<Vec<_>>().await;
2811
2812    let events_2 = thread
2813        .update(cx, |thread, cx| {
2814            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2815        })
2816        .unwrap();
2817    cx.run_until_parked();
2818    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2819    fake_model
2820        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2821    fake_model.end_last_completion_stream();
2822    let events_2 = events_2.collect::<Vec<_>>().await;
2823
2824    assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2825    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2826}
2827
2828#[gpui::test]
2829async fn test_refusal(cx: &mut TestAppContext) {
2830    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2831    let fake_model = model.as_fake();
2832
2833    let events = thread
2834        .update(cx, |thread, cx| {
2835            thread.send(UserMessageId::new(), ["Hello"], cx)
2836        })
2837        .unwrap();
2838    cx.run_until_parked();
2839    thread.read_with(cx, |thread, _| {
2840        assert_eq!(
2841            thread.to_markdown(),
2842            indoc! {"
2843                ## User
2844
2845                Hello
2846            "}
2847        );
2848    });
2849
2850    fake_model.send_last_completion_stream_text_chunk("Hey!");
2851    cx.run_until_parked();
2852    thread.read_with(cx, |thread, _| {
2853        assert_eq!(
2854            thread.to_markdown(),
2855            indoc! {"
2856                ## User
2857
2858                Hello
2859
2860                ## Assistant
2861
2862                Hey!
2863            "}
2864        );
2865    });
2866
2867    // If the model refuses to continue, the thread should remove all the messages after the last user message.
2868    fake_model
2869        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
2870    let events = events.collect::<Vec<_>>().await;
2871    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
2872    thread.read_with(cx, |thread, _| {
2873        assert_eq!(thread.to_markdown(), "");
2874    });
2875}
2876
2877#[gpui::test]
2878async fn test_truncate_first_message(cx: &mut TestAppContext) {
2879    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2880    let fake_model = model.as_fake();
2881
2882    let message_id = UserMessageId::new();
2883    thread
2884        .update(cx, |thread, cx| {
2885            thread.send(message_id.clone(), ["Hello"], cx)
2886        })
2887        .unwrap();
2888    cx.run_until_parked();
2889    thread.read_with(cx, |thread, _| {
2890        assert_eq!(
2891            thread.to_markdown(),
2892            indoc! {"
2893                ## User
2894
2895                Hello
2896            "}
2897        );
2898        assert_eq!(thread.latest_token_usage(), None);
2899    });
2900
2901    fake_model.send_last_completion_stream_text_chunk("Hey!");
2902    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2903        language_model::TokenUsage {
2904            input_tokens: 32_000,
2905            output_tokens: 16_000,
2906            cache_creation_input_tokens: 0,
2907            cache_read_input_tokens: 0,
2908        },
2909    ));
2910    cx.run_until_parked();
2911    thread.read_with(cx, |thread, _| {
2912        assert_eq!(
2913            thread.to_markdown(),
2914            indoc! {"
2915                ## User
2916
2917                Hello
2918
2919                ## Assistant
2920
2921                Hey!
2922            "}
2923        );
2924        assert_eq!(
2925            thread.latest_token_usage(),
2926            Some(acp_thread::TokenUsage {
2927                used_tokens: 32_000 + 16_000,
2928                max_tokens: 1_000_000,
2929                max_output_tokens: None,
2930                input_tokens: 32_000,
2931                output_tokens: 16_000,
2932            })
2933        );
2934    });
2935
2936    thread
2937        .update(cx, |thread, cx| thread.truncate(message_id, cx))
2938        .unwrap();
2939    cx.run_until_parked();
2940    thread.read_with(cx, |thread, _| {
2941        assert_eq!(thread.to_markdown(), "");
2942        assert_eq!(thread.latest_token_usage(), None);
2943    });
2944
2945    // Ensure we can still send a new message after truncation.
2946    thread
2947        .update(cx, |thread, cx| {
2948            thread.send(UserMessageId::new(), ["Hi"], cx)
2949        })
2950        .unwrap();
2951    thread.update(cx, |thread, _cx| {
2952        assert_eq!(
2953            thread.to_markdown(),
2954            indoc! {"
2955                ## User
2956
2957                Hi
2958            "}
2959        );
2960    });
2961    cx.run_until_parked();
2962    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
2963    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
2964        language_model::TokenUsage {
2965            input_tokens: 40_000,
2966            output_tokens: 20_000,
2967            cache_creation_input_tokens: 0,
2968            cache_read_input_tokens: 0,
2969        },
2970    ));
2971    cx.run_until_parked();
2972    thread.read_with(cx, |thread, _| {
2973        assert_eq!(
2974            thread.to_markdown(),
2975            indoc! {"
2976                ## User
2977
2978                Hi
2979
2980                ## Assistant
2981
2982                Ahoy!
2983            "}
2984        );
2985
2986        assert_eq!(
2987            thread.latest_token_usage(),
2988            Some(acp_thread::TokenUsage {
2989                used_tokens: 40_000 + 20_000,
2990                max_tokens: 1_000_000,
2991                max_output_tokens: None,
2992                input_tokens: 40_000,
2993                output_tokens: 20_000,
2994            })
2995        );
2996    });
2997}
2998
2999#[gpui::test]
3000async fn test_truncate_second_message(cx: &mut TestAppContext) {
3001    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3002    let fake_model = model.as_fake();
3003
3004    thread
3005        .update(cx, |thread, cx| {
3006            thread.send(UserMessageId::new(), ["Message 1"], cx)
3007        })
3008        .unwrap();
3009    cx.run_until_parked();
3010    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
3011    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3012        language_model::TokenUsage {
3013            input_tokens: 32_000,
3014            output_tokens: 16_000,
3015            cache_creation_input_tokens: 0,
3016            cache_read_input_tokens: 0,
3017        },
3018    ));
3019    fake_model.end_last_completion_stream();
3020    cx.run_until_parked();
3021
3022    let assert_first_message_state = |cx: &mut TestAppContext| {
3023        thread.clone().read_with(cx, |thread, _| {
3024            assert_eq!(
3025                thread.to_markdown(),
3026                indoc! {"
3027                    ## User
3028
3029                    Message 1
3030
3031                    ## Assistant
3032
3033                    Message 1 response
3034                "}
3035            );
3036
3037            assert_eq!(
3038                thread.latest_token_usage(),
3039                Some(acp_thread::TokenUsage {
3040                    used_tokens: 32_000 + 16_000,
3041                    max_tokens: 1_000_000,
3042                    max_output_tokens: None,
3043                    input_tokens: 32_000,
3044                    output_tokens: 16_000,
3045                })
3046            );
3047        });
3048    };
3049
3050    assert_first_message_state(cx);
3051
3052    let second_message_id = UserMessageId::new();
3053    thread
3054        .update(cx, |thread, cx| {
3055            thread.send(second_message_id.clone(), ["Message 2"], cx)
3056        })
3057        .unwrap();
3058    cx.run_until_parked();
3059
3060    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
3061    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
3062        language_model::TokenUsage {
3063            input_tokens: 40_000,
3064            output_tokens: 20_000,
3065            cache_creation_input_tokens: 0,
3066            cache_read_input_tokens: 0,
3067        },
3068    ));
3069    fake_model.end_last_completion_stream();
3070    cx.run_until_parked();
3071
3072    thread.read_with(cx, |thread, _| {
3073        assert_eq!(
3074            thread.to_markdown(),
3075            indoc! {"
3076                ## User
3077
3078                Message 1
3079
3080                ## Assistant
3081
3082                Message 1 response
3083
3084                ## User
3085
3086                Message 2
3087
3088                ## Assistant
3089
3090                Message 2 response
3091            "}
3092        );
3093
3094        assert_eq!(
3095            thread.latest_token_usage(),
3096            Some(acp_thread::TokenUsage {
3097                used_tokens: 40_000 + 20_000,
3098                max_tokens: 1_000_000,
3099                max_output_tokens: None,
3100                input_tokens: 40_000,
3101                output_tokens: 20_000,
3102            })
3103        );
3104    });
3105
3106    thread
3107        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
3108        .unwrap();
3109    cx.run_until_parked();
3110
3111    assert_first_message_state(cx);
3112}
3113
3114#[gpui::test]
3115async fn test_title_generation(cx: &mut TestAppContext) {
3116    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3117    let fake_model = model.as_fake();
3118
3119    let summary_model = Arc::new(FakeLanguageModel::default());
3120    thread.update(cx, |thread, cx| {
3121        thread.set_summarization_model(Some(summary_model.clone()), cx)
3122    });
3123
3124    let send = thread
3125        .update(cx, |thread, cx| {
3126            thread.send(UserMessageId::new(), ["Hello"], cx)
3127        })
3128        .unwrap();
3129    cx.run_until_parked();
3130
3131    fake_model.send_last_completion_stream_text_chunk("Hey!");
3132    fake_model.end_last_completion_stream();
3133    cx.run_until_parked();
3134    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), None));
3135
3136    // Ensure the summary model has been invoked to generate a title.
3137    summary_model.send_last_completion_stream_text_chunk("Hello ");
3138    summary_model.send_last_completion_stream_text_chunk("world\nG");
3139    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
3140    summary_model.end_last_completion_stream();
3141    send.collect::<Vec<_>>().await;
3142    cx.run_until_parked();
3143    thread.read_with(cx, |thread, _| {
3144        assert_eq!(thread.title(), Some("Hello world".into()))
3145    });
3146
3147    // Send another message, ensuring no title is generated this time.
3148    let send = thread
3149        .update(cx, |thread, cx| {
3150            thread.send(UserMessageId::new(), ["Hello again"], cx)
3151        })
3152        .unwrap();
3153    cx.run_until_parked();
3154    fake_model.send_last_completion_stream_text_chunk("Hey again!");
3155    fake_model.end_last_completion_stream();
3156    cx.run_until_parked();
3157    assert_eq!(summary_model.pending_completions(), Vec::new());
3158    send.collect::<Vec<_>>().await;
3159    thread.read_with(cx, |thread, _| {
3160        assert_eq!(thread.title(), Some("Hello world".into()))
3161    });
3162}
3163
3164#[gpui::test]
3165async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
3166    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3167    let fake_model = model.as_fake();
3168
3169    let _events = thread
3170        .update(cx, |thread, cx| {
3171            thread.add_tool(ToolRequiringPermission);
3172            thread.add_tool(EchoTool);
3173            thread.send(UserMessageId::new(), ["Hey!"], cx)
3174        })
3175        .unwrap();
3176    cx.run_until_parked();
3177
3178    let permission_tool_use = LanguageModelToolUse {
3179        id: "tool_id_1".into(),
3180        name: ToolRequiringPermission::NAME.into(),
3181        raw_input: "{}".into(),
3182        input: json!({}),
3183        is_input_complete: true,
3184        thought_signature: None,
3185    };
3186    let echo_tool_use = LanguageModelToolUse {
3187        id: "tool_id_2".into(),
3188        name: EchoTool::NAME.into(),
3189        raw_input: json!({"text": "test"}).to_string(),
3190        input: json!({"text": "test"}),
3191        is_input_complete: true,
3192        thought_signature: None,
3193    };
3194    fake_model.send_last_completion_stream_text_chunk("Hi!");
3195    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3196        permission_tool_use,
3197    ));
3198    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3199        echo_tool_use.clone(),
3200    ));
3201    fake_model.end_last_completion_stream();
3202    cx.run_until_parked();
3203
3204    // Ensure pending tools are skipped when building a request.
3205    let request = thread
3206        .read_with(cx, |thread, cx| {
3207            thread.build_completion_request(CompletionIntent::EditFile, cx)
3208        })
3209        .unwrap();
3210    assert_eq!(
3211        request.messages[1..],
3212        vec![
3213            LanguageModelRequestMessage {
3214                role: Role::User,
3215                content: vec!["Hey!".into()],
3216                cache: true,
3217                reasoning_details: None,
3218            },
3219            LanguageModelRequestMessage {
3220                role: Role::Assistant,
3221                content: vec![
3222                    MessageContent::Text("Hi!".into()),
3223                    MessageContent::ToolUse(echo_tool_use.clone())
3224                ],
3225                cache: false,
3226                reasoning_details: None,
3227            },
3228            LanguageModelRequestMessage {
3229                role: Role::User,
3230                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
3231                    tool_use_id: echo_tool_use.id.clone(),
3232                    tool_name: echo_tool_use.name,
3233                    is_error: false,
3234                    content: "test".into(),
3235                    output: Some("test".into())
3236                })],
3237                cache: false,
3238                reasoning_details: None,
3239            },
3240        ],
3241    );
3242}
3243
3244#[gpui::test]
3245async fn test_agent_connection(cx: &mut TestAppContext) {
3246    cx.update(settings::init);
3247    let templates = Templates::new();
3248
3249    // Initialize language model system with test provider
3250    cx.update(|cx| {
3251        gpui_tokio::init(cx);
3252
3253        let http_client = FakeHttpClient::with_404_response();
3254        let clock = Arc::new(clock::FakeSystemClock::new());
3255        let client = Client::new(clock, http_client, cx);
3256        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
3257        language_model::init(user_store.clone(), client.clone(), cx);
3258        language_models::init(user_store, client.clone(), cx);
3259        LanguageModelRegistry::test(cx);
3260    });
3261    cx.executor().forbid_parking();
3262
3263    // Create a project for new_thread
3264    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
3265    fake_fs.insert_tree(path!("/test"), json!({})).await;
3266    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
3267    let cwd = PathList::new(&[Path::new("/test")]);
3268    let thread_store = cx.new(|cx| ThreadStore::new(cx));
3269
3270    // Create agent and connection
3271    let agent = cx
3272        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
3273    let connection = NativeAgentConnection(agent.clone());
3274
3275    // Create a thread using new_thread
3276    let connection_rc = Rc::new(connection.clone());
3277    let acp_thread = cx
3278        .update(|cx| connection_rc.new_session(project, cwd, cx))
3279        .await
3280        .expect("new_thread should succeed");
3281
3282    // Get the session_id from the AcpThread
3283    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
3284
3285    // Test model_selector returns Some
3286    let selector_opt = connection.model_selector(&session_id);
3287    assert!(
3288        selector_opt.is_some(),
3289        "agent should always support ModelSelector"
3290    );
3291    let selector = selector_opt.unwrap();
3292
3293    // Test list_models
3294    let listed_models = cx
3295        .update(|cx| selector.list_models(cx))
3296        .await
3297        .expect("list_models should succeed");
3298    let AgentModelList::Grouped(listed_models) = listed_models else {
3299        panic!("Unexpected model list type");
3300    };
3301    assert!(!listed_models.is_empty(), "should have at least one model");
3302    assert_eq!(
3303        listed_models[&AgentModelGroupName("Fake".into())][0]
3304            .id
3305            .0
3306            .as_ref(),
3307        "fake/fake"
3308    );
3309
3310    // Test selected_model returns the default
3311    let model = cx
3312        .update(|cx| selector.selected_model(cx))
3313        .await
3314        .expect("selected_model should succeed");
3315    let model = cx
3316        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
3317        .unwrap();
3318    let model = model.as_fake();
3319    assert_eq!(model.id().0, "fake", "should return default model");
3320
3321    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
3322    cx.run_until_parked();
3323    model.send_last_completion_stream_text_chunk("def");
3324    cx.run_until_parked();
3325    acp_thread.read_with(cx, |thread, cx| {
3326        assert_eq!(
3327            thread.to_markdown(cx),
3328            indoc! {"
3329                ## User
3330
3331                abc
3332
3333                ## Assistant
3334
3335                def
3336
3337            "}
3338        )
3339    });
3340
3341    // Test cancel
3342    cx.update(|cx| connection.cancel(&session_id, cx));
3343    request.await.expect("prompt should fail gracefully");
3344
3345    // Explicitly close the session and drop the ACP thread.
3346    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
3347        .await
3348        .unwrap();
3349    drop(acp_thread);
3350    let result = cx
3351        .update(|cx| {
3352            connection.prompt(
3353                Some(acp_thread::UserMessageId::new()),
3354                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
3355                cx,
3356            )
3357        })
3358        .await;
3359    assert_eq!(
3360        result.as_ref().unwrap_err().to_string(),
3361        "Session not found",
3362        "unexpected result: {:?}",
3363        result
3364    );
3365}
3366
3367#[gpui::test]
3368async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
3369    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3370    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
3371    let fake_model = model.as_fake();
3372
3373    let mut events = thread
3374        .update(cx, |thread, cx| {
3375            thread.send(UserMessageId::new(), ["Echo something"], cx)
3376        })
3377        .unwrap();
3378    cx.run_until_parked();
3379
3380    // Simulate streaming partial input.
3381    let input = json!({});
3382    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3383        LanguageModelToolUse {
3384            id: "1".into(),
3385            name: EchoTool::NAME.into(),
3386            raw_input: input.to_string(),
3387            input,
3388            is_input_complete: false,
3389            thought_signature: None,
3390        },
3391    ));
3392
3393    // Input streaming completed
3394    let input = json!({ "text": "Hello!" });
3395    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3396        LanguageModelToolUse {
3397            id: "1".into(),
3398            name: "echo".into(),
3399            raw_input: input.to_string(),
3400            input,
3401            is_input_complete: true,
3402            thought_signature: None,
3403        },
3404    ));
3405    fake_model.end_last_completion_stream();
3406    cx.run_until_parked();
3407
3408    let tool_call = expect_tool_call(&mut events).await;
3409    assert_eq!(
3410        tool_call,
3411        acp::ToolCall::new("1", "Echo")
3412            .raw_input(json!({}))
3413            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
3414    );
3415    let update = expect_tool_call_update_fields(&mut events).await;
3416    assert_eq!(
3417        update,
3418        acp::ToolCallUpdate::new(
3419            "1",
3420            acp::ToolCallUpdateFields::new()
3421                .title("Echo")
3422                .kind(acp::ToolKind::Other)
3423                .raw_input(json!({ "text": "Hello!"}))
3424        )
3425    );
3426    let update = expect_tool_call_update_fields(&mut events).await;
3427    assert_eq!(
3428        update,
3429        acp::ToolCallUpdate::new(
3430            "1",
3431            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
3432        )
3433    );
3434    let update = expect_tool_call_update_fields(&mut events).await;
3435    assert_eq!(
3436        update,
3437        acp::ToolCallUpdate::new(
3438            "1",
3439            acp::ToolCallUpdateFields::new()
3440                .status(acp::ToolCallStatus::Completed)
3441                .raw_output("Hello!")
3442        )
3443    );
3444}
3445
3446#[gpui::test]
3447async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
3448    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3449    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
3450    let fake_model = model.as_fake();
3451
3452    let mut events = thread
3453        .update(cx, |thread, cx| {
3454            thread.send(UserMessageId::new(), ["Make a plan"], cx)
3455        })
3456        .unwrap();
3457    cx.run_until_parked();
3458
3459    let input = json!({
3460        "plan": [
3461            {
3462                "step": "Inspect the code",
3463                "status": "completed",
3464            },
3465            {
3466                "step": "Implement the tool",
3467                "status": "in_progress"
3468            },
3469            {
3470                "step": "Run tests",
3471                "status": "pending",
3472            }
3473        ]
3474    });
3475    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3476        LanguageModelToolUse {
3477            id: "plan_1".into(),
3478            name: UpdatePlanTool::NAME.into(),
3479            raw_input: input.to_string(),
3480            input,
3481            is_input_complete: true,
3482            thought_signature: None,
3483        },
3484    ));
3485    fake_model.end_last_completion_stream();
3486    cx.run_until_parked();
3487
3488    let tool_call = expect_tool_call(&mut events).await;
3489    assert_eq!(
3490        tool_call,
3491        acp::ToolCall::new("plan_1", "Update plan")
3492            .kind(acp::ToolKind::Think)
3493            .raw_input(json!({
3494                "plan": [
3495                    {
3496                        "step": "Inspect the code",
3497                        "status": "completed",
3498                    },
3499                    {
3500                        "step": "Implement the tool",
3501                        "status": "in_progress"
3502                    },
3503                    {
3504                        "step": "Run tests",
3505                        "status": "pending",
3506                    }
3507                ]
3508            }))
3509            .meta(acp::Meta::from_iter([(
3510                "tool_name".into(),
3511                "update_plan".into()
3512            )]))
3513    );
3514
3515    let update = expect_tool_call_update_fields(&mut events).await;
3516    assert_eq!(
3517        update,
3518        acp::ToolCallUpdate::new(
3519            "plan_1",
3520            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
3521        )
3522    );
3523
3524    let plan = expect_plan(&mut events).await;
3525    assert_eq!(
3526        plan,
3527        acp::Plan::new(vec![
3528            acp::PlanEntry::new(
3529                "Inspect the code",
3530                acp::PlanEntryPriority::Medium,
3531                acp::PlanEntryStatus::Completed,
3532            ),
3533            acp::PlanEntry::new(
3534                "Implement the tool",
3535                acp::PlanEntryPriority::Medium,
3536                acp::PlanEntryStatus::InProgress,
3537            ),
3538            acp::PlanEntry::new(
3539                "Run tests",
3540                acp::PlanEntryPriority::Medium,
3541                acp::PlanEntryStatus::Pending,
3542            ),
3543        ])
3544    );
3545
3546    let update = expect_tool_call_update_fields(&mut events).await;
3547    assert_eq!(
3548        update,
3549        acp::ToolCallUpdate::new(
3550            "plan_1",
3551            acp::ToolCallUpdateFields::new()
3552                .status(acp::ToolCallStatus::Completed)
3553                .raw_output("Plan updated")
3554        )
3555    );
3556}
3557
3558#[gpui::test]
3559async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
3560    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3561    let fake_model = model.as_fake();
3562
3563    let mut events = thread
3564        .update(cx, |thread, cx| {
3565            thread.send(UserMessageId::new(), ["Hello!"], cx)
3566        })
3567        .unwrap();
3568    cx.run_until_parked();
3569
3570    fake_model.send_last_completion_stream_text_chunk("Hey!");
3571    fake_model.end_last_completion_stream();
3572
3573    let mut retry_events = Vec::new();
3574    while let Some(Ok(event)) = events.next().await {
3575        match event {
3576            ThreadEvent::Retry(retry_status) => {
3577                retry_events.push(retry_status);
3578            }
3579            ThreadEvent::Stop(..) => break,
3580            _ => {}
3581        }
3582    }
3583
3584    assert_eq!(retry_events.len(), 0);
3585    thread.read_with(cx, |thread, _cx| {
3586        assert_eq!(
3587            thread.to_markdown(),
3588            indoc! {"
3589                ## User
3590
3591                Hello!
3592
3593                ## Assistant
3594
3595                Hey!
3596            "}
3597        )
3598    });
3599}
3600
3601#[gpui::test]
3602async fn test_send_retry_on_error(cx: &mut TestAppContext) {
3603    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3604    let fake_model = model.as_fake();
3605
3606    let mut events = thread
3607        .update(cx, |thread, cx| {
3608            thread.send(UserMessageId::new(), ["Hello!"], cx)
3609        })
3610        .unwrap();
3611    cx.run_until_parked();
3612
3613    fake_model.send_last_completion_stream_text_chunk("Hey,");
3614    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
3615        provider: LanguageModelProviderName::new("Anthropic"),
3616        retry_after: Some(Duration::from_secs(3)),
3617    });
3618    fake_model.end_last_completion_stream();
3619
3620    cx.executor().advance_clock(Duration::from_secs(3));
3621    cx.run_until_parked();
3622
3623    fake_model.send_last_completion_stream_text_chunk("there!");
3624    fake_model.end_last_completion_stream();
3625    cx.run_until_parked();
3626
3627    let mut retry_events = Vec::new();
3628    while let Some(Ok(event)) = events.next().await {
3629        match event {
3630            ThreadEvent::Retry(retry_status) => {
3631                retry_events.push(retry_status);
3632            }
3633            ThreadEvent::Stop(..) => break,
3634            _ => {}
3635        }
3636    }
3637
3638    assert_eq!(retry_events.len(), 1);
3639    assert!(matches!(
3640        retry_events[0],
3641        acp_thread::RetryStatus { attempt: 1, .. }
3642    ));
3643    thread.read_with(cx, |thread, _cx| {
3644        assert_eq!(
3645            thread.to_markdown(),
3646            indoc! {"
3647                ## User
3648
3649                Hello!
3650
3651                ## Assistant
3652
3653                Hey,
3654
3655                [resume]
3656
3657                ## Assistant
3658
3659                there!
3660            "}
3661        )
3662    });
3663}
3664
3665#[gpui::test]
3666async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
3667    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3668    let fake_model = model.as_fake();
3669
3670    let events = thread
3671        .update(cx, |thread, cx| {
3672            thread.add_tool(EchoTool);
3673            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
3674        })
3675        .unwrap();
3676    cx.run_until_parked();
3677
3678    let tool_use_1 = LanguageModelToolUse {
3679        id: "tool_1".into(),
3680        name: EchoTool::NAME.into(),
3681        raw_input: json!({"text": "test"}).to_string(),
3682        input: json!({"text": "test"}),
3683        is_input_complete: true,
3684        thought_signature: None,
3685    };
3686    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
3687        tool_use_1.clone(),
3688    ));
3689    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
3690        provider: LanguageModelProviderName::new("Anthropic"),
3691        retry_after: Some(Duration::from_secs(3)),
3692    });
3693    fake_model.end_last_completion_stream();
3694
3695    cx.executor().advance_clock(Duration::from_secs(3));
3696    let completion = fake_model.pending_completions().pop().unwrap();
3697    assert_eq!(
3698        completion.messages[1..],
3699        vec![
3700            LanguageModelRequestMessage {
3701                role: Role::User,
3702                content: vec!["Call the echo tool!".into()],
3703                cache: false,
3704                reasoning_details: None,
3705            },
3706            LanguageModelRequestMessage {
3707                role: Role::Assistant,
3708                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
3709                cache: false,
3710                reasoning_details: None,
3711            },
3712            LanguageModelRequestMessage {
3713                role: Role::User,
3714                content: vec![language_model::MessageContent::ToolResult(
3715                    LanguageModelToolResult {
3716                        tool_use_id: tool_use_1.id.clone(),
3717                        tool_name: tool_use_1.name.clone(),
3718                        is_error: false,
3719                        content: "test".into(),
3720                        output: Some("test".into())
3721                    }
3722                )],
3723                cache: true,
3724                reasoning_details: None,
3725            },
3726        ]
3727    );
3728
3729    fake_model.send_last_completion_stream_text_chunk("Done");
3730    fake_model.end_last_completion_stream();
3731    cx.run_until_parked();
3732    events.collect::<Vec<_>>().await;
3733    thread.read_with(cx, |thread, _cx| {
3734        assert_eq!(
3735            thread.last_received_or_pending_message(),
3736            Some(Message::Agent(AgentMessage {
3737                content: vec![AgentMessageContent::Text("Done".into())],
3738                tool_results: IndexMap::default(),
3739                reasoning_details: None,
3740            }))
3741        );
3742    })
3743}
3744
3745#[gpui::test]
3746async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
3747    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3748    let fake_model = model.as_fake();
3749
3750    let mut events = thread
3751        .update(cx, |thread, cx| {
3752            thread.send(UserMessageId::new(), ["Hello!"], cx)
3753        })
3754        .unwrap();
3755    cx.run_until_parked();
3756
3757    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
3758        fake_model.send_last_completion_stream_error(
3759            LanguageModelCompletionError::ServerOverloaded {
3760                provider: LanguageModelProviderName::new("Anthropic"),
3761                retry_after: Some(Duration::from_secs(3)),
3762            },
3763        );
3764        fake_model.end_last_completion_stream();
3765        cx.executor().advance_clock(Duration::from_secs(3));
3766        cx.run_until_parked();
3767    }
3768
3769    let mut errors = Vec::new();
3770    let mut retry_events = Vec::new();
3771    while let Some(event) = events.next().await {
3772        match event {
3773            Ok(ThreadEvent::Retry(retry_status)) => {
3774                retry_events.push(retry_status);
3775            }
3776            Ok(ThreadEvent::Stop(..)) => break,
3777            Err(error) => errors.push(error),
3778            _ => {}
3779        }
3780    }
3781
3782    assert_eq!(
3783        retry_events.len(),
3784        crate::thread::MAX_RETRY_ATTEMPTS as usize
3785    );
3786    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
3787        assert_eq!(retry_events[i].attempt, i + 1);
3788    }
3789    assert_eq!(errors.len(), 1);
3790    let error = errors[0]
3791        .downcast_ref::<LanguageModelCompletionError>()
3792        .unwrap();
3793    assert!(matches!(
3794        error,
3795        LanguageModelCompletionError::ServerOverloaded { .. }
3796    ));
3797}
3798
3799#[gpui::test]
3800async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
3801    cx: &mut TestAppContext,
3802) {
3803    init_test(cx);
3804    always_allow_tools(cx);
3805
3806    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
3807    let fake_model = model.as_fake();
3808
3809    thread.update(cx, |thread, _cx| {
3810        thread.add_tool(StreamingEchoTool::new());
3811    });
3812
3813    let _events = thread
3814        .update(cx, |thread, cx| {
3815            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
3816        })
3817        .unwrap();
3818    cx.run_until_parked();
3819
3820    // Send a partial tool use (is_input_complete = false), simulating the LLM
3821    // streaming input for a tool.
3822    let tool_use = LanguageModelToolUse {
3823        id: "tool_1".into(),
3824        name: "streaming_echo".into(),
3825        raw_input: r#"{"text": "partial"}"#.into(),
3826        input: json!({"text": "partial"}),
3827        is_input_complete: false,
3828        thought_signature: None,
3829    };
3830    fake_model
3831        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
3832    cx.run_until_parked();
3833
3834    // Send a stream error WITHOUT ever sending is_input_complete = true.
3835    // Before the fix, this would deadlock: the tool waits for more partials
3836    // (or cancellation), run_turn_internal waits for the tool, and the sender
3837    // keeping the channel open lives inside RunningTurn.
3838    fake_model.send_last_completion_stream_error(
3839        LanguageModelCompletionError::UpstreamProviderError {
3840            message: "Internal server error".to_string(),
3841            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
3842            retry_after: None,
3843        },
3844    );
3845    fake_model.end_last_completion_stream();
3846
3847    // Advance past the retry delay so run_turn_internal retries.
3848    cx.executor().advance_clock(Duration::from_secs(5));
3849    cx.run_until_parked();
3850
3851    // The retry request should contain the streaming tool's error result,
3852    // proving the tool terminated and its result was forwarded.
3853    let completion = fake_model
3854        .pending_completions()
3855        .pop()
3856        .expect("No running turn");
3857    assert_eq!(
3858        completion.messages[1..],
3859        vec![
3860            LanguageModelRequestMessage {
3861                role: Role::User,
3862                content: vec!["Use the streaming_echo tool".into()],
3863                cache: false,
3864                reasoning_details: None,
3865            },
3866            LanguageModelRequestMessage {
3867                role: Role::Assistant,
3868                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
3869                cache: false,
3870                reasoning_details: None,
3871            },
3872            LanguageModelRequestMessage {
3873                role: Role::User,
3874                content: vec![language_model::MessageContent::ToolResult(
3875                    LanguageModelToolResult {
3876                        tool_use_id: tool_use.id.clone(),
3877                        tool_name: tool_use.name,
3878                        is_error: true,
3879                        content: "Failed to receive tool input: tool input was not fully received"
3880                            .into(),
3881                        output: Some(
3882                            "Failed to receive tool input: tool input was not fully received"
3883                                .into()
3884                        ),
3885                    }
3886                )],
3887                cache: true,
3888                reasoning_details: None,
3889            },
3890        ]
3891    );
3892
3893    // Finish the retry round so the turn completes cleanly.
3894    fake_model.send_last_completion_stream_text_chunk("Done");
3895    fake_model.end_last_completion_stream();
3896    cx.run_until_parked();
3897
3898    thread.read_with(cx, |thread, _cx| {
3899        assert!(
3900            thread.is_turn_complete(),
3901            "Thread should not be stuck; the turn should have completed",
3902        );
3903    });
3904}
3905
3906/// Filters out the stop events for asserting against in tests
3907fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3908    result_events
3909        .into_iter()
3910        .filter_map(|event| match event.unwrap() {
3911            ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3912            _ => None,
3913        })
3914        .collect()
3915}
3916
/// Bundle of fixtures produced by `setup` for tests that exercise a `Thread`.
struct ThreadTest {
    /// The language model that was passed to the thread on construction.
    model: Arc<dyn LanguageModel>,
    /// The thread under test.
    thread: Entity<Thread>,
    /// The project context entity the thread was created with.
    project_context: Entity<ProjectContext>,
    /// The context server store obtained from the test project.
    context_server_store: Entity<ContextServerStore>,
    /// The fake file system backing the test project and the settings file.
    fs: Arc<FakeFs>,
}
3924
/// Selects which language model `setup` gives to the thread under test.
enum TestModel {
    /// A model looked up by id in the global `LanguageModelRegistry`; `setup`
    /// authenticates its provider before using it.
    Sonnet4,
    /// A default-constructed `FakeLanguageModel` that tests drive by hand.
    Fake,
}
3929
3930impl TestModel {
3931    fn id(&self) -> LanguageModelId {
3932        match self {
3933            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
3934            TestModel::Fake => unreachable!(),
3935        }
3936    }
3937}
3938
3939async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
3940    cx.executor().allow_parking();
3941
3942    let fs = FakeFs::new(cx.background_executor.clone());
3943    fs.create_dir(paths::settings_file().parent().unwrap())
3944        .await
3945        .unwrap();
3946    fs.insert_file(
3947        paths::settings_file(),
3948        json!({
3949            "agent": {
3950                "default_profile": "test-profile",
3951                "profiles": {
3952                    "test-profile": {
3953                        "name": "Test Profile",
3954                        "tools": {
3955                            EchoTool::NAME: true,
3956                            DelayTool::NAME: true,
3957                            WordListTool::NAME: true,
3958                            ToolRequiringPermission::NAME: true,
3959                            InfiniteTool::NAME: true,
3960                            CancellationAwareTool::NAME: true,
3961                            StreamingEchoTool::NAME: true,
3962                            StreamingFailingEchoTool::NAME: true,
3963                            TerminalTool::NAME: true,
3964                            UpdatePlanTool::NAME: true,
3965                        }
3966                    }
3967                }
3968            }
3969        })
3970        .to_string()
3971        .into_bytes(),
3972    )
3973    .await;
3974
3975    cx.update(|cx| {
3976        settings::init(cx);
3977
3978        match model {
3979            TestModel::Fake => {}
3980            TestModel::Sonnet4 => {
3981                gpui_tokio::init(cx);
3982                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
3983                cx.set_http_client(Arc::new(http_client));
3984                let client = Client::production(cx);
3985                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
3986                language_model::init(user_store.clone(), client.clone(), cx);
3987                language_models::init(user_store, client.clone(), cx);
3988            }
3989        };
3990
3991        watch_settings(fs.clone(), cx);
3992    });
3993
3994    let templates = Templates::new();
3995
3996    fs.insert_tree(path!("/test"), json!({})).await;
3997    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3998
3999    let model = cx
4000        .update(|cx| {
4001            if let TestModel::Fake = model {
4002                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
4003            } else {
4004                let model_id = model.id();
4005                let models = LanguageModelRegistry::read_global(cx);
4006                let model = models
4007                    .available_models(cx)
4008                    .find(|model| model.id() == model_id)
4009                    .unwrap();
4010
4011                let provider = models.provider(&model.provider_id()).unwrap();
4012                let authenticated = provider.authenticate(cx);
4013
4014                cx.spawn(async move |_cx| {
4015                    authenticated.await.unwrap();
4016                    model
4017                })
4018            }
4019        })
4020        .await;
4021
4022    let project_context = cx.new(|_cx| ProjectContext::default());
4023    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4024    let context_server_registry =
4025        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4026    let thread = cx.new(|cx| {
4027        Thread::new(
4028            project,
4029            project_context.clone(),
4030            context_server_registry,
4031            templates,
4032            Some(model.clone()),
4033            cx,
4034        )
4035    });
4036    ThreadTest {
4037        model,
4038        thread,
4039        project_context,
4040        context_server_store,
4041        fs,
4042    }
4043}
4044
/// Initialises `env_logger` before tests run, but only when `RUST_LOG` is set
/// to a value that `std::env::var` can read.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let rust_log_is_set = std::env::var("RUST_LOG").is_ok();
    if !rust_log_is_set {
        return;
    }
    env_logger::init();
}
4052
4053fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
4054    let fs = fs.clone();
4055    cx.spawn({
4056        async move |cx| {
4057            let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
4058                cx.background_executor(),
4059                fs,
4060                paths::settings_file().clone(),
4061            );
4062            let _watcher_task = watcher_task;
4063
4064            while let Some(new_settings_content) = new_settings_content_rx.next().await {
4065                cx.update(|cx| {
4066                    SettingsStore::update_global(cx, |settings, cx| {
4067                        settings.set_user_settings(&new_settings_content, cx)
4068                    })
4069                })
4070                .ok();
4071            }
4072        }
4073    })
4074    .detach();
4075}
4076
4077fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4078    completion
4079        .tools
4080        .iter()
4081        .map(|tool| tool.name.clone())
4082        .collect()
4083}
4084
4085fn setup_context_server(
4086    name: &'static str,
4087    tools: Vec<context_server::types::Tool>,
4088    context_server_store: &Entity<ContextServerStore>,
4089    cx: &mut TestAppContext,
4090) -> mpsc::UnboundedReceiver<(
4091    context_server::types::CallToolParams,
4092    oneshot::Sender<context_server::types::CallToolResponse>,
4093)> {
4094    cx.update(|cx| {
4095        let mut settings = ProjectSettings::get_global(cx).clone();
4096        settings.context_servers.insert(
4097            name.into(),
4098            project::project_settings::ContextServerSettings::Stdio {
4099                enabled: true,
4100                remote: false,
4101                command: ContextServerCommand {
4102                    path: "somebinary".into(),
4103                    args: Vec::new(),
4104                    env: None,
4105                    timeout: None,
4106                },
4107            },
4108        );
4109        ProjectSettings::override_global(settings, cx);
4110    });
4111
4112    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
4113    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
4114        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
4115            context_server::types::InitializeResponse {
4116                protocol_version: context_server::types::ProtocolVersion(
4117                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
4118                ),
4119                server_info: context_server::types::Implementation {
4120                    name: name.into(),
4121                    version: "1.0.0".to_string(),
4122                },
4123                capabilities: context_server::types::ServerCapabilities {
4124                    tools: Some(context_server::types::ToolsCapabilities {
4125                        list_changed: Some(true),
4126                    }),
4127                    ..Default::default()
4128                },
4129                meta: None,
4130            }
4131        })
4132        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
4133            let tools = tools.clone();
4134            async move {
4135                context_server::types::ListToolsResponse {
4136                    tools,
4137                    next_cursor: None,
4138                    meta: None,
4139                }
4140            }
4141        })
4142        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
4143            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
4144            async move {
4145                let (response_tx, response_rx) = oneshot::channel();
4146                mcp_tool_calls_tx
4147                    .unbounded_send((params, response_tx))
4148                    .unwrap();
4149                response_rx.await.unwrap()
4150            }
4151        });
4152    context_server_store.update(cx, |store, cx| {
4153        store.start_server(
4154            Arc::new(ContextServer::new(
4155                ContextServerId(name.into()),
4156                Arc::new(fake_transport),
4157            )),
4158            cx,
4159        );
4160    });
4161    cx.run_until_parked();
4162    mcp_tool_calls_rx
4163}
4164
4165#[gpui::test]
4166async fn test_tokens_before_message(cx: &mut TestAppContext) {
4167    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4168    let fake_model = model.as_fake();
4169
4170    // First message
4171    let message_1_id = UserMessageId::new();
4172    thread
4173        .update(cx, |thread, cx| {
4174            thread.send(message_1_id.clone(), ["First message"], cx)
4175        })
4176        .unwrap();
4177    cx.run_until_parked();
4178
4179    // Before any response, tokens_before_message should return None for first message
4180    thread.read_with(cx, |thread, _| {
4181        assert_eq!(
4182            thread.tokens_before_message(&message_1_id),
4183            None,
4184            "First message should have no tokens before it"
4185        );
4186    });
4187
4188    // Complete first message with usage
4189    fake_model.send_last_completion_stream_text_chunk("Response 1");
4190    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4191        language_model::TokenUsage {
4192            input_tokens: 100,
4193            output_tokens: 50,
4194            cache_creation_input_tokens: 0,
4195            cache_read_input_tokens: 0,
4196        },
4197    ));
4198    fake_model.end_last_completion_stream();
4199    cx.run_until_parked();
4200
4201    // First message still has no tokens before it
4202    thread.read_with(cx, |thread, _| {
4203        assert_eq!(
4204            thread.tokens_before_message(&message_1_id),
4205            None,
4206            "First message should still have no tokens before it after response"
4207        );
4208    });
4209
4210    // Second message
4211    let message_2_id = UserMessageId::new();
4212    thread
4213        .update(cx, |thread, cx| {
4214            thread.send(message_2_id.clone(), ["Second message"], cx)
4215        })
4216        .unwrap();
4217    cx.run_until_parked();
4218
4219    // Second message should have first message's input tokens before it
4220    thread.read_with(cx, |thread, _| {
4221        assert_eq!(
4222            thread.tokens_before_message(&message_2_id),
4223            Some(100),
4224            "Second message should have 100 tokens before it (from first request)"
4225        );
4226    });
4227
4228    // Complete second message
4229    fake_model.send_last_completion_stream_text_chunk("Response 2");
4230    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4231        language_model::TokenUsage {
4232            input_tokens: 250, // Total for this request (includes previous context)
4233            output_tokens: 75,
4234            cache_creation_input_tokens: 0,
4235            cache_read_input_tokens: 0,
4236        },
4237    ));
4238    fake_model.end_last_completion_stream();
4239    cx.run_until_parked();
4240
4241    // Third message
4242    let message_3_id = UserMessageId::new();
4243    thread
4244        .update(cx, |thread, cx| {
4245            thread.send(message_3_id.clone(), ["Third message"], cx)
4246        })
4247        .unwrap();
4248    cx.run_until_parked();
4249
4250    // Third message should have second message's input tokens (250) before it
4251    thread.read_with(cx, |thread, _| {
4252        assert_eq!(
4253            thread.tokens_before_message(&message_3_id),
4254            Some(250),
4255            "Third message should have 250 tokens before it (from second request)"
4256        );
4257        // Second message should still have 100
4258        assert_eq!(
4259            thread.tokens_before_message(&message_2_id),
4260            Some(100),
4261            "Second message should still have 100 tokens before it"
4262        );
4263        // First message still has none
4264        assert_eq!(
4265            thread.tokens_before_message(&message_1_id),
4266            None,
4267            "First message should still have no tokens before it"
4268        );
4269    });
4270}
4271
4272#[gpui::test]
4273async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
4274    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4275    let fake_model = model.as_fake();
4276
4277    // Set up three messages with responses
4278    let message_1_id = UserMessageId::new();
4279    thread
4280        .update(cx, |thread, cx| {
4281            thread.send(message_1_id.clone(), ["Message 1"], cx)
4282        })
4283        .unwrap();
4284    cx.run_until_parked();
4285    fake_model.send_last_completion_stream_text_chunk("Response 1");
4286    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4287        language_model::TokenUsage {
4288            input_tokens: 100,
4289            output_tokens: 50,
4290            cache_creation_input_tokens: 0,
4291            cache_read_input_tokens: 0,
4292        },
4293    ));
4294    fake_model.end_last_completion_stream();
4295    cx.run_until_parked();
4296
4297    let message_2_id = UserMessageId::new();
4298    thread
4299        .update(cx, |thread, cx| {
4300            thread.send(message_2_id.clone(), ["Message 2"], cx)
4301        })
4302        .unwrap();
4303    cx.run_until_parked();
4304    fake_model.send_last_completion_stream_text_chunk("Response 2");
4305    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4306        language_model::TokenUsage {
4307            input_tokens: 250,
4308            output_tokens: 75,
4309            cache_creation_input_tokens: 0,
4310            cache_read_input_tokens: 0,
4311        },
4312    ));
4313    fake_model.end_last_completion_stream();
4314    cx.run_until_parked();
4315
4316    // Verify initial state
4317    thread.read_with(cx, |thread, _| {
4318        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
4319    });
4320
4321    // Truncate at message 2 (removes message 2 and everything after)
4322    thread
4323        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
4324        .unwrap();
4325    cx.run_until_parked();
4326
4327    // After truncation, message_2_id no longer exists, so lookup should return None
4328    thread.read_with(cx, |thread, _| {
4329        assert_eq!(
4330            thread.tokens_before_message(&message_2_id),
4331            None,
4332            "After truncation, message 2 no longer exists"
4333        );
4334        // Message 1 still exists but has no tokens before it
4335        assert_eq!(
4336            thread.tokens_before_message(&message_1_id),
4337            None,
4338            "First message still has no tokens before it"
4339        );
4340    });
4341}
4342
4343#[gpui::test]
4344async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
4345    init_test(cx);
4346
4347    let fs = FakeFs::new(cx.executor());
4348    fs.insert_tree("/root", json!({})).await;
4349    let project = Project::test(fs, ["/root".as_ref()], cx).await;
4350
4351    // Test 1: Deny rule blocks command
4352    {
4353        let environment = Rc::new(cx.update(|cx| {
4354            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
4355        }));
4356
4357        cx.update(|cx| {
4358            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4359            settings.tool_permissions.tools.insert(
4360                TerminalTool::NAME.into(),
4361                agent_settings::ToolRules {
4362                    default: Some(settings::ToolPermissionMode::Confirm),
4363                    always_allow: vec![],
4364                    always_deny: vec![
4365                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
4366                    ],
4367                    always_confirm: vec![],
4368                    invalid_patterns: vec![],
4369                },
4370            );
4371            agent_settings::AgentSettings::override_global(settings, cx);
4372        });
4373
4374        #[allow(clippy::arc_with_non_send_sync)]
4375        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4376        let (event_stream, _rx) = crate::ToolCallEventStream::test();
4377
4378        let task = cx.update(|cx| {
4379            tool.run(
4380                ToolInput::resolved(crate::TerminalToolInput {
4381                    command: "rm -rf /".to_string(),
4382                    cd: ".".to_string(),
4383                    timeout_ms: None,
4384                }),
4385                event_stream,
4386                cx,
4387            )
4388        });
4389
4390        let result = task.await;
4391        assert!(
4392            result.is_err(),
4393            "expected command to be blocked by deny rule"
4394        );
4395        let err_msg = result.unwrap_err().to_lowercase();
4396        assert!(
4397            err_msg.contains("blocked"),
4398            "error should mention the command was blocked"
4399        );
4400    }
4401
4402    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
4403    {
4404        let environment = Rc::new(cx.update(|cx| {
4405            FakeThreadEnvironment::default()
4406                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4407        }));
4408
4409        cx.update(|cx| {
4410            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4411            settings.tool_permissions.tools.insert(
4412                TerminalTool::NAME.into(),
4413                agent_settings::ToolRules {
4414                    default: Some(settings::ToolPermissionMode::Deny),
4415                    always_allow: vec![
4416                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
4417                    ],
4418                    always_deny: vec![],
4419                    always_confirm: vec![],
4420                    invalid_patterns: vec![],
4421                },
4422            );
4423            agent_settings::AgentSettings::override_global(settings, cx);
4424        });
4425
4426        #[allow(clippy::arc_with_non_send_sync)]
4427        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4428        let (event_stream, mut rx) = crate::ToolCallEventStream::test();
4429
4430        let task = cx.update(|cx| {
4431            tool.run(
4432                ToolInput::resolved(crate::TerminalToolInput {
4433                    command: "echo hello".to_string(),
4434                    cd: ".".to_string(),
4435                    timeout_ms: None,
4436                }),
4437                event_stream,
4438                cx,
4439            )
4440        });
4441
4442        let update = rx.expect_update_fields().await;
4443        assert!(
4444            update.content.iter().any(|blocks| {
4445                blocks
4446                    .iter()
4447                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
4448            }),
4449            "expected terminal content (allow rule should skip confirmation and override default deny)"
4450        );
4451
4452        let result = task.await;
4453        assert!(
4454            result.is_ok(),
4455            "expected command to succeed without confirmation"
4456        );
4457    }
4458
4459    // Test 3: global default: allow does NOT override always_confirm patterns
4460    {
4461        let environment = Rc::new(cx.update(|cx| {
4462            FakeThreadEnvironment::default()
4463                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4464        }));
4465
4466        cx.update(|cx| {
4467            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4468            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
4469            settings.tool_permissions.tools.insert(
4470                TerminalTool::NAME.into(),
4471                agent_settings::ToolRules {
4472                    default: Some(settings::ToolPermissionMode::Allow),
4473                    always_allow: vec![],
4474                    always_deny: vec![],
4475                    always_confirm: vec![
4476                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
4477                    ],
4478                    invalid_patterns: vec![],
4479                },
4480            );
4481            agent_settings::AgentSettings::override_global(settings, cx);
4482        });
4483
4484        #[allow(clippy::arc_with_non_send_sync)]
4485        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4486        let (event_stream, mut rx) = crate::ToolCallEventStream::test();
4487
4488        let _task = cx.update(|cx| {
4489            tool.run(
4490                ToolInput::resolved(crate::TerminalToolInput {
4491                    command: "sudo rm file".to_string(),
4492                    cd: ".".to_string(),
4493                    timeout_ms: None,
4494                }),
4495                event_stream,
4496                cx,
4497            )
4498        });
4499
4500        // With global default: allow, confirm patterns are still respected
4501        // The expect_authorization() call will panic if no authorization is requested,
4502        // which validates that the confirm pattern still triggers confirmation
4503        let _auth = rx.expect_authorization().await;
4504
4505        drop(_task);
4506    }
4507
4508    // Test 4: tool-specific default: deny is respected even with global default: allow
4509    {
4510        let environment = Rc::new(cx.update(|cx| {
4511            FakeThreadEnvironment::default()
4512                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4513        }));
4514
4515        cx.update(|cx| {
4516            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4517            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
4518            settings.tool_permissions.tools.insert(
4519                TerminalTool::NAME.into(),
4520                agent_settings::ToolRules {
4521                    default: Some(settings::ToolPermissionMode::Deny),
4522                    always_allow: vec![],
4523                    always_deny: vec![],
4524                    always_confirm: vec![],
4525                    invalid_patterns: vec![],
4526                },
4527            );
4528            agent_settings::AgentSettings::override_global(settings, cx);
4529        });
4530
4531        #[allow(clippy::arc_with_non_send_sync)]
4532        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4533        let (event_stream, _rx) = crate::ToolCallEventStream::test();
4534
4535        let task = cx.update(|cx| {
4536            tool.run(
4537                ToolInput::resolved(crate::TerminalToolInput {
4538                    command: "echo hello".to_string(),
4539                    cd: ".".to_string(),
4540                    timeout_ms: None,
4541                }),
4542                event_stream,
4543                cx,
4544            )
4545        });
4546
4547        // tool-specific default: deny is respected even with global default: allow
4548        let result = task.await;
4549        assert!(
4550            result.is_err(),
4551            "expected command to be blocked by tool-specific deny default"
4552        );
4553        let err_msg = result.unwrap_err().to_lowercase();
4554        assert!(
4555            err_msg.contains("disabled"),
4556            "error should mention the tool is disabled, got: {err_msg}"
4557        );
4558    }
4559}
4560
4561#[gpui::test]
4562async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
4563    init_test(cx);
4564    cx.update(|cx| {
4565        LanguageModelRegistry::test(cx);
4566    });
4567    cx.update(|cx| {
4568        cx.update_flags(true, vec!["subagents".to_string()]);
4569    });
4570
4571    let fs = FakeFs::new(cx.executor());
4572    fs.insert_tree(
4573        "/",
4574        json!({
4575            "a": {
4576                "b.md": "Lorem"
4577            }
4578        }),
4579    )
4580    .await;
4581    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4582    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4583    let agent = cx.update(|cx| {
4584        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4585    });
4586    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4587
4588    let acp_thread = cx
4589        .update(|cx| {
4590            connection
4591                .clone()
4592                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4593        })
4594        .await
4595        .unwrap();
4596    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4597    let thread = agent.read_with(cx, |agent, _| {
4598        agent.sessions.get(&session_id).unwrap().thread.clone()
4599    });
4600    let model = Arc::new(FakeLanguageModel::default());
4601
4602    // Ensure empty threads are not saved, even if they get mutated.
4603    thread.update(cx, |thread, cx| {
4604        thread.set_model(model.clone(), cx);
4605    });
4606    cx.run_until_parked();
4607
4608    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4609    cx.run_until_parked();
4610    model.send_last_completion_stream_text_chunk("spawning subagent");
4611    let subagent_tool_input = SpawnAgentToolInput {
4612        label: "label".to_string(),
4613        message: "subagent task prompt".to_string(),
4614        session_id: None,
4615    };
4616    let subagent_tool_use = LanguageModelToolUse {
4617        id: "subagent_1".into(),
4618        name: SpawnAgentTool::NAME.into(),
4619        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4620        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4621        is_input_complete: true,
4622        thought_signature: None,
4623    };
4624    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4625        subagent_tool_use,
4626    ));
4627    model.end_last_completion_stream();
4628
4629    cx.run_until_parked();
4630
4631    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4632        thread
4633            .running_subagent_ids(cx)
4634            .get(0)
4635            .expect("subagent thread should be running")
4636            .clone()
4637    });
4638
4639    let subagent_thread = agent.read_with(cx, |agent, _cx| {
4640        agent
4641            .sessions
4642            .get(&subagent_session_id)
4643            .expect("subagent session should exist")
4644            .acp_thread
4645            .clone()
4646    });
4647
4648    model.send_last_completion_stream_text_chunk("subagent task response");
4649    model.end_last_completion_stream();
4650
4651    cx.run_until_parked();
4652
4653    assert_eq!(
4654        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4655        indoc! {"
4656            ## User
4657
4658            subagent task prompt
4659
4660            ## Assistant
4661
4662            subagent task response
4663
4664        "}
4665    );
4666
4667    model.send_last_completion_stream_text_chunk("Response");
4668    model.end_last_completion_stream();
4669
4670    send.await.unwrap();
4671
4672    assert_eq!(
4673        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4674        indoc! {r#"
4675            ## User
4676
4677            Prompt
4678
4679            ## Assistant
4680
4681            spawning subagent
4682
4683            **Tool Call: label**
4684            Status: Completed
4685
4686            subagent task response
4687
4688            ## Assistant
4689
4690            Response
4691
4692        "#},
4693    );
4694}
4695
4696#[gpui::test]
4697async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
4698    init_test(cx);
4699    cx.update(|cx| {
4700        LanguageModelRegistry::test(cx);
4701    });
4702    cx.update(|cx| {
4703        cx.update_flags(true, vec!["subagents".to_string()]);
4704    });
4705
4706    let fs = FakeFs::new(cx.executor());
4707    fs.insert_tree(
4708        "/",
4709        json!({
4710            "a": {
4711                "b.md": "Lorem"
4712            }
4713        }),
4714    )
4715    .await;
4716    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4717    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4718    let agent = cx.update(|cx| {
4719        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4720    });
4721    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4722
4723    let acp_thread = cx
4724        .update(|cx| {
4725            connection
4726                .clone()
4727                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4728        })
4729        .await
4730        .unwrap();
4731    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4732    let thread = agent.read_with(cx, |agent, _| {
4733        agent.sessions.get(&session_id).unwrap().thread.clone()
4734    });
4735    let model = Arc::new(FakeLanguageModel::default());
4736
4737    // Ensure empty threads are not saved, even if they get mutated.
4738    thread.update(cx, |thread, cx| {
4739        thread.set_model(model.clone(), cx);
4740    });
4741    cx.run_until_parked();
4742
4743    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4744    cx.run_until_parked();
4745    model.send_last_completion_stream_text_chunk("spawning subagent");
4746    let subagent_tool_input = SpawnAgentToolInput {
4747        label: "label".to_string(),
4748        message: "subagent task prompt".to_string(),
4749        session_id: None,
4750    };
4751    let subagent_tool_use = LanguageModelToolUse {
4752        id: "subagent_1".into(),
4753        name: SpawnAgentTool::NAME.into(),
4754        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4755        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4756        is_input_complete: true,
4757        thought_signature: None,
4758    };
4759    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4760        subagent_tool_use,
4761    ));
4762    model.end_last_completion_stream();
4763
4764    cx.run_until_parked();
4765
4766    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4767        thread
4768            .running_subagent_ids(cx)
4769            .get(0)
4770            .expect("subagent thread should be running")
4771            .clone()
4772    });
4773
4774    let subagent_thread = agent.read_with(cx, |agent, _cx| {
4775        agent
4776            .sessions
4777            .get(&subagent_session_id)
4778            .expect("subagent session should exist")
4779            .acp_thread
4780            .clone()
4781    });
4782
4783    model.send_last_completion_stream_text_chunk("subagent task response 1");
4784    model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
4785        text: "thinking more about the subagent task".into(),
4786        signature: None,
4787    });
4788    model.send_last_completion_stream_text_chunk("subagent task response 2");
4789    model.end_last_completion_stream();
4790
4791    cx.run_until_parked();
4792
4793    assert_eq!(
4794        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4795        indoc! {"
4796            ## User
4797
4798            subagent task prompt
4799
4800            ## Assistant
4801
4802            subagent task response 1
4803
4804            <thinking>
4805            thinking more about the subagent task
4806            </thinking>
4807
4808            subagent task response 2
4809
4810        "}
4811    );
4812
4813    model.send_last_completion_stream_text_chunk("Response");
4814    model.end_last_completion_stream();
4815
4816    send.await.unwrap();
4817
4818    assert_eq!(
4819        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4820        indoc! {r#"
4821            ## User
4822
4823            Prompt
4824
4825            ## Assistant
4826
4827            spawning subagent
4828
4829            **Tool Call: label**
4830            Status: Completed
4831
4832            subagent task response 1
4833
4834            subagent task response 2
4835
4836            ## Assistant
4837
4838            Response
4839
4840        "#},
4841    );
4842}
4843
4844#[gpui::test]
4845async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
4846    init_test(cx);
4847    cx.update(|cx| {
4848        LanguageModelRegistry::test(cx);
4849    });
4850    cx.update(|cx| {
4851        cx.update_flags(true, vec!["subagents".to_string()]);
4852    });
4853
4854    let fs = FakeFs::new(cx.executor());
4855    fs.insert_tree(
4856        "/",
4857        json!({
4858            "a": {
4859                "b.md": "Lorem"
4860            }
4861        }),
4862    )
4863    .await;
4864    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4865    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4866    let agent = cx.update(|cx| {
4867        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4868    });
4869    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4870
4871    let acp_thread = cx
4872        .update(|cx| {
4873            connection
4874                .clone()
4875                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4876        })
4877        .await
4878        .unwrap();
4879    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4880    let thread = agent.read_with(cx, |agent, _| {
4881        agent.sessions.get(&session_id).unwrap().thread.clone()
4882    });
4883    let model = Arc::new(FakeLanguageModel::default());
4884
4885    // Ensure empty threads are not saved, even if they get mutated.
4886    thread.update(cx, |thread, cx| {
4887        thread.set_model(model.clone(), cx);
4888    });
4889    cx.run_until_parked();
4890
4891    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4892    cx.run_until_parked();
4893    model.send_last_completion_stream_text_chunk("spawning subagent");
4894    let subagent_tool_input = SpawnAgentToolInput {
4895        label: "label".to_string(),
4896        message: "subagent task prompt".to_string(),
4897        session_id: None,
4898    };
4899    let subagent_tool_use = LanguageModelToolUse {
4900        id: "subagent_1".into(),
4901        name: SpawnAgentTool::NAME.into(),
4902        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4903        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4904        is_input_complete: true,
4905        thought_signature: None,
4906    };
4907    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4908        subagent_tool_use,
4909    ));
4910    model.end_last_completion_stream();
4911
4912    cx.run_until_parked();
4913
4914    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4915        thread
4916            .running_subagent_ids(cx)
4917            .get(0)
4918            .expect("subagent thread should be running")
4919            .clone()
4920    });
4921    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4922        agent
4923            .sessions
4924            .get(&subagent_session_id)
4925            .expect("subagent session should exist")
4926            .acp_thread
4927            .clone()
4928    });
4929
4930    // model.send_last_completion_stream_text_chunk("subagent task response");
4931    // model.end_last_completion_stream();
4932
4933    // cx.run_until_parked();
4934
4935    acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4936
4937    cx.run_until_parked();
4938
4939    send.await.unwrap();
4940
4941    acp_thread.read_with(cx, |thread, cx| {
4942        assert_eq!(thread.status(), ThreadStatus::Idle);
4943        assert_eq!(
4944            thread.to_markdown(cx),
4945            indoc! {"
4946                ## User
4947
4948                Prompt
4949
4950                ## Assistant
4951
4952                spawning subagent
4953
4954                **Tool Call: label**
4955                Status: Canceled
4956
4957            "}
4958        );
4959    });
4960    subagent_acp_thread.read_with(cx, |thread, cx| {
4961        assert_eq!(thread.status(), ThreadStatus::Idle);
4962        assert_eq!(
4963            thread.to_markdown(cx),
4964            indoc! {"
4965                ## User
4966
4967                subagent task prompt
4968
4969            "}
4970        );
4971    });
4972}
4973
4974#[gpui::test]
4975async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
4976    init_test(cx);
4977    cx.update(|cx| {
4978        LanguageModelRegistry::test(cx);
4979    });
4980    cx.update(|cx| {
4981        cx.update_flags(true, vec!["subagents".to_string()]);
4982    });
4983
4984    let fs = FakeFs::new(cx.executor());
4985    fs.insert_tree(
4986        "/",
4987        json!({
4988            "a": {
4989                "b.md": "Lorem"
4990            }
4991        }),
4992    )
4993    .await;
4994    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4995    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4996    let agent = cx.update(|cx| {
4997        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4998    });
4999    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5000
5001    let acp_thread = cx
5002        .update(|cx| {
5003            connection
5004                .clone()
5005                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5006        })
5007        .await
5008        .unwrap();
5009    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5010    let thread = agent.read_with(cx, |agent, _| {
5011        agent.sessions.get(&session_id).unwrap().thread.clone()
5012    });
5013    let model = Arc::new(FakeLanguageModel::default());
5014
5015    thread.update(cx, |thread, cx| {
5016        thread.set_model(model.clone(), cx);
5017    });
5018    cx.run_until_parked();
5019
5020    // === First turn: create subagent ===
5021    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5022    cx.run_until_parked();
5023    model.send_last_completion_stream_text_chunk("spawning subagent");
5024    let subagent_tool_input = SpawnAgentToolInput {
5025        label: "initial task".to_string(),
5026        message: "do the first task".to_string(),
5027        session_id: None,
5028    };
5029    let subagent_tool_use = LanguageModelToolUse {
5030        id: "subagent_1".into(),
5031        name: SpawnAgentTool::NAME.into(),
5032        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5033        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5034        is_input_complete: true,
5035        thought_signature: None,
5036    };
5037    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5038        subagent_tool_use,
5039    ));
5040    model.end_last_completion_stream();
5041
5042    cx.run_until_parked();
5043
5044    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5045        thread
5046            .running_subagent_ids(cx)
5047            .get(0)
5048            .expect("subagent thread should be running")
5049            .clone()
5050    });
5051
5052    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
5053        agent
5054            .sessions
5055            .get(&subagent_session_id)
5056            .expect("subagent session should exist")
5057            .acp_thread
5058            .clone()
5059    });
5060
5061    // Subagent responds
5062    model.send_last_completion_stream_text_chunk("first task response");
5063    model.end_last_completion_stream();
5064
5065    cx.run_until_parked();
5066
5067    // Parent model responds to complete first turn
5068    model.send_last_completion_stream_text_chunk("First response");
5069    model.end_last_completion_stream();
5070
5071    send.await.unwrap();
5072
5073    // Verify subagent is no longer running
5074    thread.read_with(cx, |thread, cx| {
5075        assert!(
5076            thread.running_subagent_ids(cx).is_empty(),
5077            "subagent should not be running after completion"
5078        );
5079    });
5080
5081    // === Second turn: resume subagent with session_id ===
5082    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5083    cx.run_until_parked();
5084    model.send_last_completion_stream_text_chunk("resuming subagent");
5085    let resume_tool_input = SpawnAgentToolInput {
5086        label: "follow-up task".to_string(),
5087        message: "do the follow-up task".to_string(),
5088        session_id: Some(subagent_session_id.clone()),
5089    };
5090    let resume_tool_use = LanguageModelToolUse {
5091        id: "subagent_2".into(),
5092        name: SpawnAgentTool::NAME.into(),
5093        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5094        input: serde_json::to_value(&resume_tool_input).unwrap(),
5095        is_input_complete: true,
5096        thought_signature: None,
5097    };
5098    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5099    model.end_last_completion_stream();
5100
5101    cx.run_until_parked();
5102
5103    // Subagent should be running again with the same session
5104    thread.read_with(cx, |thread, cx| {
5105        let running = thread.running_subagent_ids(cx);
5106        assert_eq!(running.len(), 1, "subagent should be running");
5107        assert_eq!(running[0], subagent_session_id, "should be same session");
5108    });
5109
5110    // Subagent responds to follow-up
5111    model.send_last_completion_stream_text_chunk("follow-up task response");
5112    model.end_last_completion_stream();
5113
5114    cx.run_until_parked();
5115
5116    // Parent model responds to complete second turn
5117    model.send_last_completion_stream_text_chunk("Second response");
5118    model.end_last_completion_stream();
5119
5120    send2.await.unwrap();
5121
5122    // Verify subagent is no longer running
5123    thread.read_with(cx, |thread, cx| {
5124        assert!(
5125            thread.running_subagent_ids(cx).is_empty(),
5126            "subagent should not be running after resume completion"
5127        );
5128    });
5129
5130    // Verify the subagent's acp thread has both conversation turns
5131    assert_eq!(
5132        subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
5133        indoc! {"
5134            ## User
5135
5136            do the first task
5137
5138            ## Assistant
5139
5140            first task response
5141
5142            ## User
5143
5144            do the follow-up task
5145
5146            ## Assistant
5147
5148            follow-up task response
5149
5150        "}
5151    );
5152}
5153
5154#[gpui::test]
5155async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5156    init_test(cx);
5157
5158    cx.update(|cx| {
5159        cx.update_flags(true, vec!["subagents".to_string()]);
5160    });
5161
5162    let fs = FakeFs::new(cx.executor());
5163    fs.insert_tree(path!("/test"), json!({})).await;
5164    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5165    let project_context = cx.new(|_cx| ProjectContext::default());
5166    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5167    let context_server_registry =
5168        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5169    let model = Arc::new(FakeLanguageModel::default());
5170
5171    let parent_thread = cx.new(|cx| {
5172        Thread::new(
5173            project.clone(),
5174            project_context,
5175            context_server_registry,
5176            Templates::new(),
5177            Some(model.clone()),
5178            cx,
5179        )
5180    });
5181
5182    let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5183    subagent_thread.read_with(cx, |subagent_thread, cx| {
5184        assert!(subagent_thread.is_subagent());
5185        assert_eq!(subagent_thread.depth(), 1);
5186        assert_eq!(
5187            subagent_thread.model().map(|model| model.id()),
5188            Some(model.id())
5189        );
5190        assert_eq!(
5191            subagent_thread.parent_thread_id(),
5192            Some(parent_thread.read(cx).id().clone())
5193        );
5194
5195        let request = subagent_thread
5196            .build_completion_request(CompletionIntent::UserPrompt, cx)
5197            .unwrap();
5198        assert_eq!(request.intent, Some(CompletionIntent::Subagent));
5199    });
5200}
5201
5202#[gpui::test]
5203async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5204    init_test(cx);
5205
5206    cx.update(|cx| {
5207        cx.update_flags(true, vec!["subagents".to_string()]);
5208    });
5209
5210    let fs = FakeFs::new(cx.executor());
5211    fs.insert_tree(path!("/test"), json!({})).await;
5212    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5213    let project_context = cx.new(|_cx| ProjectContext::default());
5214    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5215    let context_server_registry =
5216        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5217    let model = Arc::new(FakeLanguageModel::default());
5218    let environment = Rc::new(cx.update(|cx| {
5219        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5220    }));
5221
5222    let deep_parent_thread = cx.new(|cx| {
5223        let mut thread = Thread::new(
5224            project.clone(),
5225            project_context,
5226            context_server_registry,
5227            Templates::new(),
5228            Some(model.clone()),
5229            cx,
5230        );
5231        thread.set_subagent_context(SubagentContext {
5232            parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5233            depth: MAX_SUBAGENT_DEPTH - 1,
5234        });
5235        thread
5236    });
5237    let deep_subagent_thread = cx.new(|cx| {
5238        let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5239        thread.add_default_tools(environment, cx);
5240        thread
5241    });
5242
5243    deep_subagent_thread.read_with(cx, |thread, _| {
5244        assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5245        assert!(
5246            !thread.has_registered_tool(SpawnAgentTool::NAME),
5247            "subagent tool should not be present at max depth"
5248        );
5249    });
5250}
5251
5252#[gpui::test]
5253async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5254    init_test(cx);
5255
5256    cx.update(|cx| {
5257        cx.update_flags(true, vec!["subagents".to_string()]);
5258    });
5259
5260    let fs = FakeFs::new(cx.executor());
5261    fs.insert_tree(path!("/test"), json!({})).await;
5262    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5263    let project_context = cx.new(|_cx| ProjectContext::default());
5264    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5265    let context_server_registry =
5266        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5267    let model = Arc::new(FakeLanguageModel::default());
5268
5269    let parent = cx.new(|cx| {
5270        Thread::new(
5271            project.clone(),
5272            project_context.clone(),
5273            context_server_registry.clone(),
5274            Templates::new(),
5275            Some(model.clone()),
5276            cx,
5277        )
5278    });
5279
5280    let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5281
5282    parent.update(cx, |thread, _cx| {
5283        thread.register_running_subagent(subagent.downgrade());
5284    });
5285
5286    subagent
5287        .update(cx, |thread, cx| {
5288            thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5289        })
5290        .unwrap();
5291    cx.run_until_parked();
5292
5293    subagent.read_with(cx, |thread, _| {
5294        assert!(!thread.is_turn_complete(), "subagent should be running");
5295    });
5296
5297    parent.update(cx, |thread, cx| {
5298        thread.cancel(cx).detach();
5299    });
5300
5301    subagent.read_with(cx, |thread, _| {
5302        assert!(
5303            thread.is_turn_complete(),
5304            "subagent should be cancelled when parent cancels"
5305        );
5306    });
5307}
5308
5309#[gpui::test]
5310async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5311    init_test(cx);
5312    cx.update(|cx| {
5313        LanguageModelRegistry::test(cx);
5314    });
5315    cx.update(|cx| {
5316        cx.update_flags(true, vec!["subagents".to_string()]);
5317    });
5318
5319    let fs = FakeFs::new(cx.executor());
5320    fs.insert_tree(
5321        "/",
5322        json!({
5323            "a": {
5324                "b.md": "Lorem"
5325            }
5326        }),
5327    )
5328    .await;
5329    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5330    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5331    let agent = cx.update(|cx| {
5332        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5333    });
5334    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5335
5336    let acp_thread = cx
5337        .update(|cx| {
5338            connection
5339                .clone()
5340                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5341        })
5342        .await
5343        .unwrap();
5344    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5345    let thread = agent.read_with(cx, |agent, _| {
5346        agent.sessions.get(&session_id).unwrap().thread.clone()
5347    });
5348    let model = Arc::new(FakeLanguageModel::default());
5349
5350    thread.update(cx, |thread, cx| {
5351        thread.set_model(model.clone(), cx);
5352    });
5353    cx.run_until_parked();
5354
5355    // Start the parent turn
5356    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5357    cx.run_until_parked();
5358    model.send_last_completion_stream_text_chunk("spawning subagent");
5359    let subagent_tool_input = SpawnAgentToolInput {
5360        label: "label".to_string(),
5361        message: "subagent task prompt".to_string(),
5362        session_id: None,
5363    };
5364    let subagent_tool_use = LanguageModelToolUse {
5365        id: "subagent_1".into(),
5366        name: SpawnAgentTool::NAME.into(),
5367        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5368        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5369        is_input_complete: true,
5370        thought_signature: None,
5371    };
5372    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5373        subagent_tool_use,
5374    ));
5375    model.end_last_completion_stream();
5376
5377    cx.run_until_parked();
5378
5379    // Verify subagent is running
5380    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5381        thread
5382            .running_subagent_ids(cx)
5383            .get(0)
5384            .expect("subagent thread should be running")
5385            .clone()
5386    });
5387
5388    // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5389    model.send_last_completion_stream_text_chunk("partial work");
5390    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5391        TokenUsage {
5392            input_tokens: 850_000,
5393            output_tokens: 0,
5394            cache_creation_input_tokens: 0,
5395            cache_read_input_tokens: 0,
5396        },
5397    ));
5398
5399    cx.run_until_parked();
5400
5401    // The subagent should no longer be running
5402    thread.read_with(cx, |thread, cx| {
5403        assert!(
5404            thread.running_subagent_ids(cx).is_empty(),
5405            "subagent should be stopped after context window warning"
5406        );
5407    });
5408
5409    // The parent model should get a new completion request to respond to the tool error
5410    model.send_last_completion_stream_text_chunk("Response after warning");
5411    model.end_last_completion_stream();
5412
5413    send.await.unwrap();
5414
5415    // Verify the parent thread shows the warning error in the tool call
5416    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5417    assert!(
5418        markdown.contains("nearing the end of its context window"),
5419        "tool output should contain context window warning message, got:\n{markdown}"
5420    );
5421    assert!(
5422        markdown.contains("Status: Failed"),
5423        "tool call should have Failed status, got:\n{markdown}"
5424    );
5425
5426    // Verify the subagent session still exists (can be resumed)
5427    agent.read_with(cx, |agent, _cx| {
5428        assert!(
5429            agent.sessions.contains_key(&subagent_session_id),
5430            "subagent session should still exist for potential resume"
5431        );
5432    });
5433}
5434
5435#[gpui::test]
5436async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5437    init_test(cx);
5438    cx.update(|cx| {
5439        LanguageModelRegistry::test(cx);
5440    });
5441    cx.update(|cx| {
5442        cx.update_flags(true, vec!["subagents".to_string()]);
5443    });
5444
5445    let fs = FakeFs::new(cx.executor());
5446    fs.insert_tree(
5447        "/",
5448        json!({
5449            "a": {
5450                "b.md": "Lorem"
5451            }
5452        }),
5453    )
5454    .await;
5455    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5456    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5457    let agent = cx.update(|cx| {
5458        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5459    });
5460    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5461
5462    let acp_thread = cx
5463        .update(|cx| {
5464            connection
5465                .clone()
5466                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5467        })
5468        .await
5469        .unwrap();
5470    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5471    let thread = agent.read_with(cx, |agent, _| {
5472        agent.sessions.get(&session_id).unwrap().thread.clone()
5473    });
5474    let model = Arc::new(FakeLanguageModel::default());
5475
5476    thread.update(cx, |thread, cx| {
5477        thread.set_model(model.clone(), cx);
5478    });
5479    cx.run_until_parked();
5480
5481    // === First turn: create subagent, trigger context window warning ===
5482    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5483    cx.run_until_parked();
5484    model.send_last_completion_stream_text_chunk("spawning subagent");
5485    let subagent_tool_input = SpawnAgentToolInput {
5486        label: "initial task".to_string(),
5487        message: "do the first task".to_string(),
5488        session_id: None,
5489    };
5490    let subagent_tool_use = LanguageModelToolUse {
5491        id: "subagent_1".into(),
5492        name: SpawnAgentTool::NAME.into(),
5493        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5494        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5495        is_input_complete: true,
5496        thought_signature: None,
5497    };
5498    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5499        subagent_tool_use,
5500    ));
5501    model.end_last_completion_stream();
5502
5503    cx.run_until_parked();
5504
5505    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5506        thread
5507            .running_subagent_ids(cx)
5508            .get(0)
5509            .expect("subagent thread should be running")
5510            .clone()
5511    });
5512
5513    // Subagent sends a usage update that crosses the warning threshold.
5514    // This triggers Normal→Warning, stopping the subagent.
5515    model.send_last_completion_stream_text_chunk("partial work");
5516    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5517        TokenUsage {
5518            input_tokens: 850_000,
5519            output_tokens: 0,
5520            cache_creation_input_tokens: 0,
5521            cache_read_input_tokens: 0,
5522        },
5523    ));
5524
5525    cx.run_until_parked();
5526
5527    // Verify the first turn was stopped with a context window warning
5528    thread.read_with(cx, |thread, cx| {
5529        assert!(
5530            thread.running_subagent_ids(cx).is_empty(),
5531            "subagent should be stopped after context window warning"
5532        );
5533    });
5534
5535    // Parent model responds to complete first turn
5536    model.send_last_completion_stream_text_chunk("First response");
5537    model.end_last_completion_stream();
5538
5539    send.await.unwrap();
5540
5541    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5542    assert!(
5543        markdown.contains("nearing the end of its context window"),
5544        "first turn should have context window warning, got:\n{markdown}"
5545    );
5546
5547    // === Second turn: resume the same subagent (now at Warning level) ===
5548    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5549    cx.run_until_parked();
5550    model.send_last_completion_stream_text_chunk("resuming subagent");
5551    let resume_tool_input = SpawnAgentToolInput {
5552        label: "follow-up task".to_string(),
5553        message: "do the follow-up task".to_string(),
5554        session_id: Some(subagent_session_id.clone()),
5555    };
5556    let resume_tool_use = LanguageModelToolUse {
5557        id: "subagent_2".into(),
5558        name: SpawnAgentTool::NAME.into(),
5559        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5560        input: serde_json::to_value(&resume_tool_input).unwrap(),
5561        is_input_complete: true,
5562        thought_signature: None,
5563    };
5564    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5565    model.end_last_completion_stream();
5566
5567    cx.run_until_parked();
5568
5569    // Subagent responds with tokens still at warning level (no worse).
5570    // Since ratio_before_prompt was already Warning, this should NOT
5571    // trigger the context window warning again.
5572    model.send_last_completion_stream_text_chunk("follow-up task response");
5573    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5574        TokenUsage {
5575            input_tokens: 870_000,
5576            output_tokens: 0,
5577            cache_creation_input_tokens: 0,
5578            cache_read_input_tokens: 0,
5579        },
5580    ));
5581    model.end_last_completion_stream();
5582
5583    cx.run_until_parked();
5584
5585    // Parent model responds to complete second turn
5586    model.send_last_completion_stream_text_chunk("Second response");
5587    model.end_last_completion_stream();
5588
5589    send2.await.unwrap();
5590
5591    // The resumed subagent should have completed normally since the ratio
5592    // didn't transition (it was Warning before and stayed at Warning)
5593    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5594    assert!(
5595        markdown.contains("follow-up task response"),
5596        "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5597    );
5598    // The second tool call should NOT have a context window warning
5599    let second_tool_pos = markdown
5600        .find("follow-up task")
5601        .expect("should find follow-up tool call");
5602    let after_second_tool = &markdown[second_tool_pos..];
5603    assert!(
5604        !after_second_tool.contains("nearing the end of its context window"),
5605        "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5606    );
5607}
5608
5609#[gpui::test]
5610async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
5611    init_test(cx);
5612    cx.update(|cx| {
5613        LanguageModelRegistry::test(cx);
5614    });
5615    cx.update(|cx| {
5616        cx.update_flags(true, vec!["subagents".to_string()]);
5617    });
5618
5619    let fs = FakeFs::new(cx.executor());
5620    fs.insert_tree(
5621        "/",
5622        json!({
5623            "a": {
5624                "b.md": "Lorem"
5625            }
5626        }),
5627    )
5628    .await;
5629    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5630    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5631    let agent = cx.update(|cx| {
5632        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5633    });
5634    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5635
5636    let acp_thread = cx
5637        .update(|cx| {
5638            connection
5639                .clone()
5640                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5641        })
5642        .await
5643        .unwrap();
5644    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5645    let thread = agent.read_with(cx, |agent, _| {
5646        agent.sessions.get(&session_id).unwrap().thread.clone()
5647    });
5648    let model = Arc::new(FakeLanguageModel::default());
5649
5650    thread.update(cx, |thread, cx| {
5651        thread.set_model(model.clone(), cx);
5652    });
5653    cx.run_until_parked();
5654
5655    // Start the parent turn
5656    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5657    cx.run_until_parked();
5658    model.send_last_completion_stream_text_chunk("spawning subagent");
5659    let subagent_tool_input = SpawnAgentToolInput {
5660        label: "label".to_string(),
5661        message: "subagent task prompt".to_string(),
5662        session_id: None,
5663    };
5664    let subagent_tool_use = LanguageModelToolUse {
5665        id: "subagent_1".into(),
5666        name: SpawnAgentTool::NAME.into(),
5667        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5668        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5669        is_input_complete: true,
5670        thought_signature: None,
5671    };
5672    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5673        subagent_tool_use,
5674    ));
5675    model.end_last_completion_stream();
5676
5677    cx.run_until_parked();
5678
5679    // Verify subagent is running
5680    thread.read_with(cx, |thread, cx| {
5681        assert!(
5682            !thread.running_subagent_ids(cx).is_empty(),
5683            "subagent should be running"
5684        );
5685    });
5686
5687    // The subagent's model returns a non-retryable error
5688    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
5689        tokens: None,
5690    });
5691
5692    cx.run_until_parked();
5693
5694    // The subagent should no longer be running
5695    thread.read_with(cx, |thread, cx| {
5696        assert!(
5697            thread.running_subagent_ids(cx).is_empty(),
5698            "subagent should not be running after error"
5699        );
5700    });
5701
5702    // The parent model should get a new completion request to respond to the tool error
5703    model.send_last_completion_stream_text_chunk("Response after error");
5704    model.end_last_completion_stream();
5705
5706    send.await.unwrap();
5707
5708    // Verify the parent thread shows the error in the tool call
5709    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5710    assert!(
5711        markdown.contains("Status: Failed"),
5712        "tool call should have Failed status after model error, got:\n{markdown}"
5713    );
5714}
5715
5716#[gpui::test]
5717async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5718    init_test(cx);
5719
5720    let fs = FakeFs::new(cx.executor());
5721    fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5722        .await;
5723    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5724
5725    cx.update(|cx| {
5726        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5727        settings.tool_permissions.tools.insert(
5728            EditFileTool::NAME.into(),
5729            agent_settings::ToolRules {
5730                default: Some(settings::ToolPermissionMode::Allow),
5731                always_allow: vec![],
5732                always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5733                always_confirm: vec![],
5734                invalid_patterns: vec![],
5735            },
5736        );
5737        agent_settings::AgentSettings::override_global(settings, cx);
5738    });
5739
5740    let context_server_registry =
5741        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5742    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5743    let templates = crate::Templates::new();
5744    let thread = cx.new(|cx| {
5745        crate::Thread::new(
5746            project.clone(),
5747            cx.new(|_cx| prompt_store::ProjectContext::default()),
5748            context_server_registry,
5749            templates.clone(),
5750            None,
5751            cx,
5752        )
5753    });
5754
5755    #[allow(clippy::arc_with_non_send_sync)]
5756    let tool = Arc::new(crate::EditFileTool::new(
5757        project.clone(),
5758        thread.downgrade(),
5759        language_registry,
5760        templates,
5761    ));
5762    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5763
5764    let task = cx.update(|cx| {
5765        tool.run(
5766            ToolInput::resolved(crate::EditFileToolInput {
5767                display_description: "Edit sensitive file".to_string(),
5768                path: "root/sensitive_config.txt".into(),
5769                mode: crate::EditFileMode::Edit,
5770            }),
5771            event_stream,
5772            cx,
5773        )
5774    });
5775
5776    let result = task.await;
5777    assert!(result.is_err(), "expected edit to be blocked");
5778    assert!(
5779        result.unwrap_err().to_string().contains("blocked"),
5780        "error should mention the edit was blocked"
5781    );
5782}
5783
5784#[gpui::test]
5785async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5786    init_test(cx);
5787
5788    let fs = FakeFs::new(cx.executor());
5789    fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5790        .await;
5791    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5792
5793    cx.update(|cx| {
5794        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5795        settings.tool_permissions.tools.insert(
5796            DeletePathTool::NAME.into(),
5797            agent_settings::ToolRules {
5798                default: Some(settings::ToolPermissionMode::Allow),
5799                always_allow: vec![],
5800                always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5801                always_confirm: vec![],
5802                invalid_patterns: vec![],
5803            },
5804        );
5805        agent_settings::AgentSettings::override_global(settings, cx);
5806    });
5807
5808    let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5809
5810    #[allow(clippy::arc_with_non_send_sync)]
5811    let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5812    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5813
5814    let task = cx.update(|cx| {
5815        tool.run(
5816            ToolInput::resolved(crate::DeletePathToolInput {
5817                path: "root/important_data.txt".to_string(),
5818            }),
5819            event_stream,
5820            cx,
5821        )
5822    });
5823
5824    let result = task.await;
5825    assert!(result.is_err(), "expected deletion to be blocked");
5826    assert!(
5827        result.unwrap_err().contains("blocked"),
5828        "error should mention the deletion was blocked"
5829    );
5830}
5831
5832#[gpui::test]
5833async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5834    init_test(cx);
5835
5836    let fs = FakeFs::new(cx.executor());
5837    fs.insert_tree(
5838        "/root",
5839        json!({
5840            "safe.txt": "content",
5841            "protected": {}
5842        }),
5843    )
5844    .await;
5845    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5846
5847    cx.update(|cx| {
5848        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5849        settings.tool_permissions.tools.insert(
5850            MovePathTool::NAME.into(),
5851            agent_settings::ToolRules {
5852                default: Some(settings::ToolPermissionMode::Allow),
5853                always_allow: vec![],
5854                always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5855                always_confirm: vec![],
5856                invalid_patterns: vec![],
5857            },
5858        );
5859        agent_settings::AgentSettings::override_global(settings, cx);
5860    });
5861
5862    #[allow(clippy::arc_with_non_send_sync)]
5863    let tool = Arc::new(crate::MovePathTool::new(project));
5864    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5865
5866    let task = cx.update(|cx| {
5867        tool.run(
5868            ToolInput::resolved(crate::MovePathToolInput {
5869                source_path: "root/safe.txt".to_string(),
5870                destination_path: "root/protected/safe.txt".to_string(),
5871            }),
5872            event_stream,
5873            cx,
5874        )
5875    });
5876
5877    let result = task.await;
5878    assert!(
5879        result.is_err(),
5880        "expected move to be blocked due to destination path"
5881    );
5882    assert!(
5883        result.unwrap_err().contains("blocked"),
5884        "error should mention the move was blocked"
5885    );
5886}
5887
5888#[gpui::test]
5889async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5890    init_test(cx);
5891
5892    let fs = FakeFs::new(cx.executor());
5893    fs.insert_tree(
5894        "/root",
5895        json!({
5896            "secret.txt": "secret content",
5897            "public": {}
5898        }),
5899    )
5900    .await;
5901    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5902
5903    cx.update(|cx| {
5904        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5905        settings.tool_permissions.tools.insert(
5906            MovePathTool::NAME.into(),
5907            agent_settings::ToolRules {
5908                default: Some(settings::ToolPermissionMode::Allow),
5909                always_allow: vec![],
5910                always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5911                always_confirm: vec![],
5912                invalid_patterns: vec![],
5913            },
5914        );
5915        agent_settings::AgentSettings::override_global(settings, cx);
5916    });
5917
5918    #[allow(clippy::arc_with_non_send_sync)]
5919    let tool = Arc::new(crate::MovePathTool::new(project));
5920    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5921
5922    let task = cx.update(|cx| {
5923        tool.run(
5924            ToolInput::resolved(crate::MovePathToolInput {
5925                source_path: "root/secret.txt".to_string(),
5926                destination_path: "root/public/not_secret.txt".to_string(),
5927            }),
5928            event_stream,
5929            cx,
5930        )
5931    });
5932
5933    let result = task.await;
5934    assert!(
5935        result.is_err(),
5936        "expected move to be blocked due to source path"
5937    );
5938    assert!(
5939        result.unwrap_err().contains("blocked"),
5940        "error should mention the move was blocked"
5941    );
5942}
5943
5944#[gpui::test]
5945async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5946    init_test(cx);
5947
5948    let fs = FakeFs::new(cx.executor());
5949    fs.insert_tree(
5950        "/root",
5951        json!({
5952            "confidential.txt": "confidential data",
5953            "dest": {}
5954        }),
5955    )
5956    .await;
5957    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5958
5959    cx.update(|cx| {
5960        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5961        settings.tool_permissions.tools.insert(
5962            CopyPathTool::NAME.into(),
5963            agent_settings::ToolRules {
5964                default: Some(settings::ToolPermissionMode::Allow),
5965                always_allow: vec![],
5966                always_deny: vec![
5967                    agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5968                ],
5969                always_confirm: vec![],
5970                invalid_patterns: vec![],
5971            },
5972        );
5973        agent_settings::AgentSettings::override_global(settings, cx);
5974    });
5975
5976    #[allow(clippy::arc_with_non_send_sync)]
5977    let tool = Arc::new(crate::CopyPathTool::new(project));
5978    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5979
5980    let task = cx.update(|cx| {
5981        tool.run(
5982            ToolInput::resolved(crate::CopyPathToolInput {
5983                source_path: "root/confidential.txt".to_string(),
5984                destination_path: "root/dest/copy.txt".to_string(),
5985            }),
5986            event_stream,
5987            cx,
5988        )
5989    });
5990
5991    let result = task.await;
5992    assert!(result.is_err(), "expected copy to be blocked");
5993    assert!(
5994        result.unwrap_err().contains("blocked"),
5995        "error should mention the copy was blocked"
5996    );
5997}
5998
5999#[gpui::test]
6000async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
6001    init_test(cx);
6002
6003    let fs = FakeFs::new(cx.executor());
6004    fs.insert_tree(
6005        "/root",
6006        json!({
6007            "normal.txt": "normal content",
6008            "readonly": {
6009                "config.txt": "readonly content"
6010            }
6011        }),
6012    )
6013    .await;
6014    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6015
6016    cx.update(|cx| {
6017        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6018        settings.tool_permissions.tools.insert(
6019            SaveFileTool::NAME.into(),
6020            agent_settings::ToolRules {
6021                default: Some(settings::ToolPermissionMode::Allow),
6022                always_allow: vec![],
6023                always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6024                always_confirm: vec![],
6025                invalid_patterns: vec![],
6026            },
6027        );
6028        agent_settings::AgentSettings::override_global(settings, cx);
6029    });
6030
6031    #[allow(clippy::arc_with_non_send_sync)]
6032    let tool = Arc::new(crate::SaveFileTool::new(project));
6033    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6034
6035    let task = cx.update(|cx| {
6036        tool.run(
6037            ToolInput::resolved(crate::SaveFileToolInput {
6038                paths: vec![
6039                    std::path::PathBuf::from("root/normal.txt"),
6040                    std::path::PathBuf::from("root/readonly/config.txt"),
6041                ],
6042            }),
6043            event_stream,
6044            cx,
6045        )
6046    });
6047
6048    let result = task.await;
6049    assert!(
6050        result.is_err(),
6051        "expected save to be blocked due to denied path"
6052    );
6053    assert!(
6054        result.unwrap_err().contains("blocked"),
6055        "error should mention the save was blocked"
6056    );
6057}
6058
6059#[gpui::test]
6060async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6061    init_test(cx);
6062
6063    let fs = FakeFs::new(cx.executor());
6064    fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6065        .await;
6066    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6067
6068    cx.update(|cx| {
6069        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6070        settings.tool_permissions.tools.insert(
6071            SaveFileTool::NAME.into(),
6072            agent_settings::ToolRules {
6073                default: Some(settings::ToolPermissionMode::Allow),
6074                always_allow: vec![],
6075                always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6076                always_confirm: vec![],
6077                invalid_patterns: vec![],
6078            },
6079        );
6080        agent_settings::AgentSettings::override_global(settings, cx);
6081    });
6082
6083    #[allow(clippy::arc_with_non_send_sync)]
6084    let tool = Arc::new(crate::SaveFileTool::new(project));
6085    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6086
6087    let task = cx.update(|cx| {
6088        tool.run(
6089            ToolInput::resolved(crate::SaveFileToolInput {
6090                paths: vec![std::path::PathBuf::from("root/config.secret")],
6091            }),
6092            event_stream,
6093            cx,
6094        )
6095    });
6096
6097    let result = task.await;
6098    assert!(result.is_err(), "expected save to be blocked");
6099    assert!(
6100        result.unwrap_err().contains("blocked"),
6101        "error should mention the save was blocked"
6102    );
6103}
6104
6105#[gpui::test]
6106async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6107    init_test(cx);
6108
6109    cx.update(|cx| {
6110        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6111        settings.tool_permissions.tools.insert(
6112            WebSearchTool::NAME.into(),
6113            agent_settings::ToolRules {
6114                default: Some(settings::ToolPermissionMode::Allow),
6115                always_allow: vec![],
6116                always_deny: vec![
6117                    agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6118                ],
6119                always_confirm: vec![],
6120                invalid_patterns: vec![],
6121            },
6122        );
6123        agent_settings::AgentSettings::override_global(settings, cx);
6124    });
6125
6126    #[allow(clippy::arc_with_non_send_sync)]
6127    let tool = Arc::new(crate::WebSearchTool);
6128    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6129
6130    let input: crate::WebSearchToolInput =
6131        serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6132
6133    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6134
6135    let result = task.await;
6136    assert!(result.is_err(), "expected search to be blocked");
6137    match result.unwrap_err() {
6138        crate::WebSearchToolOutput::Error { error } => {
6139            assert!(
6140                error.contains("blocked"),
6141                "error should mention the search was blocked"
6142            );
6143        }
6144        other => panic!("expected Error variant, got: {other:?}"),
6145    }
6146}
6147
6148#[gpui::test]
6149async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6150    init_test(cx);
6151
6152    let fs = FakeFs::new(cx.executor());
6153    fs.insert_tree("/root", json!({"README.md": "# Hello"}))
6154        .await;
6155    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6156
6157    cx.update(|cx| {
6158        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6159        settings.tool_permissions.tools.insert(
6160            EditFileTool::NAME.into(),
6161            agent_settings::ToolRules {
6162                default: Some(settings::ToolPermissionMode::Confirm),
6163                always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6164                always_deny: vec![],
6165                always_confirm: vec![],
6166                invalid_patterns: vec![],
6167            },
6168        );
6169        agent_settings::AgentSettings::override_global(settings, cx);
6170    });
6171
6172    let context_server_registry =
6173        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6174    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6175    let templates = crate::Templates::new();
6176    let thread = cx.new(|cx| {
6177        crate::Thread::new(
6178            project.clone(),
6179            cx.new(|_cx| prompt_store::ProjectContext::default()),
6180            context_server_registry,
6181            templates.clone(),
6182            None,
6183            cx,
6184        )
6185    });
6186
6187    #[allow(clippy::arc_with_non_send_sync)]
6188    let tool = Arc::new(crate::EditFileTool::new(
6189        project,
6190        thread.downgrade(),
6191        language_registry,
6192        templates,
6193    ));
6194    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6195
6196    let _task = cx.update(|cx| {
6197        tool.run(
6198            ToolInput::resolved(crate::EditFileToolInput {
6199                display_description: "Edit README".to_string(),
6200                path: "root/README.md".into(),
6201                mode: crate::EditFileMode::Edit,
6202            }),
6203            event_stream,
6204            cx,
6205        )
6206    });
6207
6208    cx.run_until_parked();
6209
6210    let event = rx.try_next();
6211    assert!(
6212        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6213        "expected no authorization request for allowed .md file"
6214    );
6215}
6216
6217#[gpui::test]
6218async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6219    init_test(cx);
6220
6221    let fs = FakeFs::new(cx.executor());
6222    fs.insert_tree(
6223        "/root",
6224        json!({
6225            ".zed": {
6226                "settings.json": "{}"
6227            },
6228            "README.md": "# Hello"
6229        }),
6230    )
6231    .await;
6232    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6233
6234    cx.update(|cx| {
6235        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6236        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6237        agent_settings::AgentSettings::override_global(settings, cx);
6238    });
6239
6240    let context_server_registry =
6241        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6242    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6243    let templates = crate::Templates::new();
6244    let thread = cx.new(|cx| {
6245        crate::Thread::new(
6246            project.clone(),
6247            cx.new(|_cx| prompt_store::ProjectContext::default()),
6248            context_server_registry,
6249            templates.clone(),
6250            None,
6251            cx,
6252        )
6253    });
6254
6255    #[allow(clippy::arc_with_non_send_sync)]
6256    let tool = Arc::new(crate::EditFileTool::new(
6257        project,
6258        thread.downgrade(),
6259        language_registry,
6260        templates,
6261    ));
6262
6263    // Editing a file inside .zed/ should still prompt even with global default: allow,
6264    // because local settings paths are sensitive and require confirmation regardless.
6265    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6266    let _task = cx.update(|cx| {
6267        tool.run(
6268            ToolInput::resolved(crate::EditFileToolInput {
6269                display_description: "Edit local settings".to_string(),
6270                path: "root/.zed/settings.json".into(),
6271                mode: crate::EditFileMode::Edit,
6272            }),
6273            event_stream,
6274            cx,
6275        )
6276    });
6277
6278    let _update = rx.expect_update_fields().await;
6279    let _auth = rx.expect_authorization().await;
6280}
6281
6282#[gpui::test]
6283async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6284    init_test(cx);
6285
6286    cx.update(|cx| {
6287        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6288        settings.tool_permissions.tools.insert(
6289            FetchTool::NAME.into(),
6290            agent_settings::ToolRules {
6291                default: Some(settings::ToolPermissionMode::Allow),
6292                always_allow: vec![],
6293                always_deny: vec![
6294                    agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6295                ],
6296                always_confirm: vec![],
6297                invalid_patterns: vec![],
6298            },
6299        );
6300        agent_settings::AgentSettings::override_global(settings, cx);
6301    });
6302
6303    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6304
6305    #[allow(clippy::arc_with_non_send_sync)]
6306    let tool = Arc::new(crate::FetchTool::new(http_client));
6307    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6308
6309    let input: crate::FetchToolInput =
6310        serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6311
6312    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6313
6314    let result = task.await;
6315    assert!(result.is_err(), "expected fetch to be blocked");
6316    assert!(
6317        result.unwrap_err().contains("blocked"),
6318        "error should mention the fetch was blocked"
6319    );
6320}
6321
6322#[gpui::test]
6323async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6324    init_test(cx);
6325
6326    cx.update(|cx| {
6327        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6328        settings.tool_permissions.tools.insert(
6329            FetchTool::NAME.into(),
6330            agent_settings::ToolRules {
6331                default: Some(settings::ToolPermissionMode::Confirm),
6332                always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6333                always_deny: vec![],
6334                always_confirm: vec![],
6335                invalid_patterns: vec![],
6336            },
6337        );
6338        agent_settings::AgentSettings::override_global(settings, cx);
6339    });
6340
6341    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6342
6343    #[allow(clippy::arc_with_non_send_sync)]
6344    let tool = Arc::new(crate::FetchTool::new(http_client));
6345    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6346
6347    let input: crate::FetchToolInput =
6348        serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6349
6350    let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6351
6352    cx.run_until_parked();
6353
6354    let event = rx.try_next();
6355    assert!(
6356        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6357        "expected no authorization request for allowed docs.rs URL"
6358    );
6359}
6360
6361#[gpui::test]
6362async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
6363    init_test(cx);
6364    always_allow_tools(cx);
6365
6366    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6367    let fake_model = model.as_fake();
6368
6369    // Add a tool so we can simulate tool calls
6370    thread.update(cx, |thread, _cx| {
6371        thread.add_tool(EchoTool);
6372    });
6373
6374    // Start a turn by sending a message
6375    let mut events = thread
6376        .update(cx, |thread, cx| {
6377            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
6378        })
6379        .unwrap();
6380    cx.run_until_parked();
6381
6382    // Simulate the model making a tool call
6383    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6384        LanguageModelToolUse {
6385            id: "tool_1".into(),
6386            name: "echo".into(),
6387            raw_input: r#"{"text": "hello"}"#.into(),
6388            input: json!({"text": "hello"}),
6389            is_input_complete: true,
6390            thought_signature: None,
6391        },
6392    ));
6393    fake_model
6394        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
6395
6396    // Signal that a message is queued before ending the stream
6397    thread.update(cx, |thread, _cx| {
6398        thread.set_has_queued_message(true);
6399    });
6400
6401    // Now end the stream - tool will run, and the boundary check should see the queue
6402    fake_model.end_last_completion_stream();
6403
6404    // Collect all events until the turn stops
6405    let all_events = collect_events_until_stop(&mut events, cx).await;
6406
6407    // Verify we received the tool call event
6408    let tool_call_ids: Vec<_> = all_events
6409        .iter()
6410        .filter_map(|e| match e {
6411            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
6412            _ => None,
6413        })
6414        .collect();
6415    assert_eq!(
6416        tool_call_ids,
6417        vec!["tool_1"],
6418        "Should have received a tool call event for our echo tool"
6419    );
6420
6421    // The turn should have stopped with EndTurn
6422    let stop_reasons = stop_events(all_events);
6423    assert_eq!(
6424        stop_reasons,
6425        vec![acp::StopReason::EndTurn],
6426        "Turn should have ended after tool completion due to queued message"
6427    );
6428
6429    // Verify the queued message flag is still set
6430    thread.update(cx, |thread, _cx| {
6431        assert!(
6432            thread.has_queued_message(),
6433            "Should still have queued message flag set"
6434        );
6435    });
6436
6437    // Thread should be idle now
6438    thread.update(cx, |thread, _cx| {
6439        assert!(
6440            thread.is_turn_complete(),
6441            "Thread should not be running after turn ends"
6442        );
6443    });
6444}
6445
6446#[gpui::test]
6447async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
6448    init_test(cx);
6449    always_allow_tools(cx);
6450
6451    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6452    let fake_model = model.as_fake();
6453
6454    thread.update(cx, |thread, _cx| {
6455        thread.add_tool(StreamingFailingEchoTool {
6456            receive_chunks_until_failure: 1,
6457        });
6458    });
6459
6460    let _events = thread
6461        .update(cx, |thread, cx| {
6462            thread.send(
6463                UserMessageId::new(),
6464                ["Use the streaming_failing_echo tool"],
6465                cx,
6466            )
6467        })
6468        .unwrap();
6469    cx.run_until_parked();
6470
6471    let tool_use = LanguageModelToolUse {
6472        id: "call_1".into(),
6473        name: StreamingFailingEchoTool::NAME.into(),
6474        raw_input: "hello".into(),
6475        input: json!({}),
6476        is_input_complete: false,
6477        thought_signature: None,
6478    };
6479
6480    fake_model
6481        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
6482
6483    cx.run_until_parked();
6484
6485    let completions = fake_model.pending_completions();
6486    let last_completion = completions.last().unwrap();
6487
6488    assert_eq!(
6489        last_completion.messages[1..],
6490        vec![
6491            LanguageModelRequestMessage {
6492                role: Role::User,
6493                content: vec!["Use the streaming_failing_echo tool".into()],
6494                cache: false,
6495                reasoning_details: None,
6496            },
6497            LanguageModelRequestMessage {
6498                role: Role::Assistant,
6499                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
6500                cache: false,
6501                reasoning_details: None,
6502            },
6503            LanguageModelRequestMessage {
6504                role: Role::User,
6505                content: vec![language_model::MessageContent::ToolResult(
6506                    LanguageModelToolResult {
6507                        tool_use_id: tool_use.id.clone(),
6508                        tool_name: tool_use.name,
6509                        is_error: true,
6510                        content: "failed".into(),
6511                        output: Some("failed".into()),
6512                    }
6513                )],
6514                cache: true,
6515                reasoning_details: None,
6516            },
6517        ]
6518    );
6519}
6520
6521#[gpui::test]
6522async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
6523    init_test(cx);
6524    always_allow_tools(cx);
6525
6526    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6527    let fake_model = model.as_fake();
6528
6529    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
6530        oneshot::channel();
6531
6532    thread.update(cx, |thread, _cx| {
6533        thread.add_tool(
6534            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
6535        );
6536        thread.add_tool(StreamingFailingEchoTool {
6537            receive_chunks_until_failure: 1,
6538        });
6539    });
6540
6541    let _events = thread
6542        .update(cx, |thread, cx| {
6543            thread.send(
6544                UserMessageId::new(),
6545                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
6546                cx,
6547            )
6548        })
6549        .unwrap();
6550    cx.run_until_parked();
6551
6552    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6553        LanguageModelToolUse {
6554            id: "call_1".into(),
6555            name: StreamingEchoTool::NAME.into(),
6556            raw_input: "hello".into(),
6557            input: json!({ "text": "hello" }),
6558            is_input_complete: false,
6559            thought_signature: None,
6560        },
6561    ));
6562    let first_tool_use = LanguageModelToolUse {
6563        id: "call_1".into(),
6564        name: StreamingEchoTool::NAME.into(),
6565        raw_input: "hello world".into(),
6566        input: json!({ "text": "hello world" }),
6567        is_input_complete: true,
6568        thought_signature: None,
6569    };
6570    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6571        first_tool_use.clone(),
6572    ));
6573    let second_tool_use = LanguageModelToolUse {
6574        name: StreamingFailingEchoTool::NAME.into(),
6575        raw_input: "hello".into(),
6576        input: json!({ "text": "hello" }),
6577        is_input_complete: false,
6578        thought_signature: None,
6579        id: "call_2".into(),
6580    };
6581    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6582        second_tool_use.clone(),
6583    ));
6584
6585    cx.run_until_parked();
6586
6587    complete_streaming_echo_tool_call_tx.send(()).unwrap();
6588
6589    cx.run_until_parked();
6590
6591    let completions = fake_model.pending_completions();
6592    let last_completion = completions.last().unwrap();
6593
6594    assert_eq!(
6595        last_completion.messages[1..],
6596        vec![
6597            LanguageModelRequestMessage {
6598                role: Role::User,
6599                content: vec![
6600                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
6601                ],
6602                cache: false,
6603                reasoning_details: None,
6604            },
6605            LanguageModelRequestMessage {
6606                role: Role::Assistant,
6607                content: vec![
6608                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
6609                    language_model::MessageContent::ToolUse(second_tool_use.clone())
6610                ],
6611                cache: false,
6612                reasoning_details: None,
6613            },
6614            LanguageModelRequestMessage {
6615                role: Role::User,
6616                content: vec![
6617                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
6618                        tool_use_id: second_tool_use.id.clone(),
6619                        tool_name: second_tool_use.name,
6620                        is_error: true,
6621                        content: "failed".into(),
6622                        output: Some("failed".into()),
6623                    }),
6624                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
6625                        tool_use_id: first_tool_use.id.clone(),
6626                        tool_name: first_tool_use.name,
6627                        is_error: false,
6628                        content: "hello world".into(),
6629                        output: Some("hello world".into()),
6630                    }),
6631                ],
6632                cache: true,
6633                reasoning_details: None,
6634            },
6635        ]
6636    );
6637}
6638
6639#[gpui::test]
6640async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
6641    let ThreadTest {
6642        model, thread, fs, ..
6643    } = setup(cx, TestModel::Fake).await;
6644    let fake_model_a = model.as_fake();
6645
6646    thread.update(cx, |thread, _cx| {
6647        thread.add_tool(EchoTool);
6648        thread.add_tool(DelayTool);
6649    });
6650
6651    // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
6652    fs.insert_file(
6653        paths::settings_file(),
6654        json!({
6655            "agent": {
6656                "profiles": {
6657                    "profile-a": {
6658                        "name": "Profile A",
6659                        "tools": {
6660                            EchoTool::NAME: true,
6661                            DelayTool::NAME: true,
6662                        }
6663                    },
6664                    "profile-b": {
6665                        "name": "Profile B",
6666                        "tools": {
6667                            DelayTool::NAME: true,
6668                        }
6669                    }
6670                }
6671            }
6672        })
6673        .to_string()
6674        .into_bytes(),
6675    )
6676    .await;
6677    cx.run_until_parked();
6678
6679    thread.update(cx, |thread, cx| {
6680        thread.set_profile(AgentProfileId("profile-a".into()), cx);
6681        thread.set_thinking_enabled(false, cx);
6682    });
6683
6684    // Send a message — first iteration starts with model A, profile-a, thinking off.
6685    thread
6686        .update(cx, |thread, cx| {
6687            thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
6688        })
6689        .unwrap();
6690    cx.run_until_parked();
6691
6692    // Verify first request has both tools and thinking disabled.
6693    let completions = fake_model_a.pending_completions();
6694    assert_eq!(completions.len(), 1);
6695    let first_tools = tool_names_for_completion(&completions[0]);
6696    assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
6697    assert!(!completions[0].thinking_allowed);
6698
6699    // Model A responds with an echo tool call.
6700    fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6701        LanguageModelToolUse {
6702            id: "tool_1".into(),
6703            name: "echo".into(),
6704            raw_input: r#"{"text":"hello"}"#.into(),
6705            input: json!({"text": "hello"}),
6706            is_input_complete: true,
6707            thought_signature: None,
6708        },
6709    ));
6710    fake_model_a.end_last_completion_stream();
6711
6712    // Before the next iteration runs, switch to profile-b (only DelayTool),
6713    // swap in a new model, and enable thinking.
6714    let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
6715        "test-provider",
6716        "model-b",
6717        "Model B",
6718        true,
6719    ));
6720    thread.update(cx, |thread, cx| {
6721        thread.set_profile(AgentProfileId("profile-b".into()), cx);
6722        thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
6723        thread.set_thinking_enabled(true, cx);
6724    });
6725
6726    // Run until parked — processes the echo tool call, loops back, picks up
6727    // the new model/profile/thinking, and makes a second request to model B.
6728    cx.run_until_parked();
6729
6730    // The second request should have gone to model B.
6731    let model_b_completions = fake_model_b.pending_completions();
6732    assert_eq!(
6733        model_b_completions.len(),
6734        1,
6735        "second request should go to model B"
6736    );
6737
6738    // Profile-b only has DelayTool, so echo should be gone.
6739    let second_tools = tool_names_for_completion(&model_b_completions[0]);
6740    assert_eq!(second_tools, vec![DelayTool::NAME]);
6741
6742    // Thinking should now be enabled.
6743    assert!(model_b_completions[0].thinking_allowed);
6744}