// mod.rs

   1use super::*;
   2use acp_thread::{
   3    AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
   4    UserMessageId,
   5};
   6use agent_client_protocol::{self as acp};
   7use agent_settings::AgentProfileId;
   8use anyhow::Result;
   9use client::{Client, UserStore};
  10use cloud_llm_client::CompletionIntent;
  11use collections::IndexMap;
  12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  13use feature_flags::FeatureFlagAppExt as _;
  14use fs::{FakeFs, Fs};
  15use futures::{
  16    FutureExt as _, StreamExt,
  17    channel::{
  18        mpsc::{self, UnboundedReceiver},
  19        oneshot,
  20    },
  21    future::{Fuse, Shared},
  22};
  23use gpui::{
  24    App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
  25    http_client::FakeHttpClient,
  26};
  27use indoc::indoc;
  28use language_model::{
  29    LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
  30    LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
  31    LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
  32    LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
  33    fake_provider::FakeLanguageModel,
  34};
  35use pretty_assertions::assert_eq;
  36use project::{
  37    Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
  38};
  39use prompt_store::ProjectContext;
  40use reqwest_client::ReqwestClient;
  41use schemars::JsonSchema;
  42use serde::{Deserialize, Serialize};
  43use serde_json::json;
  44use settings::{Settings, SettingsStore};
  45use std::{
  46    path::Path,
  47    pin::Pin,
  48    rc::Rc,
  49    sync::{
  50        Arc,
  51        atomic::{AtomicBool, AtomicUsize, Ordering},
  52    },
  53    time::Duration,
  54};
  55use util::path;
  56
  57mod edit_file_thread_test;
  58mod test_tools;
  59use test_tools::*;
  60
  61pub(crate) fn init_test(cx: &mut TestAppContext) {
  62    cx.update(|cx| {
  63        let settings_store = SettingsStore::test(cx);
  64        cx.set_global(settings_store);
  65    });
  66}
  67
/// Test double for a terminal handle whose lifecycle is fully controlled by
/// the test: exit can be signalled manually, and kill/stop interactions are
/// recorded via atomic flags.
pub(crate) struct FakeTerminalHandle {
    // Flipped to true when `kill` is invoked.
    killed: Arc<AtomicBool>,
    // Reflects whether the (simulated) user stopped the terminal.
    stopped_by_user: Arc<AtomicBool>,
    // Consumed by `signal_exit` to resolve `wait_for_exit`; wrapped in a
    // `RefCell` because the trait methods only take `&self`.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    // Shared task that resolves once the terminal "exits".
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    // Canned output returned from `current_output`.
    output: acp::TerminalOutputResponse,
    // Stable id returned from `id`.
    id: acp::TerminalId,
}
  76
impl FakeTerminalHandle {
    /// Creates a handle whose `wait_for_exit` task only resolves after
    /// `signal_exit` (or `kill`) is called — simulating a long-running
    /// command that never finishes on its own.
    pub(crate) fn new_never_exits(cx: &mut App) -> Self {
        let killed = Arc::new(AtomicBool::new(false));
        let stopped_by_user = Arc::new(AtomicBool::new(false));

        let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();

        let wait_for_exit = cx
            .spawn(async move |_cx| {
                // Wait for the exit signal (sent when kill() is called)
                let _ = exit_receiver.await;
                acp::TerminalExitStatus::new()
            })
            .shared();

        Self {
            killed,
            stopped_by_user,
            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
            wait_for_exit,
            output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
            id: acp::TerminalId::new("fake_terminal".to_string()),
        }
    }

    /// Creates a handle whose `wait_for_exit` task resolves immediately with
    /// the given exit code — simulating a command that finishes right away.
    pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
        let killed = Arc::new(AtomicBool::new(false));
        let stopped_by_user = Arc::new(AtomicBool::new(false));
        // The receiver is intentionally dropped: `wait_for_exit` below does
        // not depend on it, and a later `signal_exit` send just errors
        // harmlessly (the `Err` is discarded there).
        let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();

        let wait_for_exit = cx
            .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
            .shared();

        Self {
            killed,
            stopped_by_user,
            exit_sender: std::cell::RefCell::new(Some(exit_sender)),
            wait_for_exit,
            output: acp::TerminalOutputResponse::new("command output".to_string(), false),
            id: acp::TerminalId::new("fake_terminal".to_string()),
        }
    }

    /// Whether `kill` has been called on this handle.
    pub(crate) fn was_killed(&self) -> bool {
        self.killed.load(Ordering::SeqCst)
    }

    /// Sets the flag later returned by `was_stopped_by_user`.
    pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
        self.stopped_by_user.store(stopped, Ordering::SeqCst);
    }

    /// Resolves `wait_for_exit` by consuming the one-shot sender; a no-op if
    /// the exit was already signalled.
    pub(crate) fn signal_exit(&self) {
        if let Some(sender) = self.exit_sender.borrow_mut().take() {
            let _ = sender.send(());
        }
    }
}
 135
/// `TerminalHandle` implementation that serves the canned values configured
/// in the constructors and records kill/stop interactions for assertions.
impl crate::TerminalHandle for FakeTerminalHandle {
    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
        Ok(self.id.clone())
    }

    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
        Ok(self.output.clone())
    }

    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
        Ok(self.wait_for_exit.clone())
    }

    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
        // Record the kill and unblock any `wait_for_exit` waiters.
        self.killed.store(true, Ordering::SeqCst);
        self.signal_exit();
        Ok(())
    }

    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
        Ok(self.stopped_by_user.load(Ordering::SeqCst))
    }
}
 159
/// Test double for a subagent whose reply to `send` is a pre-arranged
/// shared task.
struct FakeSubagentHandle {
    // Session id reported by `id`.
    session_id: acp::SessionId,
    // Resolves to the canned reply returned from `send`.
    send_task: Shared<Task<String>>,
}
 164
impl SubagentHandle for FakeSubagentHandle {
    fn id(&self) -> acp::SessionId {
        self.session_id.clone()
    }

    // Not exercised by these tests.
    fn num_entries(&self, _cx: &App) -> usize {
        unimplemented!()
    }

    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
        // The incoming message is ignored; the handle always resolves with
        // the pre-arranged `send_task` reply.
        let task = self.send_task.clone();
        cx.background_spawn(async move { Ok(task.await) })
    }
}
 179
/// Test double for a thread environment that hands out pre-configured
/// terminal/subagent handles and counts terminal creations.
#[derive(Default)]
pub(crate) struct FakeThreadEnvironment {
    // Handle returned by `create_terminal`; panics if unset when used.
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    // Handle returned by `create_subagent`; panics if unset when used.
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
    // Incremented on every `create_terminal` call.
    terminal_creations: Arc<AtomicUsize>,
}
 186
 187impl FakeThreadEnvironment {
 188    pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
 189        Self {
 190            terminal_handle: Some(terminal_handle.into()),
 191            ..self
 192        }
 193    }
 194
 195    pub(crate) fn terminal_creation_count(&self) -> usize {
 196        self.terminal_creations.load(Ordering::SeqCst)
 197    }
 198}
 199
 200impl crate::ThreadEnvironment for FakeThreadEnvironment {
 201    fn create_terminal(
 202        &self,
 203        _command: String,
 204        _cwd: Option<std::path::PathBuf>,
 205        _output_byte_limit: Option<u64>,
 206        _cx: &mut AsyncApp,
 207    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
 208        self.terminal_creations.fetch_add(1, Ordering::SeqCst);
 209        let handle = self
 210            .terminal_handle
 211            .clone()
 212            .expect("Terminal handle not available on FakeThreadEnvironment");
 213        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
 214    }
 215
 216    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
 217        Ok(self
 218            .subagent_handle
 219            .clone()
 220            .expect("Subagent handle not available on FakeThreadEnvironment")
 221            as Rc<dyn SubagentHandle>)
 222    }
 223}
 224
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    // Every handle created so far, in creation order; `RefCell` because
    // `create_terminal` only receives `&self`.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
 229
 230impl MultiTerminalEnvironment {
 231    fn new() -> Self {
 232        Self {
 233            handles: std::cell::RefCell::new(Vec::new()),
 234        }
 235    }
 236
 237    fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
 238        self.handles.borrow().clone()
 239    }
 240}
 241
impl crate::ThreadEnvironment for MultiTerminalEnvironment {
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        // Each call makes a fresh never-exiting handle and remembers it, so
        // tests can later inspect/kill each terminal independently.
        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
        self.handles.borrow_mut().push(handle.clone());
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Not exercised by these tests.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        unimplemented!()
    }
}
 259
/// Overrides the global agent settings so every tool runs without prompting
/// for permission — used by tests that are not about authorization.
fn always_allow_tools(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
        agent_settings::AgentSettings::override_global(settings, cx);
    });
}
 267
/// Smoke test: a user message is sent, the fake model streams a single text
/// chunk, and the thread records it as the assistant's reply.
#[gpui::test]
async fn test_echo(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Drive the fake model: stream the reply text, then end the turn.
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            "Hello\n"
        )
    });
    // Exactly one natural end-of-turn stop should have been emitted.
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
 300
/// Verifies that a terminal tool invocation with a timeout kills the
/// underlying terminal handle once the timeout elapses, and that the tool's
/// result still includes the output captured so far.
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    // Environment backed by a terminal that never exits on its own, so only
    // the timeout path can terminate it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Tiny 5ms timeout relative to the (never-finishing) command.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    // The tool should first surface the terminal in a tool-call update.
    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // Poll the task cooperatively (run_until_parked + tiny timer ticks)
    // until it completes or a wall-clock deadline is hit.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
 366
/// Counterpart to the timeout test: with no timeout configured, the terminal
/// handle must not be killed even after time passes.
#[gpui::test]
#[ignore]
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    // Terminal that never exits on its own; without a timeout nothing should
    // ever kill it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Keep the task alive (bound to `_task`) but never await it: the command
    // is expected to keep running.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give the tool ample time to (incorrectly) fire a kill before asserting.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
 416
/// Verifies that a streamed thinking event is recorded alongside regular
/// text and rendered as a `<think>` block in the thread's markdown.
#[gpui::test]
async fn test_thinking(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                [indoc! {"
                    Testing:

                    Generate a thinking step where you just think the word 'Think',
                    and have your final answer be 'Hello'
                "}],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();
    // Stream a thinking event first, then the final answer, then end the turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "Think".to_string(),
        signature: None,
    });
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        // Thinking content is wrapped in <think> tags in the markdown view.
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            indoc! {"
                <think>Think</think>
                Hello
            "}
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
 465
/// Verifies the generated system prompt: it reflects the project context
/// (shell name) and, because a tool is registered, includes the diagnostics
/// guidance section.
#[gpui::test]
async fn test_system_prompt(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        project_context,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Inject a recognizable shell name so we can assert it shows up in the
    // rendered prompt.
    project_context.update(cx, |project_context, _cx| {
        project_context.shell = "test-shell".into()
    });
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The system prompt must be the first message of the request.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        system_prompt.contains("test-shell"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
 510
/// Verifies that when no tools are registered, the system prompt omits the
/// tool-use and diagnostics sections.
#[gpui::test]
async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    // Tool-related sections must NOT appear when the thread has no tools.
    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        !system_prompt.contains("## Tool Use"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        !system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
 546
/// Verifies the prompt-caching policy: only the most recent user message (or
/// most recent tool result) in the request carries `cache: true`; all earlier
/// messages are sent uncached.
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // messages[0] is the system prompt; compare everything after it.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // After the tool runs, the follow-up request should end with the tool
    // result as the only cached message.
    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
 692
/// End-to-end (e2e-gated) test against a real model: tools that complete
/// both before and after streaming stops are executed and their output is
/// folded back into the conversation.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool call that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    // The model's final message should echo the delay tool's "Ding" output.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
 752
/// End-to-end (e2e-gated) test: while tool-call input is streaming, the
/// thread should expose at least one partially-complete tool use before the
/// input finishes streaming.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, the input may still be mid-stream:
                        // a missing final field ("g") marks a partial use.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
 804
 805#[gpui::test]
 806async fn test_tool_authorization(cx: &mut TestAppContext) {
 807    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 808    let fake_model = model.as_fake();
 809
 810    let mut events = thread
 811        .update(cx, |thread, cx| {
 812            thread.add_tool(ToolRequiringPermission);
 813            thread.send(UserMessageId::new(), ["abc"], cx)
 814        })
 815        .unwrap();
 816    cx.run_until_parked();
 817    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 818        LanguageModelToolUse {
 819            id: "tool_id_1".into(),
 820            name: ToolRequiringPermission::NAME.into(),
 821            raw_input: "{}".into(),
 822            input: json!({}),
 823            is_input_complete: true,
 824            thought_signature: None,
 825        },
 826    ));
 827    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 828        LanguageModelToolUse {
 829            id: "tool_id_2".into(),
 830            name: ToolRequiringPermission::NAME.into(),
 831            raw_input: "{}".into(),
 832            input: json!({}),
 833            is_input_complete: true,
 834            thought_signature: None,
 835        },
 836    ));
 837    fake_model.end_last_completion_stream();
 838    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
 839    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;
 840
 841    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
 842    tool_call_auth_1
 843        .response
 844        .send(acp::PermissionOptionId::new("allow").into())
 845        .unwrap();
 846    cx.run_until_parked();
 847
 848    // Reject the second - send "deny" option_id directly since Deny is now a button
 849    tool_call_auth_2
 850        .response
 851        .send(acp::PermissionOptionId::new("deny").into())
 852        .unwrap();
 853    cx.run_until_parked();
 854
 855    let completion = fake_model.pending_completions().pop().unwrap();
 856    let message = completion.messages.last().unwrap();
 857    assert_eq!(
 858        message.content,
 859        vec![
 860            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 861                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
 862                tool_name: ToolRequiringPermission::NAME.into(),
 863                is_error: false,
 864                content: "Allowed".into(),
 865                output: Some("Allowed".into())
 866            }),
 867            language_model::MessageContent::ToolResult(LanguageModelToolResult {
 868                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
 869                tool_name: ToolRequiringPermission::NAME.into(),
 870                is_error: true,
 871                content: "Permission to run tool denied by user".into(),
 872                output: Some("Permission to run tool denied by user".into())
 873            })
 874        ]
 875    );
 876
 877    // Simulate yet another tool call.
 878    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 879        LanguageModelToolUse {
 880            id: "tool_id_3".into(),
 881            name: ToolRequiringPermission::NAME.into(),
 882            raw_input: "{}".into(),
 883            input: json!({}),
 884            is_input_complete: true,
 885            thought_signature: None,
 886        },
 887    ));
 888    fake_model.end_last_completion_stream();
 889
 890    // Respond by always allowing tools - send transformed option_id
 891    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
 892    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
 893    tool_call_auth_3
 894        .response
 895        .send(acp::PermissionOptionId::new("always_allow:tool_requiring_permission").into())
 896        .unwrap();
 897    cx.run_until_parked();
 898    let completion = fake_model.pending_completions().pop().unwrap();
 899    let message = completion.messages.last().unwrap();
 900    assert_eq!(
 901        message.content,
 902        vec![language_model::MessageContent::ToolResult(
 903            LanguageModelToolResult {
 904                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
 905                tool_name: ToolRequiringPermission::NAME.into(),
 906                is_error: false,
 907                content: "Allowed".into(),
 908                output: Some("Allowed".into())
 909            }
 910        )]
 911    );
 912
 913    // Simulate a final tool call, ensuring we don't trigger authorization.
 914    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 915        LanguageModelToolUse {
 916            id: "tool_id_4".into(),
 917            name: ToolRequiringPermission::NAME.into(),
 918            raw_input: "{}".into(),
 919            input: json!({}),
 920            is_input_complete: true,
 921            thought_signature: None,
 922        },
 923    ));
 924    fake_model.end_last_completion_stream();
 925    cx.run_until_parked();
 926    let completion = fake_model.pending_completions().pop().unwrap();
 927    let message = completion.messages.last().unwrap();
 928    assert_eq!(
 929        message.content,
 930        vec![language_model::MessageContent::ToolResult(
 931            LanguageModelToolResult {
 932                tool_use_id: "tool_id_4".into(),
 933                tool_name: ToolRequiringPermission::NAME.into(),
 934                is_error: false,
 935                content: "Allowed".into(),
 936                output: Some("Allowed".into())
 937            }
 938        )]
 939    );
 940}
 941
 942#[gpui::test]
 943async fn test_tool_hallucination(cx: &mut TestAppContext) {
 944    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
 945    let fake_model = model.as_fake();
 946
 947    let mut events = thread
 948        .update(cx, |thread, cx| {
 949            thread.send(UserMessageId::new(), ["abc"], cx)
 950        })
 951        .unwrap();
 952    cx.run_until_parked();
 953    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
 954        LanguageModelToolUse {
 955            id: "tool_id_1".into(),
 956            name: "nonexistent_tool".into(),
 957            raw_input: "{}".into(),
 958            input: json!({}),
 959            is_input_complete: true,
 960            thought_signature: None,
 961        },
 962    ));
 963    fake_model.end_last_completion_stream();
 964
 965    let tool_call = expect_tool_call(&mut events).await;
 966    assert_eq!(tool_call.title, "nonexistent_tool");
 967    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
 968    let update = expect_tool_call_update_fields(&mut events).await;
 969    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
 970}
 971
 972async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
 973    let event = events
 974        .next()
 975        .await
 976        .expect("no tool call authorization event received")
 977        .unwrap();
 978    match event {
 979        ThreadEvent::ToolCall(tool_call) => tool_call,
 980        event => {
 981            panic!("Unexpected event {event:?}");
 982        }
 983    }
 984}
 985
 986async fn expect_tool_call_update_fields(
 987    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
 988) -> acp::ToolCallUpdate {
 989    let event = events
 990        .next()
 991        .await
 992        .expect("no tool call authorization event received")
 993        .unwrap();
 994    match event {
 995        ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
 996        event => {
 997            panic!("Unexpected event {event:?}");
 998        }
 999    }
1000}
1001
1002async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1003    let event = events
1004        .next()
1005        .await
1006        .expect("no plan event received")
1007        .unwrap();
1008    match event {
1009        ThreadEvent::Plan(plan) => plan,
1010        event => {
1011            panic!("Unexpected event {event:?}");
1012        }
1013    }
1014}
1015
1016async fn next_tool_call_authorization(
1017    events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1018) -> ToolCallAuthorization {
1019    loop {
1020        let event = events
1021            .next()
1022            .await
1023            .expect("no tool call authorization event received")
1024            .unwrap();
1025        if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1026            let permission_kinds = tool_call_authorization
1027                .options
1028                .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1029                .map(|option| option.kind);
1030            let allow_once = tool_call_authorization
1031                .options
1032                .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1033                .map(|option| option.kind);
1034
1035            assert_eq!(
1036                permission_kinds,
1037                Some(acp::PermissionOptionKind::AllowAlways)
1038            );
1039            assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1040            return tool_call_authorization;
1041        }
1042    }
1043}
1044
#[test]
fn test_permission_options_terminal_with_pattern() {
    // A multi-token command like "cargo build --release" should offer a
    // per-command pattern choice alongside the tool-wide and one-time ones.
    let context = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    );

    let PermissionOptions::Dropdown(choices) = context.build_permission_options() else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    for expected in [
        "Always for terminal",
        "Always for `cargo build` commands",
        "Only this time",
    ] {
        assert!(labels.contains(&expected));
    }
}
1066
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    // When the second token is a flag ("-la"), the pattern label should fall
    // back to just the command name ("ls").
    let PermissionOptions::Dropdown(choices) =
        ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
            .build_permission_options()
    else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    assert!(labels.iter().any(|label| *label == "Always for terminal"));
    assert!(labels.iter().any(|label| *label == "Always for `ls` commands"));
    assert!(labels.iter().any(|label| *label == "Only this time"));
}
1086
#[test]
fn test_permission_options_terminal_single_word_command() {
    // A single-token command still produces a command-level pattern choice.
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `whoami` commands"));
    assert!(labels.contains(&"Only this time"));
}
1106
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Editing "src/main.rs" should offer a directory-scoped "always" choice
    // in addition to the tool-wide one.
    let permission_options =
        ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
            .build_permission_options();

    let choices = match permission_options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let mut saw_tool_wide = false;
    let mut saw_directory = false;
    for choice in &choices {
        match choice.allow.name.as_ref() {
            "Always for edit file" => saw_tool_wide = true,
            "Always for `src/`" => saw_directory = true,
            _ => {}
        }
    }
    assert!(saw_tool_wide);
    assert!(saw_directory);
}
1124
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetching a URL should offer a domain-scoped "always" choice.
    let PermissionOptions::Dropdown(choices) =
        ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
            .build_permission_options()
    else {
        panic!("Expected dropdown permission options");
    };

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for fetch"));
    assert!(labels.contains(&"Always for `docs.rs`"));
}
1142
#[test]
fn test_permission_options_without_pattern() {
    // A script invocation ("./deploy.sh") yields no recognizable command
    // pattern, so only the tool-wide and one-time choices are offered.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 2);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));
    // No per-command pattern choice should be present.
    assert!(labels.iter().all(|label| !label.contains("commands")));
}
1164
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Symlink-target authorization must not offer "always" choices: it uses a
    // flat one-time allow/deny pair instead of a dropdown.
    let permission_options =
        ToolPermissionContext::symlink_target(EditFileTool::NAME, vec!["/outside/file.txt".into()])
            .build_permission_options();

    let PermissionOptions::Flat(options) = permission_options else {
        panic!("Expected flat permission options for symlink target authorization");
    };

    assert_eq!(options.len(), 2);
    let has = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has("deny", acp::PermissionOptionKind::RejectOnce));
}
1185
#[test]
fn test_permission_option_ids_for_terminal() {
    // Verifies the stable option-id strings that the UI relies on when
    // responding to a terminal permission prompt.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    // Expect 3 choices: always-tool, always-pattern, once.
    assert_eq!(choices.len(), 3);

    // The first two choices both carry the tool-level option IDs; they differ
    // only in their sub-patterns.
    for choice in &choices[0..2] {
        assert_eq!(choice.allow.option_id.0.as_ref(), "always_allow:terminal");
        assert_eq!(choice.deny.option_id.0.as_ref(), "always_deny:terminal");
    }
    assert!(choices[0].sub_patterns.is_empty());
    assert_eq!(choices[1].sub_patterns, vec!["^cargo\\s+build(\\s|$)"]);

    // The third choice is the one-time allow/deny.
    assert_eq!(choices[2].allow.option_id.0.as_ref(), "allow");
    assert_eq!(choices[2].deny.option_id.0.as_ref(), "deny");
    assert!(choices[2].sub_patterns.is_empty());
}
1221
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    // A pipeline can't be summarized by a single command pattern, so it uses
    // the DropdownWithPatterns shape with one pattern per pipeline stage.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo test 2>&1 | tail".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::DropdownWithPatterns {
        choices,
        patterns,
        tool_name,
    } = permission_options
    else {
        panic!("Expected DropdownWithPatterns permission options for pipeline command");
    };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Only the tool-wide and one-time choices appear at the top level.
    assert_eq!(choices.len(), 2);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));

    // One pattern per pipeline stage: "cargo test" and "tail".
    assert_eq!(patterns.len(), 2);
    let display_names = patterns
        .iter()
        .map(|cp| cp.display_name.as_str())
        .collect::<Vec<_>>();
    assert!(display_names.contains(&"cargo test"));
    assert!(display_names.contains(&"tail"));

    // Each display name maps to an anchored regex.
    let regexes = patterns
        .iter()
        .map(|cp| cp.pattern.as_str())
        .collect::<Vec<_>>();
    assert!(regexes.contains(&"^cargo\\s+test(\\s|$)"));
    assert!(regexes.contains(&"^tail\\b"));
}
1261
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    // "&&"-chained commands are split like pipeline stages; subcommand-aware
    // patterns keep "npm install" and "npm test" distinct.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["npm install && npm test | tail".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::DropdownWithPatterns { patterns, .. } = permission_options else {
        panic!("Expected DropdownWithPatterns for chained pipeline command");
    };

    assert_eq!(patterns.len(), 3);
    let display_names: Vec<&str> = patterns.iter().map(|cp| cp.display_name.as_str()).collect();
    for expected in ["npm install", "npm test", "tail"] {
        assert!(display_names.contains(&expected));
    }
}
1281
// End-to-end test (requires the "e2e" feature and live model access): asks a
// real Sonnet 4 model to invoke DelayTool twice in a single message and checks
// the turn completes and mentions the tool output.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test concurrent tool calls with different delay times
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Call the delay tool twice in the same message.",
                    "Once with 100ms. Once with 300ms.",
                    "When both timers are complete, describe the outputs.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;

    // The turn should end normally rather than via refusal/max-tokens.
    let stop_reasons = stop_events(events);
    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);

    thread.update(cx, |thread, _cx| {
        // Concatenate all text content of the final agent message and check it
        // mentions "Ding" — presumably DelayTool's output; confirm against
        // the tool's definition in test_tools.
        let last_message = thread.last_received_or_pending_message().unwrap();
        let agent_message = last_message.as_agent_message().unwrap();
        let text = agent_message
            .content
            .iter()
            .filter_map(|content| {
                if let AgentMessageContent::Text(text) = content {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect::<String>();

        assert!(text.contains("Ding"));
    });
}
1326
// Verifies that switching agent profiles changes which registered tools are
// offered to the model on the next completion request.
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register three tools; the profiles defined below enable different
    // subsets of them.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    // NOTE: the expected order (delay before echo) suggests tools are listed
    // alphabetically — confirm against the thread's tool-collection code.
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1406
// Verifies that MCP (context server) tools are exposed to the model, that
// their calls round-trip through the server, and that a name collision with a
// native tool is resolved by prefixing the MCP tool with its server name.
#[gpui::test]
async fn test_mcp_tools(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "tool_permissions": { "default": "allow" },
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Start a fake context server exposing an "echo" tool whose input schema
    // mirrors the native EchoTool's.
    let mut mcp_tool_calls = setup_context_server(
        "test_server",
        vec![context_server::types::Tool {
            name: "echo".into(),
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    let events = thread.update(cx, |thread, cx| {
        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
    });
    cx.run_until_parked();

    // Simulate the model calling the MCP tool.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: json!({"text": "test"}).to_string(),
            input: json!({"text": "test"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The fake server receives the call; reply so the turn can continue.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: "test".into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // NOTE(review): `completion` here is still the request popped before the
    // tool result was sent, so this re-asserts the same value as above. If the
    // intent was to inspect the follow-up request, a fresh
    // `fake_model.pending_completions().pop()` is needed — confirm.
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_text_chunk("Done!");
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Send again after adding the echo tool, ensuring the name collision is resolved.
    let events = thread.update(cx, |thread, cx| {
        thread.add_tool(EchoTool);
        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
    });
    cx.run_until_parked();
    // The MCP tool is now offered under a server-prefixed name.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["echo", "test_server_echo"]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_2".into(),
            name: "test_server_echo".into(),
            raw_input: json!({"text": "mcp"}).to_string(),
            input: json!({"text": "mcp"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_3".into(),
            name: "echo".into(),
            raw_input: json!({"text": "native"}).to_string(),
            input: json!({"text": "native"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Only the prefixed call reaches the MCP server; the un-prefixed "echo"
    // is handled by the native tool.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // Ensure the tool results were inserted with the correct names.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages.last().unwrap().content,
        vec![
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_3".into(),
                tool_name: "echo".into(),
                is_error: false,
                content: "native".into(),
                output: Some("native".into()),
            },),
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_2".into(),
                tool_name: "test_server_echo".into(),
                is_error: false,
                content: "mcp".into(),
                output: Some("mcp".into()),
            },),
        ]
    );
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;
}
1572
1573#[gpui::test]
1574async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
1575    let ThreadTest {
1576        model,
1577        thread,
1578        context_server_store,
1579        fs,
1580        ..
1581    } = setup(cx, TestModel::Fake).await;
1582    let fake_model = model.as_fake();
1583
1584    // Setup settings to allow MCP tools
1585    fs.insert_file(
1586        paths::settings_file(),
1587        json!({
1588            "agent": {
1589                "always_allow_tool_actions": true,
1590                "profiles": {
1591                    "test": {
1592                        "name": "Test Profile",
1593                        "enable_all_context_servers": true,
1594                        "tools": {}
1595                    },
1596                }
1597            }
1598        })
1599        .to_string()
1600        .into_bytes(),
1601    )
1602    .await;
1603    cx.run_until_parked();
1604    thread.update(cx, |thread, cx| {
1605        thread.set_profile(AgentProfileId("test".into()), cx)
1606    });
1607
1608    // Setup a context server with a tool
1609    let mut mcp_tool_calls = setup_context_server(
1610        "github_server",
1611        vec![context_server::types::Tool {
1612            name: "issue_read".into(),
1613            description: Some("Read a GitHub issue".into()),
1614            input_schema: json!({
1615                "type": "object",
1616                "properties": {
1617                    "issue_url": { "type": "string" }
1618                }
1619            }),
1620            output_schema: None,
1621            annotations: None,
1622        }],
1623        &context_server_store,
1624        cx,
1625    );
1626
1627    // Send a message and have the model call the MCP tool
1628    let events = thread.update(cx, |thread, cx| {
1629        thread
1630            .send(UserMessageId::new(), ["Read issue #47404"], cx)
1631            .unwrap()
1632    });
1633    cx.run_until_parked();
1634
1635    // Verify the MCP tool is available to the model
1636    let completion = fake_model.pending_completions().pop().unwrap();
1637    assert_eq!(
1638        tool_names_for_completion(&completion),
1639        vec!["issue_read"],
1640        "MCP tool should be available"
1641    );
1642
1643    // Simulate the model calling the MCP tool
1644    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1645        LanguageModelToolUse {
1646            id: "tool_1".into(),
1647            name: "issue_read".into(),
1648            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
1649                .to_string(),
1650            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
1651            is_input_complete: true,
1652            thought_signature: None,
1653        },
1654    ));
1655    fake_model.end_last_completion_stream();
1656    cx.run_until_parked();
1657
1658    // The MCP server receives the tool call and responds with content
1659    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
1660    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1661    assert_eq!(tool_call_params.name, "issue_read");
1662    tool_call_response
1663        .send(context_server::types::CallToolResponse {
1664            content: vec![context_server::types::ToolResponseContent::Text {
1665                text: expected_tool_output.into(),
1666            }],
1667            is_error: None,
1668            meta: None,
1669            structured_content: None,
1670        })
1671        .unwrap();
1672    cx.run_until_parked();
1673
1674    // After tool completes, the model continues with a new completion request
1675    // that includes the tool results. We need to respond to this.
1676    let _completion = fake_model.pending_completions().pop().unwrap();
1677    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
1678    fake_model
1679        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
1680    fake_model.end_last_completion_stream();
1681    events.collect::<Vec<_>>().await;
1682
1683    // Verify the tool result is stored in the thread by checking the markdown output.
1684    // The tool result is in the first assistant message (not the last one, which is
1685    // the model's response after the tool completed).
1686    thread.update(cx, |thread, _cx| {
1687        let markdown = thread.to_markdown();
1688        assert!(
1689            markdown.contains("**Tool Result**: issue_read"),
1690            "Thread should contain tool result header"
1691        );
1692        assert!(
1693            markdown.contains(expected_tool_output),
1694            "Thread should contain tool output: {}",
1695            expected_tool_output
1696        );
1697    });
1698
1699    // Simulate app restart: disconnect the MCP server.
1700    // After restart, the MCP server won't be connected yet when the thread is replayed.
1701    context_server_store.update(cx, |store, cx| {
1702        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
1703    });
1704    cx.run_until_parked();
1705
1706    // Replay the thread (this is what happens when loading a saved thread)
1707    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));
1708
1709    let mut found_tool_call = None;
1710    let mut found_tool_call_update_with_output = None;
1711
1712    while let Some(event) = replay_events.next().await {
1713        let event = event.unwrap();
1714        match &event {
1715            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
1716                found_tool_call = Some(tc.clone());
1717            }
1718            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
1719                if update.tool_call_id.to_string() == "tool_1" =>
1720            {
1721                if update.fields.raw_output.is_some() {
1722                    found_tool_call_update_with_output = Some(update.clone());
1723                }
1724            }
1725            _ => {}
1726        }
1727    }
1728
1729    // The tool call should be found
1730    assert!(
1731        found_tool_call.is_some(),
1732        "Tool call should be emitted during replay"
1733    );
1734
1735    assert!(
1736        found_tool_call_update_with_output.is_some(),
1737        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
1738    );
1739
1740    let update = found_tool_call_update_with_output.unwrap();
1741    assert_eq!(
1742        update.fields.raw_output,
1743        Some(expected_tool_output.into()),
1744        "raw_output should contain the saved tool result"
1745    );
1746
1747    // Also verify the status is correct (completed, not failed)
1748    assert_eq!(
1749        update.fields.status,
1750        Some(acp::ToolCallStatus::Completed),
1751        "Tool call status should reflect the original completion status"
1752    );
1753}
1754
1755#[gpui::test]
1756async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
1757    let ThreadTest {
1758        model,
1759        thread,
1760        context_server_store,
1761        fs,
1762        ..
1763    } = setup(cx, TestModel::Fake).await;
1764    let fake_model = model.as_fake();
1765
1766    // Set up a profile with all tools enabled
1767    fs.insert_file(
1768        paths::settings_file(),
1769        json!({
1770            "agent": {
1771                "profiles": {
1772                    "test": {
1773                        "name": "Test Profile",
1774                        "enable_all_context_servers": true,
1775                        "tools": {
1776                            EchoTool::NAME: true,
1777                            DelayTool::NAME: true,
1778                            WordListTool::NAME: true,
1779                            ToolRequiringPermission::NAME: true,
1780                            InfiniteTool::NAME: true,
1781                        }
1782                    },
1783                }
1784            }
1785        })
1786        .to_string()
1787        .into_bytes(),
1788    )
1789    .await;
1790    cx.run_until_parked();
1791
1792    thread.update(cx, |thread, cx| {
1793        thread.set_profile(AgentProfileId("test".into()), cx);
1794        thread.add_tool(EchoTool);
1795        thread.add_tool(DelayTool);
1796        thread.add_tool(WordListTool);
1797        thread.add_tool(ToolRequiringPermission);
1798        thread.add_tool(InfiniteTool);
1799    });
1800
1801    // Set up multiple context servers with some overlapping tool names
1802    let _server1_calls = setup_context_server(
1803        "xxx",
1804        vec![
1805            context_server::types::Tool {
1806                name: "echo".into(), // Conflicts with native EchoTool
1807                description: None,
1808                input_schema: serde_json::to_value(EchoTool::input_schema(
1809                    LanguageModelToolSchemaFormat::JsonSchema,
1810                ))
1811                .unwrap(),
1812                output_schema: None,
1813                annotations: None,
1814            },
1815            context_server::types::Tool {
1816                name: "unique_tool_1".into(),
1817                description: None,
1818                input_schema: json!({"type": "object", "properties": {}}),
1819                output_schema: None,
1820                annotations: None,
1821            },
1822        ],
1823        &context_server_store,
1824        cx,
1825    );
1826
1827    let _server2_calls = setup_context_server(
1828        "yyy",
1829        vec![
1830            context_server::types::Tool {
1831                name: "echo".into(), // Also conflicts with native EchoTool
1832                description: None,
1833                input_schema: serde_json::to_value(EchoTool::input_schema(
1834                    LanguageModelToolSchemaFormat::JsonSchema,
1835                ))
1836                .unwrap(),
1837                output_schema: None,
1838                annotations: None,
1839            },
1840            context_server::types::Tool {
1841                name: "unique_tool_2".into(),
1842                description: None,
1843                input_schema: json!({"type": "object", "properties": {}}),
1844                output_schema: None,
1845                annotations: None,
1846            },
1847            context_server::types::Tool {
1848                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1849                description: None,
1850                input_schema: json!({"type": "object", "properties": {}}),
1851                output_schema: None,
1852                annotations: None,
1853            },
1854            context_server::types::Tool {
1855                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1856                description: None,
1857                input_schema: json!({"type": "object", "properties": {}}),
1858                output_schema: None,
1859                annotations: None,
1860            },
1861        ],
1862        &context_server_store,
1863        cx,
1864    );
1865    let _server3_calls = setup_context_server(
1866        "zzz",
1867        vec![
1868            context_server::types::Tool {
1869                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
1870                description: None,
1871                input_schema: json!({"type": "object", "properties": {}}),
1872                output_schema: None,
1873                annotations: None,
1874            },
1875            context_server::types::Tool {
1876                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
1877                description: None,
1878                input_schema: json!({"type": "object", "properties": {}}),
1879                output_schema: None,
1880                annotations: None,
1881            },
1882            context_server::types::Tool {
1883                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
1884                description: None,
1885                input_schema: json!({"type": "object", "properties": {}}),
1886                output_schema: None,
1887                annotations: None,
1888            },
1889        ],
1890        &context_server_store,
1891        cx,
1892    );
1893
1894    // Server with spaces in name - tests snake_case conversion for API compatibility
1895    let _server4_calls = setup_context_server(
1896        "Azure DevOps",
1897        vec![context_server::types::Tool {
1898            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
1899            description: None,
1900            input_schema: serde_json::to_value(EchoTool::input_schema(
1901                LanguageModelToolSchemaFormat::JsonSchema,
1902            ))
1903            .unwrap(),
1904            output_schema: None,
1905            annotations: None,
1906        }],
1907        &context_server_store,
1908        cx,
1909    );
1910
1911    thread
1912        .update(cx, |thread, cx| {
1913            thread.send(UserMessageId::new(), ["Go"], cx)
1914        })
1915        .unwrap();
1916    cx.run_until_parked();
1917    let completion = fake_model.pending_completions().pop().unwrap();
1918    assert_eq!(
1919        tool_names_for_completion(&completion),
1920        vec![
1921            "azure_dev_ops_echo",
1922            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1923            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
1924            "delay",
1925            "echo",
1926            "infinite",
1927            "tool_requiring_permission",
1928            "unique_tool_1",
1929            "unique_tool_2",
1930            "word_list",
1931            "xxx_echo",
1932            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1933            "yyy_echo",
1934            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1935        ]
1936    );
1937}
1938
// End-to-end test (runs only with the "e2e" feature, against a real Sonnet 4
// model): cancelling a turn while a tool is still running must close the event
// stream with a Cancelled stop reason, and the thread must remain usable for a
// follow-up send.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            // Tool calls are expected in the order the prompt requested them.
            ThreadEvent::ToolCall(tool_call) => {
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            // Track when the echo tool (the one that terminates on its own)
            // reports Completed; the infinite tool never will.
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
2024
// Cancelling the thread while a terminal command is still running must kill the
// terminal AND preserve the partial output it produced in the stored tool
// result, rather than replacing it with only a generic cancellation message.
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Fake terminal whose command never exits on its own, so the only way the
    // tool finishes is via cancellation.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use recorded in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and the result that was stored for it, keyed by tool-use id.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2121
#[gpui::test]
async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
    // This test verifies that tools which properly handle cancellation via
    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
    // to cancellation and report that they were cancelled.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // `was_cancelled` is a shared atomic flag the tool sets once it observes
    // the cancellation signal.
    let (tool, was_cancelled) = CancellationAwareTool::new();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(tool);
            thread.send(
                UserMessageId::new(),
                ["call the cancellation aware tool"],
                cx,
            )
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the cancellation-aware tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "cancellation_aware_1".into(),
            name: "cancellation_aware".into(),
            raw_input: r#"{}"#.into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    cx.run_until_parked();

    // Wait for the tool call to be reported
    let mut tool_started = false;
    // Retry budget scales with available parallelism.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        // Drain every event that is immediately available without blocking.
        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
                if tool_call.title == "Cancellation Aware Tool" {
                    tool_started = true;
                    break;
                }
            }
        }

        if tool_started {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(tool_started, "expected cancellation aware tool to start");

    // Cancel the thread and wait for it to complete
    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));

    // The cancel task should complete promptly because the tool handles cancellation
    let timeout = cx.background_executor.timer(Duration::from_secs(5));
    futures::select! {
        _ = cancel_task.fuse() => {}
        _ = timeout.fuse() => {
            panic!("cancel task timed out - tool did not respond to cancellation");
        }
    }

    // Verify the tool detected cancellation via its flag
    assert!(
        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
        "tool should have detected cancellation via event_stream.cancelled_by_user()"
    );

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2216
2217/// Helper to verify thread can recover after cancellation by sending a simple message.
2218async fn verify_thread_recovery(
2219    thread: &Entity<Thread>,
2220    fake_model: &FakeLanguageModel,
2221    cx: &mut TestAppContext,
2222) {
2223    let events = thread
2224        .update(cx, |thread, cx| {
2225            thread.send(
2226                UserMessageId::new(),
2227                ["Testing: reply with 'Hello' then stop."],
2228                cx,
2229            )
2230        })
2231        .unwrap();
2232    cx.run_until_parked();
2233    fake_model.send_last_completion_stream_text_chunk("Hello");
2234    fake_model
2235        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2236    fake_model.end_last_completion_stream();
2237
2238    let events = events.collect::<Vec<_>>().await;
2239    thread.update(cx, |thread, _cx| {
2240        let message = thread.last_received_or_pending_message().unwrap();
2241        let agent_message = message.as_agent_message().unwrap();
2242        assert_eq!(
2243            agent_message.content,
2244            vec![AgentMessageContent::Text("Hello".to_string())]
2245        );
2246    });
2247    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2248}
2249
2250/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2251async fn wait_for_terminal_tool_started(
2252    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2253    cx: &mut TestAppContext,
2254) {
2255    let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2256    for _ in 0..deadline {
2257        cx.run_until_parked();
2258
2259        while let Some(Some(event)) = events.next().now_or_never() {
2260            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2261                update,
2262            ))) = &event
2263            {
2264                if update.fields.content.as_ref().is_some_and(|content| {
2265                    content
2266                        .iter()
2267                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2268                }) {
2269                    return;
2270                }
2271            }
2272        }
2273
2274        cx.background_executor
2275            .timer(Duration::from_millis(10))
2276            .await;
2277    }
2278    panic!("terminal tool did not start within the expected time");
2279}
2280
2281/// Collects events until a Stop event is received, driving the executor to completion.
2282async fn collect_events_until_stop(
2283    events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2284    cx: &mut TestAppContext,
2285) -> Vec<Result<ThreadEvent>> {
2286    let mut collected = Vec::new();
2287    let deadline = cx.executor().num_cpus() * 200;
2288
2289    for _ in 0..deadline {
2290        cx.executor().advance_clock(Duration::from_millis(10));
2291        cx.run_until_parked();
2292
2293        while let Some(Some(event)) = events.next().now_or_never() {
2294            let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2295            collected.push(event);
2296            if is_stop {
2297                return collected;
2298            }
2299        }
2300    }
2301    panic!(
2302        "did not receive Stop event within the expected time; collected {} events",
2303        collected.len()
2304    );
2305}
2306
// Truncating the thread at the triggering user message while a terminal
// command is still running must kill the terminal, leave the thread empty, and
// keep the thread usable afterwards.
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Fake terminal whose command never exits on its own.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate at this exact message later.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2373
#[gpui::test]
async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
    // Tests that cancellation properly kills all running terminal tools when multiple are active.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Environment that hands out a separate terminal handle per tool
    // invocation (presumably — confirm against MultiTerminalEnvironment),
    // letting us verify each one individually below.
    let environment = Rc::new(MultiTerminalEnvironment::new());

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment.clone(),
            ));
            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling two terminal tools
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_2".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 2000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for both terminal tools to start by counting terminal content updates
    let mut terminals_started = 0;
    // Retry budget scales with available parallelism.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        // Drain whatever events are immediately available without blocking.
        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            ))) = &event
            {
                if update.fields.content.as_ref().is_some_and(|content| {
                    content
                        .iter()
                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
                }) {
                    terminals_started += 1;
                    if terminals_started >= 2 {
                        break;
                    }
                }
            }
        }
        if terminals_started >= 2 {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(
        terminals_started >= 2,
        "expected 2 terminal tools to start, got {terminals_started}"
    );

    // Cancel the thread while both terminals are running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify both terminal handles were killed
    let handles = environment.handles();
    assert_eq!(
        handles.len(),
        2,
        "expected 2 terminal handles to be created"
    );
    assert!(
        handles[0].was_killed(),
        "expected first terminal handle to be killed on cancellation"
    );
    assert!(
        handles[1].was_killed(),
        "expected second terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );
}
2482
2483#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Use a fake terminal whose command never exits on its own, so the tool can only
    // finish via the explicit stop signal below.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Register the terminal tool and start a turn that will invoke it.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use recorded in the agent message content.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // The text sent back to the model should say the user stopped the command,
        // so the model can react appropriately.
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2577
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // The fake terminal never exits on its own, so only the timeout can end the tool.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the timeout (100ms configured in the tool input above).
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // Timeout and user-stop produce different messages; make sure the two
        // paths are not conflated.
        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2676
2677#[gpui::test]
2678async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2679    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2680    let fake_model = model.as_fake();
2681
2682    let events_1 = thread
2683        .update(cx, |thread, cx| {
2684            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2685        })
2686        .unwrap();
2687    cx.run_until_parked();
2688    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2689    cx.run_until_parked();
2690
2691    let events_2 = thread
2692        .update(cx, |thread, cx| {
2693            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2694        })
2695        .unwrap();
2696    cx.run_until_parked();
2697    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2698    fake_model
2699        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2700    fake_model.end_last_completion_stream();
2701
2702    let events_1 = events_1.collect::<Vec<_>>().await;
2703    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2704    let events_2 = events_2.collect::<Vec<_>>().await;
2705    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2706}
2707
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    // NOTE(review): assumes with_id_and_thinking takes (provider, id, name,
    // supports_thinking) — confirm against FakeLanguageModel's constructor.
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // The old turn reports Cancelled; the new turn completes normally.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2785
2786#[gpui::test]
2787async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2788    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2789    let fake_model = model.as_fake();
2790
2791    let events_1 = thread
2792        .update(cx, |thread, cx| {
2793            thread.send(UserMessageId::new(), ["Hello 1"], cx)
2794        })
2795        .unwrap();
2796    cx.run_until_parked();
2797    fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2798    fake_model
2799        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2800    fake_model.end_last_completion_stream();
2801    let events_1 = events_1.collect::<Vec<_>>().await;
2802
2803    let events_2 = thread
2804        .update(cx, |thread, cx| {
2805            thread.send(UserMessageId::new(), ["Hello 2"], cx)
2806        })
2807        .unwrap();
2808    cx.run_until_parked();
2809    fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2810    fake_model
2811        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2812    fake_model.end_last_completion_stream();
2813    let events_2 = events_2.collect::<Vec<_>>().await;
2814
2815    assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2816    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2817}
2818
2819#[gpui::test]
2820async fn test_refusal(cx: &mut TestAppContext) {
2821    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2822    let fake_model = model.as_fake();
2823
2824    let events = thread
2825        .update(cx, |thread, cx| {
2826            thread.send(UserMessageId::new(), ["Hello"], cx)
2827        })
2828        .unwrap();
2829    cx.run_until_parked();
2830    thread.read_with(cx, |thread, _| {
2831        assert_eq!(
2832            thread.to_markdown(),
2833            indoc! {"
2834                ## User
2835
2836                Hello
2837            "}
2838        );
2839    });
2840
2841    fake_model.send_last_completion_stream_text_chunk("Hey!");
2842    cx.run_until_parked();
2843    thread.read_with(cx, |thread, _| {
2844        assert_eq!(
2845            thread.to_markdown(),
2846            indoc! {"
2847                ## User
2848
2849                Hello
2850
2851                ## Assistant
2852
2853                Hey!
2854            "}
2855        );
2856    });
2857
2858    // If the model refuses to continue, the thread should remove all the messages after the last user message.
2859    fake_model
2860        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
2861    let events = events.collect::<Vec<_>>().await;
2862    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
2863    thread.read_with(cx, |thread, _| {
2864        assert_eq!(thread.to_markdown(), "");
2865    });
2866}
2867
#[gpui::test]
// Truncating at the first user message clears the whole thread (including token
// usage), and the thread remains usable for subsequent sends.
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Keep the message id so we can truncate at this exact message later.
    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        // No completion has reported usage yet.
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Stream a reply plus a usage update so the thread has token accounting to clear.
    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncate at the first message: everything (and the usage) should be gone.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // Usage reflects only the post-truncation exchange.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
2989
#[gpui::test]
// Truncating at the second user message restores the thread — markdown and token
// usage — to exactly the state after the first exchange.
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First exchange: user message, assistant reply, and a usage update.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Reusable assertion for the post-first-exchange state; used both before the
    // second send and again after truncation to prove the rollback was exact.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Second exchange; keep its id so we can truncate at it.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncate at the second message; the first exchange must be intact.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3104
#[gpui::test]
// A title is generated via the dedicated summarization model after the first turn,
// and is NOT regenerated on subsequent turns.
async fn test_title_generation(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Separate fake model used only for title/summary generation.
    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // Title stays at the default until the summary model responds.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "New Thread"));

    // Ensure the summary model has been invoked to generate a title.
    // Only the first line of its output becomes the title ("Goodnight Moon" is dropped).
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // No pending summary request means no second title generation was kicked off.
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));
}
3150
#[gpui::test]
// A tool call still awaiting permission must be omitted from the next completion
// request, while a completed tool call (and its result) is included.
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // This tool call will remain pending (permission never granted in this test).
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // This tool call runs to completion and should appear in the built request.
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    // messages[0] is skipped — presumably the system/context message; verify
    // against build_completion_request if this assumption changes.
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3230
#[gpui::test]
// End-to-end exercise of NativeAgentConnection: session creation, model listing and
// selection, prompting, cancellation, and session close semantics.
async fn test_agent_connection(cx: &mut TestAppContext) {
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(user_store.clone(), client.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = PathList::new(&[Path::new("/test")]);
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Send a prompt and stream back a response; the thread markdown should
    // reflect both sides of the exchange.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting a closed session must fail with a "Session not found" error.
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3353
#[gpui::test]
// Verifies the full lifecycle of tool-call events emitted to the ACP stream:
// initial ToolCall, raw-input update on input completion, InProgress, Completed.
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First event: the tool call itself, carrying the empty partial input.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Second: the completed raw input is reflected in an update.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    // Third: the tool starts executing.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    // Fourth: completion, with the echoed text as raw output.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3432
/// The `update_plan` tool should emit a `ToolCall` (kind `Think`), an
/// in-progress status update, the parsed `Plan` (with a defaulted priority
/// for entries that omit it), and a completed status update carrying the
/// tool's raw output.
#[gpui::test]
async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Make a plan"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // The second entry deliberately omits "priority" to exercise the
    // default (asserted as Medium below).
    let input = json!({
        "plan": [
            {
                "step": "Inspect the code",
                "status": "completed",
                "priority": "high"
            },
            {
                "step": "Implement the tool",
                "status": "in_progress"
            },
            {
                "step": "Run tests",
                "status": "pending",
                "priority": "low"
            }
        ]
    });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "plan_1".into(),
            name: UpdatePlanTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Initial tool call echoes the raw plan input verbatim.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("plan_1", "Update plan")
            .kind(acp::ToolKind::Think)
            .raw_input(json!({
                "plan": [
                    {
                        "step": "Inspect the code",
                        "status": "completed",
                        "priority": "high"
                    },
                    {
                        "step": "Implement the tool",
                        "status": "in_progress"
                    },
                    {
                        "step": "Run tests",
                        "status": "pending",
                        "priority": "low"
                    }
                ]
            }))
            .meta(acp::Meta::from_iter([(
                "tool_name".into(),
                "update_plan".into()
            )]))
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );

    // The parsed plan: the entry without an explicit priority defaults to
    // Medium.
    let plan = expect_plan(&mut events).await;
    assert_eq!(
        plan,
        acp::Plan::new(vec![
            acp::PlanEntry::new(
                "Inspect the code",
                acp::PlanEntryPriority::High,
                acp::PlanEntryStatus::Completed,
            ),
            acp::PlanEntry::new(
                "Implement the tool",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::InProgress,
            ),
            acp::PlanEntry::new(
                "Run tests",
                acp::PlanEntryPriority::Low,
                acp::PlanEntryStatus::Pending,
            ),
        ])
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Plan updated")
        )
    );
}
3548
3549#[gpui::test]
3550async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
3551    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3552    let fake_model = model.as_fake();
3553
3554    let mut events = thread
3555        .update(cx, |thread, cx| {
3556            thread.send(UserMessageId::new(), ["Hello!"], cx)
3557        })
3558        .unwrap();
3559    cx.run_until_parked();
3560
3561    fake_model.send_last_completion_stream_text_chunk("Hey!");
3562    fake_model.end_last_completion_stream();
3563
3564    let mut retry_events = Vec::new();
3565    while let Some(Ok(event)) = events.next().await {
3566        match event {
3567            ThreadEvent::Retry(retry_status) => {
3568                retry_events.push(retry_status);
3569            }
3570            ThreadEvent::Stop(..) => break,
3571            _ => {}
3572        }
3573    }
3574
3575    assert_eq!(retry_events.len(), 0);
3576    thread.read_with(cx, |thread, _cx| {
3577        assert_eq!(
3578            thread.to_markdown(),
3579            indoc! {"
3580                ## User
3581
3582                Hello!
3583
3584                ## Assistant
3585
3586                Hey!
3587            "}
3588        )
3589    });
3590}
3591
3592#[gpui::test]
3593async fn test_send_retry_on_error(cx: &mut TestAppContext) {
3594    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3595    let fake_model = model.as_fake();
3596
3597    let mut events = thread
3598        .update(cx, |thread, cx| {
3599            thread.send(UserMessageId::new(), ["Hello!"], cx)
3600        })
3601        .unwrap();
3602    cx.run_until_parked();
3603
3604    fake_model.send_last_completion_stream_text_chunk("Hey,");
3605    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
3606        provider: LanguageModelProviderName::new("Anthropic"),
3607        retry_after: Some(Duration::from_secs(3)),
3608    });
3609    fake_model.end_last_completion_stream();
3610
3611    cx.executor().advance_clock(Duration::from_secs(3));
3612    cx.run_until_parked();
3613
3614    fake_model.send_last_completion_stream_text_chunk("there!");
3615    fake_model.end_last_completion_stream();
3616    cx.run_until_parked();
3617
3618    let mut retry_events = Vec::new();
3619    while let Some(Ok(event)) = events.next().await {
3620        match event {
3621            ThreadEvent::Retry(retry_status) => {
3622                retry_events.push(retry_status);
3623            }
3624            ThreadEvent::Stop(..) => break,
3625            _ => {}
3626        }
3627    }
3628
3629    assert_eq!(retry_events.len(), 1);
3630    assert!(matches!(
3631        retry_events[0],
3632        acp_thread::RetryStatus { attempt: 1, .. }
3633    ));
3634    thread.read_with(cx, |thread, _cx| {
3635        assert_eq!(
3636            thread.to_markdown(),
3637            indoc! {"
3638                ## User
3639
3640                Hello!
3641
3642                ## Assistant
3643
3644                Hey,
3645
3646                [resume]
3647
3648                ## Assistant
3649
3650                there!
3651            "}
3652        )
3653    });
3654}
3655
/// When the model stream fails after emitting a complete tool use, the tool
/// still runs to completion and its result is included in the retried
/// request, so the resumed turn sees a coherent conversation.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // A complete tool use followed immediately by a retryable stream error.
    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Let the retry delay elapse, then inspect the retried request.
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    // Skipping messages[0] here — presumably the system/context message
    // (TODO confirm); the remainder must show the original prompt, the tool
    // use, and the tool's finished result.
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retried turn and confirm the final assistant message.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3735
/// Exhausting the retry budget should emit one `Retry` event per attempt
/// and then surface the underlying `ServerOverloaded` error to the caller.
#[gpui::test]
async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Fail the initial request plus every retry (MAX_RETRY_ATTEMPTS + 1
    // requests total), advancing the clock past each backoff delay.
    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
        fake_model.send_last_completion_stream_error(
            LanguageModelCompletionError::ServerOverloaded {
                provider: LanguageModelProviderName::new("Anthropic"),
                retry_after: Some(Duration::from_secs(3)),
            },
        );
        fake_model.end_last_completion_stream();
        cx.executor().advance_clock(Duration::from_secs(3));
        cx.run_until_parked();
    }

    let mut errors = Vec::new();
    let mut retry_events = Vec::new();
    while let Some(event) = events.next().await {
        match event {
            Ok(ThreadEvent::Retry(retry_status)) => {
                retry_events.push(retry_status);
            }
            Ok(ThreadEvent::Stop(..)) => break,
            Err(error) => errors.push(error),
            _ => {}
        }
    }

    // One retry event per attempt, numbered from 1.
    assert_eq!(
        retry_events.len(),
        crate::thread::MAX_RETRY_ATTEMPTS as usize
    );
    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
        assert_eq!(retry_events[i].attempt, i + 1);
    }
    // The final failure is reported once, as the original completion error.
    assert_eq!(errors.len(), 1);
    let error = errors[0]
        .downcast_ref::<LanguageModelCompletionError>()
        .unwrap();
    assert!(matches!(
        error,
        LanguageModelCompletionError::ServerOverloaded { .. }
    ));
}
3789
/// Regression test: a streaming-input tool whose input never completes
/// (the LLM stream errors before `is_input_complete = true`) must still
/// terminate, with its error result forwarded into the retry request instead
/// of deadlocking the turn.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    // messages[0] is skipped — presumably the system/context message
    // (TODO confirm); the rest must show the prompt, the incomplete tool
    // use, and the synthesized error result.
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3896
3897/// Filters out the stop events for asserting against in tests
3898fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3899    result_events
3900        .into_iter()
3901        .filter_map(|event| match event.unwrap() {
3902            ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3903            _ => None,
3904        })
3905        .collect()
3906}
3907
/// Bundle of entities produced by `setup` for driving a `Thread` under test.
struct ThreadTest {
    /// The language model the thread talks to (fake or real, per `TestModel`).
    model: Arc<dyn LanguageModel>,
    /// The thread under test.
    thread: Entity<Thread>,
    /// Project context passed to the thread at construction.
    project_context: Entity<ProjectContext>,
    /// Store used by tests that spin up fake MCP context servers.
    context_server_store: Entity<ContextServerStore>,
    /// Fake filesystem backing the project and the settings file.
    fs: Arc<FakeFs>,
}
3915
/// Which language model a test runs against.
enum TestModel {
    /// A real Claude Sonnet 4 model (requires network and provider auth).
    Sonnet4,
    /// An in-process `FakeLanguageModel` with test-scripted responses.
    Fake,
}
3920
3921impl TestModel {
3922    fn id(&self) -> LanguageModelId {
3923        match self {
3924            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
3925            TestModel::Fake => unreachable!(),
3926        }
3927    }
3928}
3929
/// Builds the shared test fixture: a fake filesystem whose settings file
/// enables every test tool, a `Project` rooted at /test, the requested
/// model (fake, or a real authenticated one), and a `Thread` wired to them.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    // NOTE(review): parking is allowed here, presumably so real-model tests
    // can block on network I/O — confirm.
    cx.executor().allow_parking();

    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    // Enable all test tools under a dedicated profile so tool availability
    // doesn't depend on default agent settings.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                            UpdatePlanTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        // Real-model runs need HTTP, a production client, and the language
        // model registries; the fake model needs none of that.
        match model {
            TestModel::Fake => {}
            TestModel::Sonnet4 => {
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(user_store.clone(), client.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Resolve the model: fake models are constructed directly; real models
    // are looked up in the registry and their provider authenticated first.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
4035
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    // Only enable logging when the caller explicitly opted in via RUST_LOG.
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
4043
/// Propagates changes to the settings file on `fs` into the global
/// `SettingsStore` for as long as the app lives, so tests can update
/// settings by rewriting the file.
fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
    let fs = fs.clone();
    cx.spawn({
        async move |cx| {
            let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
                cx.background_executor(),
                fs,
                paths::settings_file().clone(),
            );
            // Keep the file watcher alive for the lifetime of this task.
            let _watcher_task = watcher_task;

            while let Some(new_settings_content) = new_settings_content_rx.next().await {
                cx.update(|cx| {
                    SettingsStore::update_global(cx, |settings, cx| {
                        settings.set_user_settings(&new_settings_content, cx)
                    })
                })
                // Best-effort: `.ok()` discards the error — presumably only
                // hit once the app context is gone (TODO confirm).
                .ok();
            }
        }
    })
    .detach();
}
4067
4068fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4069    completion
4070        .tools
4071        .iter()
4072        .map(|tool| tool.name.clone())
4073        .collect()
4074}
4075
/// Configures and starts a fake stdio context (MCP) server named `name`
/// exposing `tools`, returning a channel that yields each incoming
/// `CallTool` request together with a oneshot sender the test must use to
/// reply.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Register the server in project settings so the store will start it.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    // Fake transport: answer the MCP handshake, report `tools` on ListTools,
    // and forward each CallTool request to the test through the channel.
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                // Hand the request to the test and wait for its reply.
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    // Let the handshake and tool listing finish before returning.
    cx.run_until_parked();
    mcp_tool_calls_rx
}
4155
/// `tokens_before_message` should report the input-token count of the
/// request that preceded each user message: `None` for the first message,
/// and the prior request's `input_tokens` for later ones.
#[gpui::test]
async fn test_tokens_before_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First message
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["First message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Before any response, tokens_before_message should return None for first message
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should have no tokens before it"
        );
    });

    // Complete first message with usage
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First message still has no tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it after response"
        );
    });

    // Second message
    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Second message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Second message should have first message's input tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should have 100 tokens before it (from first request)"
        );
    });

    // Complete second message
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250, // Total for this request (includes previous context)
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Third message
    let message_3_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_3_id.clone(), ["Third message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Third message should have second message's input tokens (250) before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_3_id),
            Some(250),
            "Third message should have 250 tokens before it (from second request)"
        );
        // Second message should still have 100
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should still have 100 tokens before it"
        );
        // First message still has none
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it"
        );
    });
}
4262
4263#[gpui::test]
4264async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
4265    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4266    let fake_model = model.as_fake();
4267
4268    // Set up three messages with responses
4269    let message_1_id = UserMessageId::new();
4270    thread
4271        .update(cx, |thread, cx| {
4272            thread.send(message_1_id.clone(), ["Message 1"], cx)
4273        })
4274        .unwrap();
4275    cx.run_until_parked();
4276    fake_model.send_last_completion_stream_text_chunk("Response 1");
4277    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4278        language_model::TokenUsage {
4279            input_tokens: 100,
4280            output_tokens: 50,
4281            cache_creation_input_tokens: 0,
4282            cache_read_input_tokens: 0,
4283        },
4284    ));
4285    fake_model.end_last_completion_stream();
4286    cx.run_until_parked();
4287
4288    let message_2_id = UserMessageId::new();
4289    thread
4290        .update(cx, |thread, cx| {
4291            thread.send(message_2_id.clone(), ["Message 2"], cx)
4292        })
4293        .unwrap();
4294    cx.run_until_parked();
4295    fake_model.send_last_completion_stream_text_chunk("Response 2");
4296    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4297        language_model::TokenUsage {
4298            input_tokens: 250,
4299            output_tokens: 75,
4300            cache_creation_input_tokens: 0,
4301            cache_read_input_tokens: 0,
4302        },
4303    ));
4304    fake_model.end_last_completion_stream();
4305    cx.run_until_parked();
4306
4307    // Verify initial state
4308    thread.read_with(cx, |thread, _| {
4309        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
4310    });
4311
4312    // Truncate at message 2 (removes message 2 and everything after)
4313    thread
4314        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
4315        .unwrap();
4316    cx.run_until_parked();
4317
4318    // After truncation, message_2_id no longer exists, so lookup should return None
4319    thread.read_with(cx, |thread, _| {
4320        assert_eq!(
4321            thread.tokens_before_message(&message_2_id),
4322            None,
4323            "After truncation, message 2 no longer exists"
4324        );
4325        // Message 1 still exists but has no tokens before it
4326        assert_eq!(
4327            thread.tokens_before_message(&message_1_id),
4328            None,
4329            "First message still has no tokens before it"
4330        );
4331    });
4332}
4333
4334#[gpui::test]
4335async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
4336    init_test(cx);
4337
4338    let fs = FakeFs::new(cx.executor());
4339    fs.insert_tree("/root", json!({})).await;
4340    let project = Project::test(fs, ["/root".as_ref()], cx).await;
4341
4342    // Test 1: Deny rule blocks command
4343    {
4344        let environment = Rc::new(cx.update(|cx| {
4345            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
4346        }));
4347
4348        cx.update(|cx| {
4349            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4350            settings.tool_permissions.tools.insert(
4351                TerminalTool::NAME.into(),
4352                agent_settings::ToolRules {
4353                    default: Some(settings::ToolPermissionMode::Confirm),
4354                    always_allow: vec![],
4355                    always_deny: vec![
4356                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
4357                    ],
4358                    always_confirm: vec![],
4359                    invalid_patterns: vec![],
4360                },
4361            );
4362            agent_settings::AgentSettings::override_global(settings, cx);
4363        });
4364
4365        #[allow(clippy::arc_with_non_send_sync)]
4366        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4367        let (event_stream, _rx) = crate::ToolCallEventStream::test();
4368
4369        let task = cx.update(|cx| {
4370            tool.run(
4371                ToolInput::resolved(crate::TerminalToolInput {
4372                    command: "rm -rf /".to_string(),
4373                    cd: ".".to_string(),
4374                    timeout_ms: None,
4375                }),
4376                event_stream,
4377                cx,
4378            )
4379        });
4380
4381        let result = task.await;
4382        assert!(
4383            result.is_err(),
4384            "expected command to be blocked by deny rule"
4385        );
4386        let err_msg = result.unwrap_err().to_lowercase();
4387        assert!(
4388            err_msg.contains("blocked"),
4389            "error should mention the command was blocked"
4390        );
4391    }
4392
4393    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
4394    {
4395        let environment = Rc::new(cx.update(|cx| {
4396            FakeThreadEnvironment::default()
4397                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4398        }));
4399
4400        cx.update(|cx| {
4401            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4402            settings.tool_permissions.tools.insert(
4403                TerminalTool::NAME.into(),
4404                agent_settings::ToolRules {
4405                    default: Some(settings::ToolPermissionMode::Deny),
4406                    always_allow: vec![
4407                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
4408                    ],
4409                    always_deny: vec![],
4410                    always_confirm: vec![],
4411                    invalid_patterns: vec![],
4412                },
4413            );
4414            agent_settings::AgentSettings::override_global(settings, cx);
4415        });
4416
4417        #[allow(clippy::arc_with_non_send_sync)]
4418        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4419        let (event_stream, mut rx) = crate::ToolCallEventStream::test();
4420
4421        let task = cx.update(|cx| {
4422            tool.run(
4423                ToolInput::resolved(crate::TerminalToolInput {
4424                    command: "echo hello".to_string(),
4425                    cd: ".".to_string(),
4426                    timeout_ms: None,
4427                }),
4428                event_stream,
4429                cx,
4430            )
4431        });
4432
4433        let update = rx.expect_update_fields().await;
4434        assert!(
4435            update.content.iter().any(|blocks| {
4436                blocks
4437                    .iter()
4438                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
4439            }),
4440            "expected terminal content (allow rule should skip confirmation and override default deny)"
4441        );
4442
4443        let result = task.await;
4444        assert!(
4445            result.is_ok(),
4446            "expected command to succeed without confirmation"
4447        );
4448    }
4449
4450    // Test 3: global default: allow does NOT override always_confirm patterns
4451    {
4452        let environment = Rc::new(cx.update(|cx| {
4453            FakeThreadEnvironment::default()
4454                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4455        }));
4456
4457        cx.update(|cx| {
4458            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4459            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
4460            settings.tool_permissions.tools.insert(
4461                TerminalTool::NAME.into(),
4462                agent_settings::ToolRules {
4463                    default: Some(settings::ToolPermissionMode::Allow),
4464                    always_allow: vec![],
4465                    always_deny: vec![],
4466                    always_confirm: vec![
4467                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
4468                    ],
4469                    invalid_patterns: vec![],
4470                },
4471            );
4472            agent_settings::AgentSettings::override_global(settings, cx);
4473        });
4474
4475        #[allow(clippy::arc_with_non_send_sync)]
4476        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4477        let (event_stream, mut rx) = crate::ToolCallEventStream::test();
4478
4479        let _task = cx.update(|cx| {
4480            tool.run(
4481                ToolInput::resolved(crate::TerminalToolInput {
4482                    command: "sudo rm file".to_string(),
4483                    cd: ".".to_string(),
4484                    timeout_ms: None,
4485                }),
4486                event_stream,
4487                cx,
4488            )
4489        });
4490
4491        // With global default: allow, confirm patterns are still respected
4492        // The expect_authorization() call will panic if no authorization is requested,
4493        // which validates that the confirm pattern still triggers confirmation
4494        let _auth = rx.expect_authorization().await;
4495
4496        drop(_task);
4497    }
4498
4499    // Test 4: tool-specific default: deny is respected even with global default: allow
4500    {
4501        let environment = Rc::new(cx.update(|cx| {
4502            FakeThreadEnvironment::default()
4503                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
4504        }));
4505
4506        cx.update(|cx| {
4507            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
4508            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
4509            settings.tool_permissions.tools.insert(
4510                TerminalTool::NAME.into(),
4511                agent_settings::ToolRules {
4512                    default: Some(settings::ToolPermissionMode::Deny),
4513                    always_allow: vec![],
4514                    always_deny: vec![],
4515                    always_confirm: vec![],
4516                    invalid_patterns: vec![],
4517                },
4518            );
4519            agent_settings::AgentSettings::override_global(settings, cx);
4520        });
4521
4522        #[allow(clippy::arc_with_non_send_sync)]
4523        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
4524        let (event_stream, _rx) = crate::ToolCallEventStream::test();
4525
4526        let task = cx.update(|cx| {
4527            tool.run(
4528                ToolInput::resolved(crate::TerminalToolInput {
4529                    command: "echo hello".to_string(),
4530                    cd: ".".to_string(),
4531                    timeout_ms: None,
4532                }),
4533                event_stream,
4534                cx,
4535            )
4536        });
4537
4538        // tool-specific default: deny is respected even with global default: allow
4539        let result = task.await;
4540        assert!(
4541            result.is_err(),
4542            "expected command to be blocked by tool-specific deny default"
4543        );
4544        let err_msg = result.unwrap_err().to_lowercase();
4545        assert!(
4546            err_msg.contains("disabled"),
4547            "error should mention the tool is disabled, got: {err_msg}"
4548        );
4549    }
4550}
4551
/// End-to-end subagent flow: the parent model invokes `SpawnAgentTool`,
/// a subagent session is created and driven to completion, and the
/// subagent's response becomes the tool call's content in the parent
/// thread's transcript.
#[gpui::test]
async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is gated behind a feature flag; enable it.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Open a session and grab the agent-side thread that backs it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Install the fake model so completions can be scripted below.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // First turn: the model streams some text, then a tool call that
    // spawns a subagent (session_id: None requests a fresh session).
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // The tool call should have started a subagent session.
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // The same fake model serves both threads; at this point the most
    // recently opened completion stream is the subagent's turn.
    model.send_last_completion_stream_text_chunk("subagent task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response

        "}
    );

    // With the tool call resolved, finish the parent's turn.
    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // The parent transcript shows the completed tool call carrying the
    // subagent's response as its content.
    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response

            ## Assistant

            Response

        "#},
    );
}
4686
/// The subagent's own transcript records `<thinking>` content, but the
/// tool-call output surfaced in the parent thread contains only the
/// plain text chunks — the thinking block is filtered out.
#[gpui::test]
async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is gated behind a feature flag; enable it.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Open a session and grab the agent-side thread that backs it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Install the fake model so completions can be scripted below.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // First turn: stream text, then a tool call spawning a subagent.
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Drive the subagent's turn: text, a thinking event, then more text.
    model.send_last_completion_stream_text_chunk("subagent task response 1");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "thinking more about the subagent task".into(),
        signature: None,
    });
    model.send_last_completion_stream_text_chunk("subagent task response 2");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // The subagent's own transcript keeps the <thinking> block.
    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response 1

            <thinking>
            thinking more about the subagent task
            </thinking>

            subagent task response 2

        "}
    );

    // Finish the parent's turn now that the tool call has resolved.
    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // The parent's tool-call output contains only the two text chunks;
    // the thinking content is absent.
    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response 1

            subagent task response 2

            ## Assistant

            Response

        "#},
    );
}
4834
4835#[gpui::test]
4836async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
4837    init_test(cx);
4838    cx.update(|cx| {
4839        LanguageModelRegistry::test(cx);
4840    });
4841    cx.update(|cx| {
4842        cx.update_flags(true, vec!["subagents".to_string()]);
4843    });
4844
4845    let fs = FakeFs::new(cx.executor());
4846    fs.insert_tree(
4847        "/",
4848        json!({
4849            "a": {
4850                "b.md": "Lorem"
4851            }
4852        }),
4853    )
4854    .await;
4855    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4856    let thread_store = cx.new(|cx| ThreadStore::new(cx));
4857    let agent = cx.update(|cx| {
4858        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4859    });
4860    let connection = Rc::new(NativeAgentConnection(agent.clone()));
4861
4862    let acp_thread = cx
4863        .update(|cx| {
4864            connection
4865                .clone()
4866                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4867        })
4868        .await
4869        .unwrap();
4870    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4871    let thread = agent.read_with(cx, |agent, _| {
4872        agent.sessions.get(&session_id).unwrap().thread.clone()
4873    });
4874    let model = Arc::new(FakeLanguageModel::default());
4875
4876    // Ensure empty threads are not saved, even if they get mutated.
4877    thread.update(cx, |thread, cx| {
4878        thread.set_model(model.clone(), cx);
4879    });
4880    cx.run_until_parked();
4881
4882    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4883    cx.run_until_parked();
4884    model.send_last_completion_stream_text_chunk("spawning subagent");
4885    let subagent_tool_input = SpawnAgentToolInput {
4886        label: "label".to_string(),
4887        message: "subagent task prompt".to_string(),
4888        session_id: None,
4889    };
4890    let subagent_tool_use = LanguageModelToolUse {
4891        id: "subagent_1".into(),
4892        name: SpawnAgentTool::NAME.into(),
4893        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4894        input: serde_json::to_value(&subagent_tool_input).unwrap(),
4895        is_input_complete: true,
4896        thought_signature: None,
4897    };
4898    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4899        subagent_tool_use,
4900    ));
4901    model.end_last_completion_stream();
4902
4903    cx.run_until_parked();
4904
4905    let subagent_session_id = thread.read_with(cx, |thread, cx| {
4906        thread
4907            .running_subagent_ids(cx)
4908            .get(0)
4909            .expect("subagent thread should be running")
4910            .clone()
4911    });
4912    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4913        agent
4914            .sessions
4915            .get(&subagent_session_id)
4916            .expect("subagent session should exist")
4917            .acp_thread
4918            .clone()
4919    });
4920
4921    // model.send_last_completion_stream_text_chunk("subagent task response");
4922    // model.end_last_completion_stream();
4923
4924    // cx.run_until_parked();
4925
4926    acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4927
4928    cx.run_until_parked();
4929
4930    send.await.unwrap();
4931
4932    acp_thread.read_with(cx, |thread, cx| {
4933        assert_eq!(thread.status(), ThreadStatus::Idle);
4934        assert_eq!(
4935            thread.to_markdown(cx),
4936            indoc! {"
4937                ## User
4938
4939                Prompt
4940
4941                ## Assistant
4942
4943                spawning subagent
4944
4945                **Tool Call: label**
4946                Status: Canceled
4947
4948            "}
4949        );
4950    });
4951    subagent_acp_thread.read_with(cx, |thread, cx| {
4952        assert_eq!(thread.status(), ThreadStatus::Idle);
4953        assert_eq!(
4954            thread.to_markdown(cx),
4955            indoc! {"
4956                ## User
4957
4958                subagent task prompt
4959
4960            "}
4961        );
4962    });
4963}
4964
/// A second `SpawnAgentTool` call that passes the session_id of a
/// previously-completed subagent resumes that same session, so the
/// subagent's transcript accumulates both conversation turns.
#[gpui::test]
async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is gated behind a feature flag; enable it.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Open a session and grab the agent-side thread that backs it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Install the fake model so completions can be scripted below.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // === First turn: create subagent ===
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // session_id: None requests a brand-new subagent session.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "initial task".to_string(),
        message: "do the first task".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Record the new subagent's session id so we can resume it later.
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Subagent responds
    model.send_last_completion_stream_text_chunk("first task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete first turn
    model.send_last_completion_stream_text_chunk("First response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after completion"
        );
    });

    // === Second turn: resume subagent with session_id ===
    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("resuming subagent");
    // Passing the earlier session_id asks the tool to resume that session.
    let resume_tool_input = SpawnAgentToolInput {
        label: "follow-up task".to_string(),
        message: "do the follow-up task".to_string(),
        session_id: Some(subagent_session_id.clone()),
    };
    let resume_tool_use = LanguageModelToolUse {
        id: "subagent_2".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
        input: serde_json::to_value(&resume_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Subagent should be running again with the same session
    thread.read_with(cx, |thread, cx| {
        let running = thread.running_subagent_ids(cx);
        assert_eq!(running.len(), 1, "subagent should be running");
        assert_eq!(running[0], subagent_session_id, "should be same session");
    });

    // Subagent responds to follow-up
    model.send_last_completion_stream_text_chunk("follow-up task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete second turn
    model.send_last_completion_stream_text_chunk("Second response");
    model.end_last_completion_stream();

    send2.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after resume completion"
        );
    });

    // Verify the subagent's acp thread has both conversation turns
    assert_eq!(
        subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            do the first task

            ## Assistant

            first task response

            ## User

            do the follow-up task

            ## Assistant

            follow-up task response

        "}
    );
}
5144
5145#[gpui::test]
5146async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5147    init_test(cx);
5148
5149    cx.update(|cx| {
5150        cx.update_flags(true, vec!["subagents".to_string()]);
5151    });
5152
5153    let fs = FakeFs::new(cx.executor());
5154    fs.insert_tree(path!("/test"), json!({})).await;
5155    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5156    let project_context = cx.new(|_cx| ProjectContext::default());
5157    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5158    let context_server_registry =
5159        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5160    let model = Arc::new(FakeLanguageModel::default());
5161
5162    let parent_thread = cx.new(|cx| {
5163        Thread::new(
5164            project.clone(),
5165            project_context,
5166            context_server_registry,
5167            Templates::new(),
5168            Some(model.clone()),
5169            cx,
5170        )
5171    });
5172
5173    let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5174    subagent_thread.read_with(cx, |subagent_thread, cx| {
5175        assert!(subagent_thread.is_subagent());
5176        assert_eq!(subagent_thread.depth(), 1);
5177        assert_eq!(
5178            subagent_thread.model().map(|model| model.id()),
5179            Some(model.id())
5180        );
5181        assert_eq!(
5182            subagent_thread.parent_thread_id(),
5183            Some(parent_thread.read(cx).id().clone())
5184        );
5185    });
5186}
5187
5188#[gpui::test]
5189async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5190    init_test(cx);
5191
5192    cx.update(|cx| {
5193        cx.update_flags(true, vec!["subagents".to_string()]);
5194    });
5195
5196    let fs = FakeFs::new(cx.executor());
5197    fs.insert_tree(path!("/test"), json!({})).await;
5198    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5199    let project_context = cx.new(|_cx| ProjectContext::default());
5200    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5201    let context_server_registry =
5202        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5203    let model = Arc::new(FakeLanguageModel::default());
5204    let environment = Rc::new(cx.update(|cx| {
5205        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5206    }));
5207
5208    let deep_parent_thread = cx.new(|cx| {
5209        let mut thread = Thread::new(
5210            project.clone(),
5211            project_context,
5212            context_server_registry,
5213            Templates::new(),
5214            Some(model.clone()),
5215            cx,
5216        );
5217        thread.set_subagent_context(SubagentContext {
5218            parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5219            depth: MAX_SUBAGENT_DEPTH - 1,
5220        });
5221        thread
5222    });
5223    let deep_subagent_thread = cx.new(|cx| {
5224        let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5225        thread.add_default_tools(environment, cx);
5226        thread
5227    });
5228
5229    deep_subagent_thread.read_with(cx, |thread, _| {
5230        assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5231        assert!(
5232            !thread.has_registered_tool(SpawnAgentTool::NAME),
5233            "subagent tool should not be present at max depth"
5234        );
5235    });
5236}
5237
5238#[gpui::test]
5239async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5240    init_test(cx);
5241
5242    cx.update(|cx| {
5243        cx.update_flags(true, vec!["subagents".to_string()]);
5244    });
5245
5246    let fs = FakeFs::new(cx.executor());
5247    fs.insert_tree(path!("/test"), json!({})).await;
5248    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5249    let project_context = cx.new(|_cx| ProjectContext::default());
5250    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5251    let context_server_registry =
5252        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5253    let model = Arc::new(FakeLanguageModel::default());
5254
5255    let parent = cx.new(|cx| {
5256        Thread::new(
5257            project.clone(),
5258            project_context.clone(),
5259            context_server_registry.clone(),
5260            Templates::new(),
5261            Some(model.clone()),
5262            cx,
5263        )
5264    });
5265
5266    let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5267
5268    parent.update(cx, |thread, _cx| {
5269        thread.register_running_subagent(subagent.downgrade());
5270    });
5271
5272    subagent
5273        .update(cx, |thread, cx| {
5274            thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5275        })
5276        .unwrap();
5277    cx.run_until_parked();
5278
5279    subagent.read_with(cx, |thread, _| {
5280        assert!(!thread.is_turn_complete(), "subagent should be running");
5281    });
5282
5283    parent.update(cx, |thread, cx| {
5284        thread.cancel(cx).detach();
5285    });
5286
5287    subagent.read_with(cx, |thread, _| {
5288        assert!(
5289            thread.is_turn_complete(),
5290            "subagent should be cancelled when parent cancels"
5291        );
5292    });
5293}
5294
5295#[gpui::test]
5296async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5297    init_test(cx);
5298    cx.update(|cx| {
5299        LanguageModelRegistry::test(cx);
5300    });
5301    cx.update(|cx| {
5302        cx.update_flags(true, vec!["subagents".to_string()]);
5303    });
5304
5305    let fs = FakeFs::new(cx.executor());
5306    fs.insert_tree(
5307        "/",
5308        json!({
5309            "a": {
5310                "b.md": "Lorem"
5311            }
5312        }),
5313    )
5314    .await;
5315    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5316    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5317    let agent = cx.update(|cx| {
5318        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5319    });
5320    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5321
5322    let acp_thread = cx
5323        .update(|cx| {
5324            connection
5325                .clone()
5326                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5327        })
5328        .await
5329        .unwrap();
5330    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5331    let thread = agent.read_with(cx, |agent, _| {
5332        agent.sessions.get(&session_id).unwrap().thread.clone()
5333    });
5334    let model = Arc::new(FakeLanguageModel::default());
5335
5336    thread.update(cx, |thread, cx| {
5337        thread.set_model(model.clone(), cx);
5338    });
5339    cx.run_until_parked();
5340
5341    // Start the parent turn
5342    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5343    cx.run_until_parked();
5344    model.send_last_completion_stream_text_chunk("spawning subagent");
5345    let subagent_tool_input = SpawnAgentToolInput {
5346        label: "label".to_string(),
5347        message: "subagent task prompt".to_string(),
5348        session_id: None,
5349    };
5350    let subagent_tool_use = LanguageModelToolUse {
5351        id: "subagent_1".into(),
5352        name: SpawnAgentTool::NAME.into(),
5353        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5354        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5355        is_input_complete: true,
5356        thought_signature: None,
5357    };
5358    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5359        subagent_tool_use,
5360    ));
5361    model.end_last_completion_stream();
5362
5363    cx.run_until_parked();
5364
5365    // Verify subagent is running
5366    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5367        thread
5368            .running_subagent_ids(cx)
5369            .get(0)
5370            .expect("subagent thread should be running")
5371            .clone()
5372    });
5373
5374    // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5375    model.send_last_completion_stream_text_chunk("partial work");
5376    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5377        TokenUsage {
5378            input_tokens: 850_000,
5379            output_tokens: 0,
5380            cache_creation_input_tokens: 0,
5381            cache_read_input_tokens: 0,
5382        },
5383    ));
5384
5385    cx.run_until_parked();
5386
5387    // The subagent should no longer be running
5388    thread.read_with(cx, |thread, cx| {
5389        assert!(
5390            thread.running_subagent_ids(cx).is_empty(),
5391            "subagent should be stopped after context window warning"
5392        );
5393    });
5394
5395    // The parent model should get a new completion request to respond to the tool error
5396    model.send_last_completion_stream_text_chunk("Response after warning");
5397    model.end_last_completion_stream();
5398
5399    send.await.unwrap();
5400
5401    // Verify the parent thread shows the warning error in the tool call
5402    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5403    assert!(
5404        markdown.contains("nearing the end of its context window"),
5405        "tool output should contain context window warning message, got:\n{markdown}"
5406    );
5407    assert!(
5408        markdown.contains("Status: Failed"),
5409        "tool call should have Failed status, got:\n{markdown}"
5410    );
5411
5412    // Verify the subagent session still exists (can be resumed)
5413    agent.read_with(cx, |agent, _cx| {
5414        assert!(
5415            agent.sessions.contains_key(&subagent_session_id),
5416            "subagent session should still exist for potential resume"
5417        );
5418    });
5419}
5420
5421#[gpui::test]
5422async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5423    init_test(cx);
5424    cx.update(|cx| {
5425        LanguageModelRegistry::test(cx);
5426    });
5427    cx.update(|cx| {
5428        cx.update_flags(true, vec!["subagents".to_string()]);
5429    });
5430
5431    let fs = FakeFs::new(cx.executor());
5432    fs.insert_tree(
5433        "/",
5434        json!({
5435            "a": {
5436                "b.md": "Lorem"
5437            }
5438        }),
5439    )
5440    .await;
5441    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5442    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5443    let agent = cx.update(|cx| {
5444        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5445    });
5446    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5447
5448    let acp_thread = cx
5449        .update(|cx| {
5450            connection
5451                .clone()
5452                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5453        })
5454        .await
5455        .unwrap();
5456    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5457    let thread = agent.read_with(cx, |agent, _| {
5458        agent.sessions.get(&session_id).unwrap().thread.clone()
5459    });
5460    let model = Arc::new(FakeLanguageModel::default());
5461
5462    thread.update(cx, |thread, cx| {
5463        thread.set_model(model.clone(), cx);
5464    });
5465    cx.run_until_parked();
5466
5467    // === First turn: create subagent, trigger context window warning ===
5468    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5469    cx.run_until_parked();
5470    model.send_last_completion_stream_text_chunk("spawning subagent");
5471    let subagent_tool_input = SpawnAgentToolInput {
5472        label: "initial task".to_string(),
5473        message: "do the first task".to_string(),
5474        session_id: None,
5475    };
5476    let subagent_tool_use = LanguageModelToolUse {
5477        id: "subagent_1".into(),
5478        name: SpawnAgentTool::NAME.into(),
5479        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5480        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5481        is_input_complete: true,
5482        thought_signature: None,
5483    };
5484    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5485        subagent_tool_use,
5486    ));
5487    model.end_last_completion_stream();
5488
5489    cx.run_until_parked();
5490
5491    let subagent_session_id = thread.read_with(cx, |thread, cx| {
5492        thread
5493            .running_subagent_ids(cx)
5494            .get(0)
5495            .expect("subagent thread should be running")
5496            .clone()
5497    });
5498
5499    // Subagent sends a usage update that crosses the warning threshold.
5500    // This triggers Normal→Warning, stopping the subagent.
5501    model.send_last_completion_stream_text_chunk("partial work");
5502    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5503        TokenUsage {
5504            input_tokens: 850_000,
5505            output_tokens: 0,
5506            cache_creation_input_tokens: 0,
5507            cache_read_input_tokens: 0,
5508        },
5509    ));
5510
5511    cx.run_until_parked();
5512
5513    // Verify the first turn was stopped with a context window warning
5514    thread.read_with(cx, |thread, cx| {
5515        assert!(
5516            thread.running_subagent_ids(cx).is_empty(),
5517            "subagent should be stopped after context window warning"
5518        );
5519    });
5520
5521    // Parent model responds to complete first turn
5522    model.send_last_completion_stream_text_chunk("First response");
5523    model.end_last_completion_stream();
5524
5525    send.await.unwrap();
5526
5527    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5528    assert!(
5529        markdown.contains("nearing the end of its context window"),
5530        "first turn should have context window warning, got:\n{markdown}"
5531    );
5532
5533    // === Second turn: resume the same subagent (now at Warning level) ===
5534    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5535    cx.run_until_parked();
5536    model.send_last_completion_stream_text_chunk("resuming subagent");
5537    let resume_tool_input = SpawnAgentToolInput {
5538        label: "follow-up task".to_string(),
5539        message: "do the follow-up task".to_string(),
5540        session_id: Some(subagent_session_id.clone()),
5541    };
5542    let resume_tool_use = LanguageModelToolUse {
5543        id: "subagent_2".into(),
5544        name: SpawnAgentTool::NAME.into(),
5545        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5546        input: serde_json::to_value(&resume_tool_input).unwrap(),
5547        is_input_complete: true,
5548        thought_signature: None,
5549    };
5550    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5551    model.end_last_completion_stream();
5552
5553    cx.run_until_parked();
5554
5555    // Subagent responds with tokens still at warning level (no worse).
5556    // Since ratio_before_prompt was already Warning, this should NOT
5557    // trigger the context window warning again.
5558    model.send_last_completion_stream_text_chunk("follow-up task response");
5559    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5560        TokenUsage {
5561            input_tokens: 870_000,
5562            output_tokens: 0,
5563            cache_creation_input_tokens: 0,
5564            cache_read_input_tokens: 0,
5565        },
5566    ));
5567    model.end_last_completion_stream();
5568
5569    cx.run_until_parked();
5570
5571    // Parent model responds to complete second turn
5572    model.send_last_completion_stream_text_chunk("Second response");
5573    model.end_last_completion_stream();
5574
5575    send2.await.unwrap();
5576
5577    // The resumed subagent should have completed normally since the ratio
5578    // didn't transition (it was Warning before and stayed at Warning)
5579    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5580    assert!(
5581        markdown.contains("follow-up task response"),
5582        "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5583    );
5584    // The second tool call should NOT have a context window warning
5585    let second_tool_pos = markdown
5586        .find("follow-up task")
5587        .expect("should find follow-up tool call");
5588    let after_second_tool = &markdown[second_tool_pos..];
5589    assert!(
5590        !after_second_tool.contains("nearing the end of its context window"),
5591        "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5592    );
5593}
5594
5595#[gpui::test]
5596async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
5597    init_test(cx);
5598    cx.update(|cx| {
5599        LanguageModelRegistry::test(cx);
5600    });
5601    cx.update(|cx| {
5602        cx.update_flags(true, vec!["subagents".to_string()]);
5603    });
5604
5605    let fs = FakeFs::new(cx.executor());
5606    fs.insert_tree(
5607        "/",
5608        json!({
5609            "a": {
5610                "b.md": "Lorem"
5611            }
5612        }),
5613    )
5614    .await;
5615    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5616    let thread_store = cx.new(|cx| ThreadStore::new(cx));
5617    let agent = cx.update(|cx| {
5618        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5619    });
5620    let connection = Rc::new(NativeAgentConnection(agent.clone()));
5621
5622    let acp_thread = cx
5623        .update(|cx| {
5624            connection
5625                .clone()
5626                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5627        })
5628        .await
5629        .unwrap();
5630    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5631    let thread = agent.read_with(cx, |agent, _| {
5632        agent.sessions.get(&session_id).unwrap().thread.clone()
5633    });
5634    let model = Arc::new(FakeLanguageModel::default());
5635
5636    thread.update(cx, |thread, cx| {
5637        thread.set_model(model.clone(), cx);
5638    });
5639    cx.run_until_parked();
5640
5641    // Start the parent turn
5642    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5643    cx.run_until_parked();
5644    model.send_last_completion_stream_text_chunk("spawning subagent");
5645    let subagent_tool_input = SpawnAgentToolInput {
5646        label: "label".to_string(),
5647        message: "subagent task prompt".to_string(),
5648        session_id: None,
5649    };
5650    let subagent_tool_use = LanguageModelToolUse {
5651        id: "subagent_1".into(),
5652        name: SpawnAgentTool::NAME.into(),
5653        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5654        input: serde_json::to_value(&subagent_tool_input).unwrap(),
5655        is_input_complete: true,
5656        thought_signature: None,
5657    };
5658    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5659        subagent_tool_use,
5660    ));
5661    model.end_last_completion_stream();
5662
5663    cx.run_until_parked();
5664
5665    // Verify subagent is running
5666    thread.read_with(cx, |thread, cx| {
5667        assert!(
5668            !thread.running_subagent_ids(cx).is_empty(),
5669            "subagent should be running"
5670        );
5671    });
5672
5673    // The subagent's model returns a non-retryable error
5674    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
5675        tokens: None,
5676    });
5677
5678    cx.run_until_parked();
5679
5680    // The subagent should no longer be running
5681    thread.read_with(cx, |thread, cx| {
5682        assert!(
5683            thread.running_subagent_ids(cx).is_empty(),
5684            "subagent should not be running after error"
5685        );
5686    });
5687
5688    // The parent model should get a new completion request to respond to the tool error
5689    model.send_last_completion_stream_text_chunk("Response after error");
5690    model.end_last_completion_stream();
5691
5692    send.await.unwrap();
5693
5694    // Verify the parent thread shows the error in the tool call
5695    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5696    assert!(
5697        markdown.contains("Status: Failed"),
5698        "tool call should have Failed status after model error, got:\n{markdown}"
5699    );
5700}
5701
5702#[gpui::test]
5703async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5704    init_test(cx);
5705
5706    let fs = FakeFs::new(cx.executor());
5707    fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5708        .await;
5709    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5710
5711    cx.update(|cx| {
5712        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5713        settings.tool_permissions.tools.insert(
5714            EditFileTool::NAME.into(),
5715            agent_settings::ToolRules {
5716                default: Some(settings::ToolPermissionMode::Allow),
5717                always_allow: vec![],
5718                always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5719                always_confirm: vec![],
5720                invalid_patterns: vec![],
5721            },
5722        );
5723        agent_settings::AgentSettings::override_global(settings, cx);
5724    });
5725
5726    let context_server_registry =
5727        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5728    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5729    let templates = crate::Templates::new();
5730    let thread = cx.new(|cx| {
5731        crate::Thread::new(
5732            project.clone(),
5733            cx.new(|_cx| prompt_store::ProjectContext::default()),
5734            context_server_registry,
5735            templates.clone(),
5736            None,
5737            cx,
5738        )
5739    });
5740
5741    #[allow(clippy::arc_with_non_send_sync)]
5742    let tool = Arc::new(crate::EditFileTool::new(
5743        project.clone(),
5744        thread.downgrade(),
5745        language_registry,
5746        templates,
5747    ));
5748    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5749
5750    let task = cx.update(|cx| {
5751        tool.run(
5752            ToolInput::resolved(crate::EditFileToolInput {
5753                display_description: "Edit sensitive file".to_string(),
5754                path: "root/sensitive_config.txt".into(),
5755                mode: crate::EditFileMode::Edit,
5756            }),
5757            event_stream,
5758            cx,
5759        )
5760    });
5761
5762    let result = task.await;
5763    assert!(result.is_err(), "expected edit to be blocked");
5764    assert!(
5765        result.unwrap_err().to_string().contains("blocked"),
5766        "error should mention the edit was blocked"
5767    );
5768}
5769
5770#[gpui::test]
5771async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5772    init_test(cx);
5773
5774    let fs = FakeFs::new(cx.executor());
5775    fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5776        .await;
5777    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5778
5779    cx.update(|cx| {
5780        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5781        settings.tool_permissions.tools.insert(
5782            DeletePathTool::NAME.into(),
5783            agent_settings::ToolRules {
5784                default: Some(settings::ToolPermissionMode::Allow),
5785                always_allow: vec![],
5786                always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5787                always_confirm: vec![],
5788                invalid_patterns: vec![],
5789            },
5790        );
5791        agent_settings::AgentSettings::override_global(settings, cx);
5792    });
5793
5794    let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5795
5796    #[allow(clippy::arc_with_non_send_sync)]
5797    let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5798    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5799
5800    let task = cx.update(|cx| {
5801        tool.run(
5802            ToolInput::resolved(crate::DeletePathToolInput {
5803                path: "root/important_data.txt".to_string(),
5804            }),
5805            event_stream,
5806            cx,
5807        )
5808    });
5809
5810    let result = task.await;
5811    assert!(result.is_err(), "expected deletion to be blocked");
5812    assert!(
5813        result.unwrap_err().contains("blocked"),
5814        "error should mention the deletion was blocked"
5815    );
5816}
5817
5818#[gpui::test]
5819async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5820    init_test(cx);
5821
5822    let fs = FakeFs::new(cx.executor());
5823    fs.insert_tree(
5824        "/root",
5825        json!({
5826            "safe.txt": "content",
5827            "protected": {}
5828        }),
5829    )
5830    .await;
5831    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5832
5833    cx.update(|cx| {
5834        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5835        settings.tool_permissions.tools.insert(
5836            MovePathTool::NAME.into(),
5837            agent_settings::ToolRules {
5838                default: Some(settings::ToolPermissionMode::Allow),
5839                always_allow: vec![],
5840                always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5841                always_confirm: vec![],
5842                invalid_patterns: vec![],
5843            },
5844        );
5845        agent_settings::AgentSettings::override_global(settings, cx);
5846    });
5847
5848    #[allow(clippy::arc_with_non_send_sync)]
5849    let tool = Arc::new(crate::MovePathTool::new(project));
5850    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5851
5852    let task = cx.update(|cx| {
5853        tool.run(
5854            ToolInput::resolved(crate::MovePathToolInput {
5855                source_path: "root/safe.txt".to_string(),
5856                destination_path: "root/protected/safe.txt".to_string(),
5857            }),
5858            event_stream,
5859            cx,
5860        )
5861    });
5862
5863    let result = task.await;
5864    assert!(
5865        result.is_err(),
5866        "expected move to be blocked due to destination path"
5867    );
5868    assert!(
5869        result.unwrap_err().contains("blocked"),
5870        "error should mention the move was blocked"
5871    );
5872}
5873
5874#[gpui::test]
5875async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5876    init_test(cx);
5877
5878    let fs = FakeFs::new(cx.executor());
5879    fs.insert_tree(
5880        "/root",
5881        json!({
5882            "secret.txt": "secret content",
5883            "public": {}
5884        }),
5885    )
5886    .await;
5887    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5888
5889    cx.update(|cx| {
5890        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5891        settings.tool_permissions.tools.insert(
5892            MovePathTool::NAME.into(),
5893            agent_settings::ToolRules {
5894                default: Some(settings::ToolPermissionMode::Allow),
5895                always_allow: vec![],
5896                always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5897                always_confirm: vec![],
5898                invalid_patterns: vec![],
5899            },
5900        );
5901        agent_settings::AgentSettings::override_global(settings, cx);
5902    });
5903
5904    #[allow(clippy::arc_with_non_send_sync)]
5905    let tool = Arc::new(crate::MovePathTool::new(project));
5906    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5907
5908    let task = cx.update(|cx| {
5909        tool.run(
5910            ToolInput::resolved(crate::MovePathToolInput {
5911                source_path: "root/secret.txt".to_string(),
5912                destination_path: "root/public/not_secret.txt".to_string(),
5913            }),
5914            event_stream,
5915            cx,
5916        )
5917    });
5918
5919    let result = task.await;
5920    assert!(
5921        result.is_err(),
5922        "expected move to be blocked due to source path"
5923    );
5924    assert!(
5925        result.unwrap_err().contains("blocked"),
5926        "error should mention the move was blocked"
5927    );
5928}
5929
5930#[gpui::test]
5931async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5932    init_test(cx);
5933
5934    let fs = FakeFs::new(cx.executor());
5935    fs.insert_tree(
5936        "/root",
5937        json!({
5938            "confidential.txt": "confidential data",
5939            "dest": {}
5940        }),
5941    )
5942    .await;
5943    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5944
5945    cx.update(|cx| {
5946        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5947        settings.tool_permissions.tools.insert(
5948            CopyPathTool::NAME.into(),
5949            agent_settings::ToolRules {
5950                default: Some(settings::ToolPermissionMode::Allow),
5951                always_allow: vec![],
5952                always_deny: vec![
5953                    agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5954                ],
5955                always_confirm: vec![],
5956                invalid_patterns: vec![],
5957            },
5958        );
5959        agent_settings::AgentSettings::override_global(settings, cx);
5960    });
5961
5962    #[allow(clippy::arc_with_non_send_sync)]
5963    let tool = Arc::new(crate::CopyPathTool::new(project));
5964    let (event_stream, _rx) = crate::ToolCallEventStream::test();
5965
5966    let task = cx.update(|cx| {
5967        tool.run(
5968            ToolInput::resolved(crate::CopyPathToolInput {
5969                source_path: "root/confidential.txt".to_string(),
5970                destination_path: "root/dest/copy.txt".to_string(),
5971            }),
5972            event_stream,
5973            cx,
5974        )
5975    });
5976
5977    let result = task.await;
5978    assert!(result.is_err(), "expected copy to be blocked");
5979    assert!(
5980        result.unwrap_err().contains("blocked"),
5981        "error should mention the copy was blocked"
5982    );
5983}
5984
5985#[gpui::test]
5986async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
5987    init_test(cx);
5988
5989    let fs = FakeFs::new(cx.executor());
5990    fs.insert_tree(
5991        "/root",
5992        json!({
5993            "normal.txt": "normal content",
5994            "readonly": {
5995                "config.txt": "readonly content"
5996            }
5997        }),
5998    )
5999    .await;
6000    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6001
6002    cx.update(|cx| {
6003        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6004        settings.tool_permissions.tools.insert(
6005            SaveFileTool::NAME.into(),
6006            agent_settings::ToolRules {
6007                default: Some(settings::ToolPermissionMode::Allow),
6008                always_allow: vec![],
6009                always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6010                always_confirm: vec![],
6011                invalid_patterns: vec![],
6012            },
6013        );
6014        agent_settings::AgentSettings::override_global(settings, cx);
6015    });
6016
6017    #[allow(clippy::arc_with_non_send_sync)]
6018    let tool = Arc::new(crate::SaveFileTool::new(project));
6019    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6020
6021    let task = cx.update(|cx| {
6022        tool.run(
6023            ToolInput::resolved(crate::SaveFileToolInput {
6024                paths: vec![
6025                    std::path::PathBuf::from("root/normal.txt"),
6026                    std::path::PathBuf::from("root/readonly/config.txt"),
6027                ],
6028            }),
6029            event_stream,
6030            cx,
6031        )
6032    });
6033
6034    let result = task.await;
6035    assert!(
6036        result.is_err(),
6037        "expected save to be blocked due to denied path"
6038    );
6039    assert!(
6040        result.unwrap_err().contains("blocked"),
6041        "error should mention the save was blocked"
6042    );
6043}
6044
6045#[gpui::test]
6046async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6047    init_test(cx);
6048
6049    let fs = FakeFs::new(cx.executor());
6050    fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6051        .await;
6052    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6053
6054    cx.update(|cx| {
6055        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6056        settings.tool_permissions.tools.insert(
6057            SaveFileTool::NAME.into(),
6058            agent_settings::ToolRules {
6059                default: Some(settings::ToolPermissionMode::Allow),
6060                always_allow: vec![],
6061                always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6062                always_confirm: vec![],
6063                invalid_patterns: vec![],
6064            },
6065        );
6066        agent_settings::AgentSettings::override_global(settings, cx);
6067    });
6068
6069    #[allow(clippy::arc_with_non_send_sync)]
6070    let tool = Arc::new(crate::SaveFileTool::new(project));
6071    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6072
6073    let task = cx.update(|cx| {
6074        tool.run(
6075            ToolInput::resolved(crate::SaveFileToolInput {
6076                paths: vec![std::path::PathBuf::from("root/config.secret")],
6077            }),
6078            event_stream,
6079            cx,
6080        )
6081    });
6082
6083    let result = task.await;
6084    assert!(result.is_err(), "expected save to be blocked");
6085    assert!(
6086        result.unwrap_err().contains("blocked"),
6087        "error should mention the save was blocked"
6088    );
6089}
6090
6091#[gpui::test]
6092async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6093    init_test(cx);
6094
6095    cx.update(|cx| {
6096        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6097        settings.tool_permissions.tools.insert(
6098            WebSearchTool::NAME.into(),
6099            agent_settings::ToolRules {
6100                default: Some(settings::ToolPermissionMode::Allow),
6101                always_allow: vec![],
6102                always_deny: vec![
6103                    agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6104                ],
6105                always_confirm: vec![],
6106                invalid_patterns: vec![],
6107            },
6108        );
6109        agent_settings::AgentSettings::override_global(settings, cx);
6110    });
6111
6112    #[allow(clippy::arc_with_non_send_sync)]
6113    let tool = Arc::new(crate::WebSearchTool);
6114    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6115
6116    let input: crate::WebSearchToolInput =
6117        serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6118
6119    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6120
6121    let result = task.await;
6122    assert!(result.is_err(), "expected search to be blocked");
6123    match result.unwrap_err() {
6124        crate::WebSearchToolOutput::Error { error } => {
6125            assert!(
6126                error.contains("blocked"),
6127                "error should mention the search was blocked"
6128            );
6129        }
6130        other => panic!("expected Error variant, got: {other:?}"),
6131    }
6132}
6133
// Verifies that an `always_allow` rule overrides a `Confirm` default: editing
// a matching path must not emit an authorization request.
#[gpui::test]
async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({"README.md": "# Hello"}))
        .await;
    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;

    // Default mode requires confirmation, but *.md paths are always allowed.
    cx.update(|cx| {
        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
        settings.tool_permissions.tools.insert(
            EditFileTool::NAME.into(),
            agent_settings::ToolRules {
                default: Some(settings::ToolPermissionMode::Confirm),
                always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
                always_deny: vec![],
                always_confirm: vec![],
                invalid_patterns: vec![],
            },
        );
        agent_settings::AgentSettings::override_global(settings, cx);
    });

    // EditFileTool needs a live Thread (unlike the simpler tools above), so
    // build the minimal supporting entities here.
    let context_server_registry =
        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
    let templates = crate::Templates::new();
    let thread = cx.new(|cx| {
        crate::Thread::new(
            project.clone(),
            cx.new(|_cx| prompt_store::ProjectContext::default()),
            context_server_registry,
            templates.clone(),
            None,
            cx,
        )
    });

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::EditFileTool::new(
        project,
        thread.downgrade(),
        language_registry,
        templates,
    ));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Kick off an edit of the allowed .md file. The task is intentionally not
    // awaited — this test only cares whether an authorization event appears.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::EditFileToolInput {
                display_description: "Edit README".to_string(),
                path: "root/README.md".into(),
                mode: crate::EditFileMode::Edit,
            }),
            event_stream,
            cx,
        )
    });

    cx.run_until_parked();

    // After everything has settled, the event stream must not contain an
    // authorization request.
    let event = rx.try_next();
    assert!(
        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
        "expected no authorization request for allowed .md file"
    );
}
6202
// Verifies that edits to local `.zed` settings files still require
// confirmation even when the global tool-permission default is Allow.
#[gpui::test]
async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/root",
        json!({
            ".zed": {
                "settings.json": "{}"
            },
            "README.md": "# Hello"
        }),
    )
    .await;
    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;

    // Globally allow all tools — no per-tool rules at all.
    cx.update(|cx| {
        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
        agent_settings::AgentSettings::override_global(settings, cx);
    });

    // EditFileTool needs a live Thread; build the minimal supporting entities.
    let context_server_registry =
        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
    let templates = crate::Templates::new();
    let thread = cx.new(|cx| {
        crate::Thread::new(
            project.clone(),
            cx.new(|_cx| prompt_store::ProjectContext::default()),
            context_server_registry,
            templates.clone(),
            None,
            cx,
        )
    });

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::EditFileTool::new(
        project,
        thread.downgrade(),
        language_registry,
        templates,
    ));

    // Editing a file inside .zed/ should still prompt even with global default: allow,
    // because local settings paths are sensitive and require confirmation regardless.
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::EditFileToolInput {
                display_description: "Edit local settings".to_string(),
                path: "root/.zed/settings.json".into(),
                mode: crate::EditFileMode::Edit,
            }),
            event_stream,
            cx,
        )
    });

    // Awaiting these proves the stream emitted a field update followed by an
    // authorization request; the test would hang here if no prompt occurred.
    let _update = rx.expect_update_fields().await;
    let _auth = rx.expect_authorization().await;
}
6267
6268#[gpui::test]
6269async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6270    init_test(cx);
6271
6272    cx.update(|cx| {
6273        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6274        settings.tool_permissions.tools.insert(
6275            FetchTool::NAME.into(),
6276            agent_settings::ToolRules {
6277                default: Some(settings::ToolPermissionMode::Allow),
6278                always_allow: vec![],
6279                always_deny: vec![
6280                    agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6281                ],
6282                always_confirm: vec![],
6283                invalid_patterns: vec![],
6284            },
6285        );
6286        agent_settings::AgentSettings::override_global(settings, cx);
6287    });
6288
6289    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6290
6291    #[allow(clippy::arc_with_non_send_sync)]
6292    let tool = Arc::new(crate::FetchTool::new(http_client));
6293    let (event_stream, _rx) = crate::ToolCallEventStream::test();
6294
6295    let input: crate::FetchToolInput =
6296        serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6297
6298    let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6299
6300    let result = task.await;
6301    assert!(result.is_err(), "expected fetch to be blocked");
6302    assert!(
6303        result.unwrap_err().contains("blocked"),
6304        "error should mention the fetch was blocked"
6305    );
6306}
6307
6308#[gpui::test]
6309async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6310    init_test(cx);
6311
6312    cx.update(|cx| {
6313        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6314        settings.tool_permissions.tools.insert(
6315            FetchTool::NAME.into(),
6316            agent_settings::ToolRules {
6317                default: Some(settings::ToolPermissionMode::Confirm),
6318                always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6319                always_deny: vec![],
6320                always_confirm: vec![],
6321                invalid_patterns: vec![],
6322            },
6323        );
6324        agent_settings::AgentSettings::override_global(settings, cx);
6325    });
6326
6327    let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6328
6329    #[allow(clippy::arc_with_non_send_sync)]
6330    let tool = Arc::new(crate::FetchTool::new(http_client));
6331    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6332
6333    let input: crate::FetchToolInput =
6334        serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6335
6336    let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6337
6338    cx.run_until_parked();
6339
6340    let event = rx.try_next();
6341    assert!(
6342        !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6343        "expected no authorization request for allowed docs.rs URL"
6344    );
6345}
6346
// Verifies that when a user message is queued while a tool call is in flight,
// the turn ends at the next tool-call boundary (StopReason::EndTurn) instead
// of looping back to the model, leaving the queued flag intact for the caller.
#[gpui::test]
async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Add a tool so we can simulate tool calls
    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
    });

    // Start a turn by sending a message
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate the model making a tool call
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text": "hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));

    // Signal that a message is queued before ending the stream
    thread.update(cx, |thread, _cx| {
        thread.set_has_queued_message(true);
    });

    // Now end the stream - tool will run, and the boundary check should see the queue
    fake_model.end_last_completion_stream();

    // Collect all events until the turn stops
    let all_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we received the tool call event
    let tool_call_ids: Vec<_> = all_events
        .iter()
        .filter_map(|e| match e {
            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
            _ => None,
        })
        .collect();
    assert_eq!(
        tool_call_ids,
        vec!["tool_1"],
        "Should have received a tool call event for our echo tool"
    );

    // The turn should have stopped with EndTurn (not looped back to the model)
    let stop_reasons = stop_events(all_events);
    assert_eq!(
        stop_reasons,
        vec![acp::StopReason::EndTurn],
        "Turn should have ended after tool completion due to queued message"
    );

    // Verify the queued message flag is still set
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.has_queued_message(),
            "Should still have queued message flag set"
        );
    });

    // Thread should be idle now
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be running after turn ends"
        );
    });
}
6431
// Verifies that when a streaming tool fails mid-stream, a follow-up request
// carrying the failed tool result is issued immediately — without waiting for
// the model to finish streaming the tool's input.
#[gpui::test]
async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // This tool fails after receiving a single input chunk.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Note `is_input_complete: false` — the model is still streaming input
    // when the tool fails.
    let tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({}),
        is_input_complete: false,
        thought_signature: None,
    };

    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));

    cx.run_until_parked();

    // The completion stream was never ended, yet a new request must already
    // exist containing the error tool result.
    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // Skip messages[0] (system prompt) and compare the rest of the transcript.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_failing_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6506
// Verifies that when a later streaming tool fails, the thread still waits for
// earlier, in-flight tool calls to finish before issuing the follow-up
// request, and that both tool results appear in that request.
#[gpui::test]
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Gate used to hold the first (succeeding) tool open until we release it.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // First tool call streams in two chunks: a partial input followed by the
    // completed input.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // Second tool call fails after its first chunk (input never completes).
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    cx.run_until_parked();

    // Release the gate so the first tool can finish; only then may the thread
    // send the follow-up request.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // Skip messages[0] (system prompt). The follow-up carries both results:
    // the failing tool's error and the gated tool's successful echo.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6624
6625#[gpui::test]
6626async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
6627    let ThreadTest {
6628        model, thread, fs, ..
6629    } = setup(cx, TestModel::Fake).await;
6630    let fake_model_a = model.as_fake();
6631
6632    thread.update(cx, |thread, _cx| {
6633        thread.add_tool(EchoTool);
6634        thread.add_tool(DelayTool);
6635    });
6636
6637    // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
6638    fs.insert_file(
6639        paths::settings_file(),
6640        json!({
6641            "agent": {
6642                "profiles": {
6643                    "profile-a": {
6644                        "name": "Profile A",
6645                        "tools": {
6646                            EchoTool::NAME: true,
6647                            DelayTool::NAME: true,
6648                        }
6649                    },
6650                    "profile-b": {
6651                        "name": "Profile B",
6652                        "tools": {
6653                            DelayTool::NAME: true,
6654                        }
6655                    }
6656                }
6657            }
6658        })
6659        .to_string()
6660        .into_bytes(),
6661    )
6662    .await;
6663    cx.run_until_parked();
6664
6665    thread.update(cx, |thread, cx| {
6666        thread.set_profile(AgentProfileId("profile-a".into()), cx);
6667        thread.set_thinking_enabled(false, cx);
6668    });
6669
6670    // Send a message — first iteration starts with model A, profile-a, thinking off.
6671    thread
6672        .update(cx, |thread, cx| {
6673            thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
6674        })
6675        .unwrap();
6676    cx.run_until_parked();
6677
6678    // Verify first request has both tools and thinking disabled.
6679    let completions = fake_model_a.pending_completions();
6680    assert_eq!(completions.len(), 1);
6681    let first_tools = tool_names_for_completion(&completions[0]);
6682    assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
6683    assert!(!completions[0].thinking_allowed);
6684
6685    // Model A responds with an echo tool call.
6686    fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6687        LanguageModelToolUse {
6688            id: "tool_1".into(),
6689            name: "echo".into(),
6690            raw_input: r#"{"text":"hello"}"#.into(),
6691            input: json!({"text": "hello"}),
6692            is_input_complete: true,
6693            thought_signature: None,
6694        },
6695    ));
6696    fake_model_a.end_last_completion_stream();
6697
6698    // Before the next iteration runs, switch to profile-b (only DelayTool),
6699    // swap in a new model, and enable thinking.
6700    let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
6701        "test-provider",
6702        "model-b",
6703        "Model B",
6704        true,
6705    ));
6706    thread.update(cx, |thread, cx| {
6707        thread.set_profile(AgentProfileId("profile-b".into()), cx);
6708        thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
6709        thread.set_thinking_enabled(true, cx);
6710    });
6711
6712    // Run until parked — processes the echo tool call, loops back, picks up
6713    // the new model/profile/thinking, and makes a second request to model B.
6714    cx.run_until_parked();
6715
6716    // The second request should have gone to model B.
6717    let model_b_completions = fake_model_b.pending_completions();
6718    assert_eq!(
6719        model_b_completions.len(),
6720        1,
6721        "second request should go to model B"
6722    );
6723
6724    // Profile-b only has DelayTool, so echo should be gone.
6725    let second_tools = tool_names_for_completion(&model_b_completions[0]);
6726    assert_eq!(second_tools, vec![DelayTool::NAME]);
6727
6728    // Thinking should now be enabled.
6729    assert!(model_b_completions[0].thinking_allowed);
6730}