1use super::*;
2use acp_thread::{
3 AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
4 UserMessageId,
5};
6use agent_client_protocol::{self as acp};
7use agent_settings::AgentProfileId;
8use anyhow::Result;
9use client::{Client, RefreshLlmTokenListener, UserStore};
10use collections::IndexMap;
11use context_server::{ContextServer, ContextServerCommand, ContextServerId};
12use feature_flags::FeatureFlagAppExt as _;
13use fs::{FakeFs, Fs};
14use futures::{
15 FutureExt as _, StreamExt,
16 channel::{
17 mpsc::{self, UnboundedReceiver},
18 oneshot,
19 },
20 future::{Fuse, Shared},
21};
22use gpui::{
23 App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
24 http_client::FakeHttpClient,
25};
26use indoc::indoc;
27use language_model::{
28 CompletionIntent, LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent,
29 LanguageModelId, LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
30 LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
31 LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
32 fake_provider::FakeLanguageModel,
33};
34use pretty_assertions::assert_eq;
35use project::{
36 Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
37};
38use prompt_store::ProjectContext;
39use reqwest_client::ReqwestClient;
40use schemars::JsonSchema;
41use serde::{Deserialize, Serialize};
42use serde_json::json;
43use settings::{Settings, SettingsStore};
44use std::{
45 path::Path,
46 pin::Pin,
47 rc::Rc,
48 sync::{
49 Arc,
50 atomic::{AtomicBool, AtomicUsize, Ordering},
51 },
52 time::Duration,
53};
54use util::path;
55
56mod edit_file_thread_test;
57mod test_tools;
58use test_tools::*;
59
60pub(crate) fn init_test(cx: &mut TestAppContext) {
61 cx.update(|cx| {
62 let settings_store = SettingsStore::test(cx);
63 cx.set_global(settings_store);
64 });
65}
66
/// Test double for a terminal spawned by the terminal tool.
pub(crate) struct FakeTerminalHandle {
    // Set to true when `kill()` is called; inspected via `was_killed()`.
    killed: Arc<AtomicBool>,
    // Simulates whether the user manually stopped the terminal.
    stopped_by_user: Arc<AtomicBool>,
    // One-shot sender consumed by `signal_exit()` to resolve `wait_for_exit`;
    // wrapped in a `RefCell` because it is taken through a shared reference.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    // Shared task that resolves once the fake terminal "exits".
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    // Canned output returned from `current_output()`.
    output: acp::TerminalOutputResponse,
    id: acp::TerminalId,
}
75
76impl FakeTerminalHandle {
77 pub(crate) fn new_never_exits(cx: &mut App) -> Self {
78 let killed = Arc::new(AtomicBool::new(false));
79 let stopped_by_user = Arc::new(AtomicBool::new(false));
80
81 let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
82
83 let wait_for_exit = cx
84 .spawn(async move |_cx| {
85 // Wait for the exit signal (sent when kill() is called)
86 let _ = exit_receiver.await;
87 acp::TerminalExitStatus::new()
88 })
89 .shared();
90
91 Self {
92 killed,
93 stopped_by_user,
94 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
95 wait_for_exit,
96 output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
97 id: acp::TerminalId::new("fake_terminal".to_string()),
98 }
99 }
100
101 pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
102 let killed = Arc::new(AtomicBool::new(false));
103 let stopped_by_user = Arc::new(AtomicBool::new(false));
104 let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
105
106 let wait_for_exit = cx
107 .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
108 .shared();
109
110 Self {
111 killed,
112 stopped_by_user,
113 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
114 wait_for_exit,
115 output: acp::TerminalOutputResponse::new("command output".to_string(), false),
116 id: acp::TerminalId::new("fake_terminal".to_string()),
117 }
118 }
119
120 pub(crate) fn was_killed(&self) -> bool {
121 self.killed.load(Ordering::SeqCst)
122 }
123
124 pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
125 self.stopped_by_user.store(stopped, Ordering::SeqCst);
126 }
127
128 pub(crate) fn signal_exit(&self) {
129 if let Some(sender) = self.exit_sender.borrow_mut().take() {
130 let _ = sender.send(());
131 }
132 }
133}
134
/// `TerminalHandle` implementation that serves the canned state configured on
/// the fake, ignoring the provided app context.
impl crate::TerminalHandle for FakeTerminalHandle {
    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
        Ok(self.id.clone())
    }

    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
        Ok(self.output.clone())
    }

    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
        Ok(self.wait_for_exit.clone())
    }

    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
        // Record the kill and resolve the exit future so waiters make progress.
        self.killed.store(true, Ordering::SeqCst);
        self.signal_exit();
        Ok(())
    }

    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
        Ok(self.stopped_by_user.load(Ordering::SeqCst))
    }
}
158
/// Test double for a subagent session; `send` resolves with a pre-baked reply.
struct FakeSubagentHandle {
    session_id: acp::SessionId,
    // Shared task producing the canned reply returned from `send`.
    send_task: Shared<Task<String>>,
}
163
impl SubagentHandle for FakeSubagentHandle {
    fn id(&self) -> acp::SessionId {
        self.session_id.clone()
    }

    // Not exercised by the current tests.
    fn num_entries(&self, _cx: &App) -> usize {
        unimplemented!()
    }

    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
        // Ignores the incoming message and resolves with the canned reply.
        let task = self.send_task.clone();
        cx.background_spawn(async move { Ok(task.await) })
    }
}
178
/// Test environment handing out a single pre-configured terminal and/or
/// subagent handle, and counting terminal creations.
#[derive(Default)]
pub(crate) struct FakeThreadEnvironment {
    // Handle returned by `create_terminal`; panics if absent when requested.
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    // Handle returned by `create_subagent`; panics if absent when requested.
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
    // Number of `create_terminal` calls, exposed via `terminal_creation_count`.
    terminal_creations: Arc<AtomicUsize>,
}
185
186impl FakeThreadEnvironment {
187 pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
188 Self {
189 terminal_handle: Some(terminal_handle.into()),
190 ..self
191 }
192 }
193
194 pub(crate) fn terminal_creation_count(&self) -> usize {
195 self.terminal_creations.load(Ordering::SeqCst)
196 }
197}
198
199impl crate::ThreadEnvironment for FakeThreadEnvironment {
200 fn create_terminal(
201 &self,
202 _command: String,
203 _cwd: Option<std::path::PathBuf>,
204 _output_byte_limit: Option<u64>,
205 _cx: &mut AsyncApp,
206 ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
207 self.terminal_creations.fetch_add(1, Ordering::SeqCst);
208 let handle = self
209 .terminal_handle
210 .clone()
211 .expect("Terminal handle not available on FakeThreadEnvironment");
212 Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
213 }
214
215 fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
216 Ok(self
217 .subagent_handle
218 .clone()
219 .expect("Subagent handle not available on FakeThreadEnvironment")
220 as Rc<dyn SubagentHandle>)
221 }
222}
223
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    // Every handle created so far, in creation order; `RefCell` because
    // `create_terminal` takes `&self`.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
228
229impl MultiTerminalEnvironment {
230 fn new() -> Self {
231 Self {
232 handles: std::cell::RefCell::new(Vec::new()),
233 }
234 }
235
236 fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
237 self.handles.borrow().clone()
238 }
239}
240
impl crate::ThreadEnvironment for MultiTerminalEnvironment {
    /// Spawns a fresh never-exiting fake terminal per call and records it so
    /// tests can inspect all handles via `handles()`.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
        self.handles.borrow_mut().push(handle.clone());
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Not exercised by the current tests.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        unimplemented!()
    }
}
258
259fn always_allow_tools(cx: &mut TestAppContext) {
260 cx.update(|cx| {
261 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
262 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
263 agent_settings::AgentSettings::override_global(settings, cx);
264 });
265}
266
#[gpui::test]
async fn test_echo(cx: &mut TestAppContext) {
    // End-to-end smoke test: a user message goes out, a streamed assistant
    // reply comes back, and the thread records it verbatim.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Drive the fake model: stream one text chunk, then end the turn.
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            "Hello\n"
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
299
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    // Verifies that when a terminal tool call exceeds its `timeout_ms`, the
    // underlying terminal handle is killed and the partial output is reported.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Run a command that never finishes, with a tiny timeout.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    // The tool should surface the terminal in its first tool-call update.
    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // Poll the task cooperatively until it completes, with a wall-clock
    // deadline so a regression can't hang the test forever.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
365
#[gpui::test]
// NOTE(review): this test is ignored without an explanation — presumably it is
// timing-sensitive; confirm the reason before re-enabling or deleting.
#[ignore]
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    // Counterpart to the timeout test above: with `timeout_ms: None`, the
    // terminal handle must never be killed while the command keeps running.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Keep the task alive (but unpolled to completion) for the whole test.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give any (erroneous) kill path a chance to run before asserting.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
415
#[gpui::test]
async fn test_thinking(cx: &mut TestAppContext) {
    // Streams a thinking event followed by a text chunk and checks the thread
    // renders the thinking step inside a `<think>` tag in its markdown.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                [indoc! {"
                    Testing:

                    Generate a thinking step where you just think the word 'Think',
                    and have your final answer be 'Hello'
                "}],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();
    // Thinking chunk first, then the visible answer, then end of turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "Think".to_string(),
        signature: None,
    });
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            indoc! {"
                <think>Think</think>
                Hello
            "}
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
464
#[gpui::test]
async fn test_system_prompt(cx: &mut TestAppContext) {
    // With a tool registered, the generated system prompt must embed project
    // context (the shell) and the diagnostics-fixing section.
    let ThreadTest {
        model,
        thread,
        project_context,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Plant a recognizable value in the project context so we can spot it
    // inside the rendered system prompt.
    project_context.update(cx, |project_context, _cx| {
        project_context.shell = "test-shell".into()
    });
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The first message of the request is always the system prompt.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        system_prompt.contains("test-shell"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
509
#[gpui::test]
async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
    // Without any tools registered, the tool-related sections must be absent
    // from the generated system prompt.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The first message of the request is always the system prompt.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        !system_prompt.contains("## Tool Use"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        !system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
545
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    // Checks the cache-marking policy for requests: only the most recent user
    // message (or tool result) carries `cache: true`; everything earlier is
    // sent uncached.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // `messages[0]` is the system prompt; compare from index 1 onward.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The follow-up request carries the entire transcript; only the trailing
    // tool-result message should be cache-marked.
    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
691
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    // E2E test against a real model (Sonnet 4): exercises tool calls that
    // finish before and after the model's streaming stops.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool call that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    // The delay tool's output ("Ding") must have been echoed back by the model.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
751
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    // E2E test: verifies tool-call input streams incrementally, i.e. the
    // thread observes at least one partially-populated tool use before the
    // input is complete.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, input may still be streaming in; the
                        // last key ("g") being absent means we caught a
                        // genuinely partial tool use.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
803
#[gpui::test]
async fn test_tool_authorization(cx: &mut TestAppContext) {
    // Exercises the tool-permission flow: allow-once, deny-once, allow-always
    // (which must suppress future authorization prompts for the same tool).
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Two concurrent tool uses → two authorization requests.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_2".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;

    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
    tool_call_auth_1
        .response
        .send(acp_thread::SelectedPermissionOutcome::new(
            acp::PermissionOptionId::new("allow"),
            acp::PermissionOptionKind::AllowOnce,
        ))
        .unwrap();
    cx.run_until_parked();

    // Reject the second - send "deny" option_id directly since Deny is now a button
    tool_call_auth_2
        .response
        .send(acp_thread::SelectedPermissionOutcome::new(
            acp::PermissionOptionId::new("deny"),
            acp::PermissionOptionKind::RejectOnce,
        ))
        .unwrap();
    cx.run_until_parked();

    // The follow-up request must carry one success and one error tool result.
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }),
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: true,
                content: "Permission to run tool denied by user".into(),
                output: Some("Permission to run tool denied by user".into())
            })
        ]
    );

    // Simulate yet another tool call.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_3".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Respond by always allowing tools - send transformed option_id
    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
    tool_call_auth_3
        .response
        .send(acp_thread::SelectedPermissionOutcome::new(
            acp::PermissionOptionId::new("always_allow:tool_requiring_permission"),
            acp::PermissionOptionKind::AllowAlways,
        ))
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );

    // Simulate a final tool call, ensuring we don't trigger authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_4".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: "tool_id_4".into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );
}
949
#[gpui::test]
async fn test_tool_hallucination(cx: &mut TestAppContext) {
    // A model requesting a tool that was never registered must yield a tool
    // call that transitions from Pending to Failed rather than crashing.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: "nonexistent_tool".into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(tool_call.title, "nonexistent_tool");
    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
}
979
980async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
981 let event = events
982 .next()
983 .await
984 .expect("no tool call authorization event received")
985 .unwrap();
986 match event {
987 ThreadEvent::ToolCall(tool_call) => tool_call,
988 event => {
989 panic!("Unexpected event {event:?}");
990 }
991 }
992}
993
994async fn expect_tool_call_update_fields(
995 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
996) -> acp::ToolCallUpdate {
997 let event = events
998 .next()
999 .await
1000 .expect("no tool call authorization event received")
1001 .unwrap();
1002 match event {
1003 ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
1004 event => {
1005 panic!("Unexpected event {event:?}");
1006 }
1007 }
1008}
1009
1010async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1011 let event = events
1012 .next()
1013 .await
1014 .expect("no plan event received")
1015 .unwrap();
1016 match event {
1017 ThreadEvent::Plan(plan) => plan,
1018 event => {
1019 panic!("Unexpected event {event:?}");
1020 }
1021 }
1022}
1023
1024async fn next_tool_call_authorization(
1025 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1026) -> ToolCallAuthorization {
1027 loop {
1028 let event = events
1029 .next()
1030 .await
1031 .expect("no tool call authorization event received")
1032 .unwrap();
1033 if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1034 let permission_kinds = tool_call_authorization
1035 .options
1036 .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1037 .map(|option| option.kind);
1038 let allow_once = tool_call_authorization
1039 .options
1040 .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1041 .map(|option| option.kind);
1042
1043 assert_eq!(
1044 permission_kinds,
1045 Some(acp::PermissionOptionKind::AllowAlways)
1046 );
1047 assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1048 return tool_call_authorization;
1049 }
1050 }
1051}
1052
/// Terminal permission options for a multi-token command include an
/// always-for-terminal, a per-command-prefix, and a one-time choice.
#[test]
fn test_permission_options_terminal_with_pattern() {
    let context = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    );
    let permission_options = context.build_permission_options();

    let choices = match permission_options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let labels: Vec<&str> = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect();
    for expected in [
        "Always for terminal",
        "Always for `cargo build` commands",
        "Only this time",
    ] {
        assert!(labels.contains(&expected));
    }
}
1074
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    // When the second token is a flag (not a subcommand), the pattern label
    // should name just the base command.
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `ls` commands"));
    assert!(has_label("Only this time"));
}
1094
#[test]
fn test_permission_options_terminal_single_word_command() {
    // A bare single-word command still gets a command-pattern choice.
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `whoami` commands"));
    assert!(has_label("Only this time"));
}
1114
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Editing a file under a directory should offer a directory-scoped choice.
    let options = ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for edit file"));
    assert!(has_label("Always for `src/`"));
}
1132
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetching a URL should offer a choice scoped to the URL's domain.
    let options =
        ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
            .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for fetch"));
    assert!(has_label("Always for `docs.rs`"));
}
1150
#[test]
fn test_permission_options_without_pattern() {
    // A command with no recognized pattern (a local script here) gets only the
    // tool-wide and one-time choices — no per-command entry.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 2);
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Only this time"));
    assert!(choices.iter().all(|choice| {
        let label: &str = choice.allow.name.as_ref();
        !label.contains("commands")
    }));
}
1172
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Symlink-target authorization must not offer any "always" choices: only
    // a flat one-time allow/deny pair.
    let permission_options =
        ToolPermissionContext::symlink_target(EditFileTool::NAME, vec!["/outside/file.txt".into()])
            .build_permission_options();

    let options = match permission_options {
        PermissionOptions::Flat(options) => options,
        _ => panic!("Expected flat permission options for symlink target authorization"),
    };

    assert_eq!(options.len(), 2);
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1193
#[test]
fn test_permission_option_ids_for_terminal() {
    // The dropdown choices carry option IDs encoding the scope of the grant:
    // the "always" choices reuse the tool-level IDs (differing only in their
    // sub-patterns), while the one-time choice uses plain allow/deny IDs.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    // Expect 3 choices: always-tool, always-pattern, once.
    assert_eq!(choices.len(), 3);

    // First two choices both use the tool-level option IDs.
    for choice in &choices[..2] {
        assert_eq!(choice.allow.option_id.0.as_ref(), "always_allow:terminal");
        assert_eq!(choice.deny.option_id.0.as_ref(), "always_deny:terminal");
    }
    // ...but only the second carries the command pattern.
    assert!(choices[0].sub_patterns.is_empty());
    assert_eq!(choices[1].sub_patterns, vec!["^cargo\\s+build(\\s|$)"]);

    // Third choice is the one-time allow/deny.
    assert_eq!(choices[2].allow.option_id.0.as_ref(), "allow");
    assert_eq!(choices[2].deny.option_id.0.as_ref(), "deny");
    assert!(choices[2].sub_patterns.is_empty());
}
1229
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    // A piped command can't be covered by one command pattern, so the builder
    // returns a dropdown plus a selectable pattern per pipeline stage.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo test 2>&1 | tail".to_string()],
    )
    .build_permission_options();

    let (choices, patterns, tool_name) = match options {
        PermissionOptions::DropdownWithPatterns {
            choices,
            patterns,
            tool_name,
        } => (choices, patterns, tool_name),
        _ => panic!("Expected DropdownWithPatterns permission options for pipeline command"),
    };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Should have "Always for terminal" and "Only this time" choices.
    assert_eq!(choices.len(), 2);
    let has_label = |wanted: &str| {
        choices.iter().any(|choice| {
            let label: &str = choice.allow.name.as_ref();
            label == wanted
        })
    };
    assert!(has_label("Always for terminal"));
    assert!(has_label("Only this time"));

    // Should have per-command patterns for "cargo test" and "tail".
    assert_eq!(patterns.len(), 2);
    assert!(patterns.iter().any(|cp| cp.display_name == "cargo test"));
    assert!(patterns.iter().any(|cp| cp.display_name == "tail"));

    // Verify patterns are valid regex patterns.
    assert!(
        patterns
            .iter()
            .any(|cp| cp.pattern == "^cargo\\s+test(\\s|$)")
    );
    assert!(patterns.iter().any(|cp| cp.pattern == "^tail\\b"));
}
1269
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    // Chained (&&) and piped segments each contribute their own pattern.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["npm install && npm test | tail".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::DropdownWithPatterns { patterns, .. } = options else {
        panic!("Expected DropdownWithPatterns for chained pipeline command");
    };

    // With subcommand-aware patterns, "npm install" and "npm test" are distinct.
    assert_eq!(patterns.len(), 3);
    for expected in ["npm install", "npm test", "tail"] {
        assert!(patterns.iter().any(|cp| cp.display_name == expected));
    }
}
1289
// E2E test (requires the "e2e" feature and a real Sonnet 4 model): verifies
// that two tool calls issued in the same assistant message run concurrently
// and that the model can describe their outputs afterwards.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test concurrent tool calls with different delay times
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Call the delay tool twice in the same message.",
                    "Once with 100ms. Once with 300ms.",
                    "When both timers are complete, describe the outputs.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;

    // The turn should end normally after both delays complete.
    let stop_reasons = stop_events(events);
    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);

    thread.update(cx, |thread, _cx| {
        // DelayTool outputs "Ding" on completion; the model's final message
        // should mention it when describing the tool outputs.
        let last_message = thread.last_received_or_pending_message().unwrap();
        let agent_message = last_message.as_agent_message().unwrap();
        let text = agent_message
            .content
            .iter()
            .filter_map(|content| {
                if let AgentMessageContent::Text(text) = content {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect::<String>();

        assert!(text.contains("Ding"));
    });
}
1334
// Verifies that switching agent profiles changes which tools are offered to
// the model on the next completion request.
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register three native tools; the profiles below enable subsets of them.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Inspect the completion request the fake model received: only the tools
    // enabled by the active profile should be listed.
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1414
1415#[gpui::test]
1416async fn test_mcp_tools(cx: &mut TestAppContext) {
1417 let ThreadTest {
1418 model,
1419 thread,
1420 context_server_store,
1421 fs,
1422 ..
1423 } = setup(cx, TestModel::Fake).await;
1424 let fake_model = model.as_fake();
1425
1426 // Override profiles and wait for settings to be loaded.
1427 fs.insert_file(
1428 paths::settings_file(),
1429 json!({
1430 "agent": {
1431 "tool_permissions": { "default": "allow" },
1432 "profiles": {
1433 "test": {
1434 "name": "Test Profile",
1435 "enable_all_context_servers": true,
1436 "tools": {
1437 EchoTool::NAME: true,
1438 }
1439 },
1440 }
1441 }
1442 })
1443 .to_string()
1444 .into_bytes(),
1445 )
1446 .await;
1447 cx.run_until_parked();
1448 thread.update(cx, |thread, cx| {
1449 thread.set_profile(AgentProfileId("test".into()), cx)
1450 });
1451
1452 let mut mcp_tool_calls = setup_context_server(
1453 "test_server",
1454 vec![context_server::types::Tool {
1455 name: "echo".into(),
1456 description: None,
1457 input_schema: serde_json::to_value(EchoTool::input_schema(
1458 LanguageModelToolSchemaFormat::JsonSchema,
1459 ))
1460 .unwrap(),
1461 output_schema: None,
1462 annotations: None,
1463 }],
1464 &context_server_store,
1465 cx,
1466 );
1467
1468 let events = thread.update(cx, |thread, cx| {
1469 thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
1470 });
1471 cx.run_until_parked();
1472
1473 // Simulate the model calling the MCP tool.
1474 let completion = fake_model.pending_completions().pop().unwrap();
1475 assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1476 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1477 LanguageModelToolUse {
1478 id: "tool_1".into(),
1479 name: "echo".into(),
1480 raw_input: json!({"text": "test"}).to_string(),
1481 input: json!({"text": "test"}),
1482 is_input_complete: true,
1483 thought_signature: None,
1484 },
1485 ));
1486 fake_model.end_last_completion_stream();
1487 cx.run_until_parked();
1488
1489 let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1490 assert_eq!(tool_call_params.name, "echo");
1491 assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
1492 tool_call_response
1493 .send(context_server::types::CallToolResponse {
1494 content: vec![context_server::types::ToolResponseContent::Text {
1495 text: "test".into(),
1496 }],
1497 is_error: None,
1498 meta: None,
1499 structured_content: None,
1500 })
1501 .unwrap();
1502 cx.run_until_parked();
1503
1504 assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1505 fake_model.send_last_completion_stream_text_chunk("Done!");
1506 fake_model.end_last_completion_stream();
1507 events.collect::<Vec<_>>().await;
1508
1509 // Send again after adding the echo tool, ensuring the name collision is resolved.
1510 let events = thread.update(cx, |thread, cx| {
1511 thread.add_tool(EchoTool);
1512 thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
1513 });
1514 cx.run_until_parked();
1515 let completion = fake_model.pending_completions().pop().unwrap();
1516 assert_eq!(
1517 tool_names_for_completion(&completion),
1518 vec!["echo", "test_server_echo"]
1519 );
1520 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1521 LanguageModelToolUse {
1522 id: "tool_2".into(),
1523 name: "test_server_echo".into(),
1524 raw_input: json!({"text": "mcp"}).to_string(),
1525 input: json!({"text": "mcp"}),
1526 is_input_complete: true,
1527 thought_signature: None,
1528 },
1529 ));
1530 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1531 LanguageModelToolUse {
1532 id: "tool_3".into(),
1533 name: "echo".into(),
1534 raw_input: json!({"text": "native"}).to_string(),
1535 input: json!({"text": "native"}),
1536 is_input_complete: true,
1537 thought_signature: None,
1538 },
1539 ));
1540 fake_model.end_last_completion_stream();
1541 cx.run_until_parked();
1542
1543 let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1544 assert_eq!(tool_call_params.name, "echo");
1545 assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
1546 tool_call_response
1547 .send(context_server::types::CallToolResponse {
1548 content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
1549 is_error: None,
1550 meta: None,
1551 structured_content: None,
1552 })
1553 .unwrap();
1554 cx.run_until_parked();
1555
1556 // Ensure the tool results were inserted with the correct names.
1557 let completion = fake_model.pending_completions().pop().unwrap();
1558 assert_eq!(
1559 completion.messages.last().unwrap().content,
1560 vec![
1561 MessageContent::ToolResult(LanguageModelToolResult {
1562 tool_use_id: "tool_3".into(),
1563 tool_name: "echo".into(),
1564 is_error: false,
1565 content: "native".into(),
1566 output: Some("native".into()),
1567 },),
1568 MessageContent::ToolResult(LanguageModelToolResult {
1569 tool_use_id: "tool_2".into(),
1570 tool_name: "test_server_echo".into(),
1571 is_error: false,
1572 content: "mcp".into(),
1573 output: Some("mcp".into()),
1574 },),
1575 ]
1576 );
1577 fake_model.end_last_completion_stream();
1578 events.collect::<Vec<_>>().await;
1579}
1580
// Regression test: a saved MCP tool result must still be displayed when the
// thread is replayed after the MCP server has disconnected (e.g. app restart).
#[gpui::test]
async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Setup settings to allow MCP tools
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "always_allow_tool_actions": true,
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {}
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Setup a context server with a tool
    let mut mcp_tool_calls = setup_context_server(
        "github_server",
        vec![context_server::types::Tool {
            name: "issue_read".into(),
            description: Some("Read a GitHub issue".into()),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "issue_url": { "type": "string" }
                }
            }),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    // Send a message and have the model call the MCP tool
    let events = thread.update(cx, |thread, cx| {
        thread
            .send(UserMessageId::new(), ["Read issue #47404"], cx)
            .unwrap()
    });
    cx.run_until_parked();

    // Verify the MCP tool is available to the model
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["issue_read"],
        "MCP tool should be available"
    );

    // Simulate the model calling the MCP tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "issue_read".into(),
            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
                .to_string(),
            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The MCP server receives the tool call and responds with content
    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "issue_read");
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: expected_tool_output.into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // After tool completes, the model continues with a new completion request
    // that includes the tool results. We need to respond to this.
    let _completion = fake_model.pending_completions().pop().unwrap();
    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Verify the tool result is stored in the thread by checking the markdown output.
    // The tool result is in the first assistant message (not the last one, which is
    // the model's response after the tool completed).
    thread.update(cx, |thread, _cx| {
        let markdown = thread.to_markdown();
        assert!(
            markdown.contains("**Tool Result**: issue_read"),
            "Thread should contain tool result header"
        );
        assert!(
            markdown.contains(expected_tool_output),
            "Thread should contain tool output: {}",
            expected_tool_output
        );
    });

    // Simulate app restart: disconnect the MCP server.
    // After restart, the MCP server won't be connected yet when the thread is replayed.
    context_server_store.update(cx, |store, cx| {
        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
    });
    cx.run_until_parked();

    // Replay the thread (this is what happens when loading a saved thread)
    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));

    // Scan the replayed event stream for the original tool call and for an
    // update that carries its saved output.
    let mut found_tool_call = None;
    let mut found_tool_call_update_with_output = None;

    while let Some(event) = replay_events.next().await {
        let event = event.unwrap();
        match &event {
            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
                found_tool_call = Some(tc.clone());
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
                if update.tool_call_id.to_string() == "tool_1" =>
            {
                if update.fields.raw_output.is_some() {
                    found_tool_call_update_with_output = Some(update.clone());
                }
            }
            _ => {}
        }
    }

    // The tool call should be found
    assert!(
        found_tool_call.is_some(),
        "Tool call should be emitted during replay"
    );

    assert!(
        found_tool_call_update_with_output.is_some(),
        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
    );

    let update = found_tool_call_update_with_output.unwrap();
    assert_eq!(
        update.fields.raw_output,
        Some(expected_tool_output.into()),
        "raw_output should contain the saved tool result"
    );

    // Also verify the status is correct (completed, not failed)
    assert_eq!(
        update.fields.status,
        Some(acp::ToolCallStatus::Completed),
        "Tool call status should reflect the original completion status"
    );
}
1762
// Verifies how MCP tool names are disambiguated and truncated when multiple
// servers expose tools with colliding or overlong names: colliding names get
// a server-derived prefix, and names exceeding MAX_TOOL_NAME_LENGTH are
// shortened (see the expected list at the end for the observed scheme).
#[gpui::test]
async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up a profile with all tools enabled
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx);
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
        thread.add_tool(WordListTool);
        thread.add_tool(ToolRequiringPermission);
        thread.add_tool(InfiniteTool);
    });

    // Set up multiple context servers with some overlapping tool names
    let _server1_calls = setup_context_server(
        "xxx",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_1".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server "yyy" additionally exposes near-limit tool names that collide
    // with server "zzz"'s below, exercising the truncation-with-prefix path.
    let _server2_calls = setup_context_server(
        "yyy",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Also conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_2".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );
    let _server3_calls = setup_context_server(
        "zzz",
        vec![
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server with spaces in name - tests snake_case conversion for API compatibility
    let _server4_calls = setup_context_server(
        "Azure DevOps",
        vec![context_server::types::Tool {
            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Go"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // The completion request should list every tool under its final
    // (disambiguated, truncated, sorted) name.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec![
            "azure_dev_ops_echo",
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
            "delay",
            "echo",
            "infinite",
            "tool_requiring_permission",
            "unique_tool_1",
            "unique_tool_2",
            "word_list",
            "xxx_echo",
            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "yyy_echo",
            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        ]
    );
}
1946
// E2E test (requires the "e2e" feature and a real Sonnet 4 model): cancelling
// a turn must close the event stream even while a tool is still running, and
// the thread must remain usable for a subsequent send.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            ThreadEvent::ToolCall(tool_call) => {
                // Tools are expected in order: Echo first, then Infinite Tool.
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        // Stop once both tools were started and the echo tool finished;
        // the infinite tool is (by design) still running at this point.
        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
2032
// Verifies that cancelling a turn while the terminal tool is running kills the
// terminal, and that the tool result preserves the partial terminal output
// (rather than only a generic "canceled" message).
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Use a fake terminal whose process never exits, so cancellation is the
    // only way the tool call can finish.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use and its recorded result.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2129
#[gpui::test]
async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
    // This test verifies that tools which properly handle cancellation via
    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
    // to cancellation and report that they were cancelled.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // `was_cancelled` is an observable flag the test tool flips when it sees
    // the cancellation signal from the event stream.
    let (tool, was_cancelled) = CancellationAwareTool::new();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(tool);
            thread.send(
                UserMessageId::new(),
                ["call the cancellation aware tool"],
                cx,
            )
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the cancellation-aware tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "cancellation_aware_1".into(),
            name: "cancellation_aware".into(),
            raw_input: r#"{}"#.into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    cx.run_until_parked();

    // Wait for the tool call to be reported
    let mut tool_started = false;
    // Polling budget scaled with available parallelism.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        // Drain all currently-ready events without blocking; the inner `break`
        // only exits this drain loop — the outer check below exits the poll loop.
        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
                if tool_call.title == "Cancellation Aware Tool" {
                    tool_started = true;
                    break;
                }
            }
        }

        if tool_started {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(tool_started, "expected cancellation aware tool to start");

    // Cancel the thread and wait for it to complete
    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));

    // The cancel task should complete promptly because the tool handles cancellation
    let timeout = cx.background_executor.timer(Duration::from_secs(5));
    futures::select! {
        _ = cancel_task.fuse() => {}
        _ = timeout.fuse() => {
            panic!("cancel task timed out - tool did not respond to cancellation");
        }
    }

    // Verify the tool detected cancellation via its flag
    assert!(
        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
        "tool should have detected cancellation via event_stream.cancelled_by_user()"
    );

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2224
2225/// Helper to verify thread can recover after cancellation by sending a simple message.
2226async fn verify_thread_recovery(
2227 thread: &Entity<Thread>,
2228 fake_model: &FakeLanguageModel,
2229 cx: &mut TestAppContext,
2230) {
2231 let events = thread
2232 .update(cx, |thread, cx| {
2233 thread.send(
2234 UserMessageId::new(),
2235 ["Testing: reply with 'Hello' then stop."],
2236 cx,
2237 )
2238 })
2239 .unwrap();
2240 cx.run_until_parked();
2241 fake_model.send_last_completion_stream_text_chunk("Hello");
2242 fake_model
2243 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2244 fake_model.end_last_completion_stream();
2245
2246 let events = events.collect::<Vec<_>>().await;
2247 thread.update(cx, |thread, _cx| {
2248 let message = thread.last_received_or_pending_message().unwrap();
2249 let agent_message = message.as_agent_message().unwrap();
2250 assert_eq!(
2251 agent_message.content,
2252 vec![AgentMessageContent::Text("Hello".to_string())]
2253 );
2254 });
2255 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2256}
2257
2258/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2259async fn wait_for_terminal_tool_started(
2260 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2261 cx: &mut TestAppContext,
2262) {
2263 let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2264 for _ in 0..deadline {
2265 cx.run_until_parked();
2266
2267 while let Some(Some(event)) = events.next().now_or_never() {
2268 if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2269 update,
2270 ))) = &event
2271 {
2272 if update.fields.content.as_ref().is_some_and(|content| {
2273 content
2274 .iter()
2275 .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2276 }) {
2277 return;
2278 }
2279 }
2280 }
2281
2282 cx.background_executor
2283 .timer(Duration::from_millis(10))
2284 .await;
2285 }
2286 panic!("terminal tool did not start within the expected time");
2287}
2288
2289/// Collects events until a Stop event is received, driving the executor to completion.
2290async fn collect_events_until_stop(
2291 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2292 cx: &mut TestAppContext,
2293) -> Vec<Result<ThreadEvent>> {
2294 let mut collected = Vec::new();
2295 let deadline = cx.executor().num_cpus() * 200;
2296
2297 for _ in 0..deadline {
2298 cx.executor().advance_clock(Duration::from_millis(10));
2299 cx.run_until_parked();
2300
2301 while let Some(Some(event)) = events.next().now_or_never() {
2302 let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2303 collected.push(event);
2304 if is_stop {
2305 return collected;
2306 }
2307 }
2308 }
2309 panic!(
2310 "did not receive Stop event within the expected time; collected {} events",
2311 collected.len()
2312 );
2313}
2314
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    // Verifies that truncating the thread while a terminal command is running
    // kills the terminal, empties the thread, and leaves it usable afterwards.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Terminal that never exits on its own; only the truncation can end it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate back to (and including) it later.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2381
#[gpui::test]
async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
    // Tests that cancellation properly kills all running terminal tools when multiple are active.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Environment that hands out a fresh terminal handle per command so we can
    // inspect each one after cancellation.
    let environment = Rc::new(MultiTerminalEnvironment::new());

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment.clone(),
            ));
            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling two terminal tools
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_2".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 2000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for both terminal tools to start by counting terminal content updates
    let mut terminals_started = 0;
    // Polling budget scaled with available parallelism.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        // Drain ready events; the inner `break` only exits this drain loop,
        // the matching check below exits the outer polling loop.
        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            ))) = &event
            {
                if update.fields.content.as_ref().is_some_and(|content| {
                    content
                        .iter()
                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
                }) {
                    terminals_started += 1;
                    if terminals_started >= 2 {
                        break;
                    }
                }
            }
        }
        if terminals_started >= 2 {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(
        terminals_started >= 2,
        "expected 2 terminal tools to start, got {terminals_started}"
    );

    // Cancel the thread while both terminals are running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify both terminal handles were killed
    let handles = environment.handles();
    assert_eq!(
        handles.len(),
        2,
        "expected 2 terminal handles to be created"
    );
    assert!(
        handles[0].was_killed(),
        "expected first terminal handle to be killed on cancellation"
    );
    assert!(
        handles[1].was_killed(),
        "expected second terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );
}
2490
#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Terminal that never exits on its own; only the simulated stop button
    // interaction below can end it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the single tool use recorded in the agent's message.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2585
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Terminal that never exits on its own, forcing the timeout path.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the timeout (timeout_ms is 100; advance by 200)
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the single tool use recorded in the agent's message.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2684
2685#[gpui::test]
2686async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2687 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2688 let fake_model = model.as_fake();
2689
2690 let events_1 = thread
2691 .update(cx, |thread, cx| {
2692 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2693 })
2694 .unwrap();
2695 cx.run_until_parked();
2696 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2697 cx.run_until_parked();
2698
2699 let events_2 = thread
2700 .update(cx, |thread, cx| {
2701 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2702 })
2703 .unwrap();
2704 cx.run_until_parked();
2705 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2706 fake_model
2707 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2708 fake_model.end_last_completion_stream();
2709
2710 let events_1 = events_1.collect::<Vec<_>>().await;
2711 assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2712 let events_2 = events_2.collect::<Vec<_>>().await;
2713 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2714}
2715
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // The old turn reports Cancelled; the new turn completes normally.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2793
2794#[gpui::test]
2795async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2796 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2797 let fake_model = model.as_fake();
2798
2799 let events_1 = thread
2800 .update(cx, |thread, cx| {
2801 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2802 })
2803 .unwrap();
2804 cx.run_until_parked();
2805 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2806 fake_model
2807 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2808 fake_model.end_last_completion_stream();
2809 let events_1 = events_1.collect::<Vec<_>>().await;
2810
2811 let events_2 = thread
2812 .update(cx, |thread, cx| {
2813 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2814 })
2815 .unwrap();
2816 cx.run_until_parked();
2817 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2818 fake_model
2819 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2820 fake_model.end_last_completion_stream();
2821 let events_2 = events_2.collect::<Vec<_>>().await;
2822
2823 assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2824 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2825}
2826
#[gpui::test]
async fn test_refusal(cx: &mut TestAppContext) {
    // Verifies that a model refusal rolls the thread back: everything after
    // the last user message (including that message) is removed.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Before the model replies, only the user message is present.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    cx.run_until_parked();
    // The streamed assistant text is visible in the transcript.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
    });

    // If the model refuses to continue, the thread should remove all the messages after the last user message.
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
    let events = events.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
    });
}
2875
#[gpui::test]
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    // Verifies that truncating at the first (and only) message empties the
    // thread, resets token usage, and that a fresh turn works afterwards.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Keep the id so we can truncate back to this message later.
    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        // No usage is reported until the model sends a UsageUpdate.
        assert_eq!(thread.latest_token_usage(), None);
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        // Usage reflects the UsageUpdate sent above (input + output tokens).
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncating at the only message should clear the whole thread.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // The new turn's usage replaces the pre-truncation numbers entirely.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
2997
#[gpui::test]
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    // Verifies that truncating at the second message restores the thread —
    // transcript and token usage — to the state after the first exchange.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Snapshot assertion reused both before the second send and after the
    // truncation, to prove the state is fully restored.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Keep the second message's id so we can truncate back to it.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Both exchanges are present and usage reflects the latest turn.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncate at the second message; only the first exchange should remain.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3112
#[gpui::test]
async fn test_title_generation(cx: &mut TestAppContext) {
    // Titles come from a dedicated summarization model rather than the main
    // completion model, and are generated only once per thread.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // No title yet: the summarization request is still streaming.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), None));

    // Ensure the summary model has been invoked to generate a title.
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    // Only the first line of the streamed summary becomes the title.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // The summarization model must not have received a second request.
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });
}
3162
#[gpui::test]
async fn test_title_generation_failure_allows_retry(cx: &mut TestAppContext) {
    // A failed title generation must leave the thread in a retryable state
    // (failed flag set, not generating) and a manual `generate_title` call
    // must be able to succeed afterwards.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let summary_model = Arc::new(FakeLanguageModel::default());
    let fake_summary_model = summary_model.as_fake();
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Fail the automatic title generation with an upstream 500.
    fake_summary_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: gpui::http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();

    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), None);
        assert!(thread.has_failed_title_generation());
        assert!(!thread.is_generating_title());
    });

    // Explicitly retry title generation.
    thread.update(cx, |thread, cx| {
        thread.generate_title(cx);
    });
    cx.run_until_parked();

    // Retrying clears the failure flag while a new request is in flight.
    thread.read_with(cx, |thread, _| {
        assert!(!thread.has_failed_title_generation());
        assert!(thread.is_generating_title());
    });

    fake_summary_model.send_last_completion_stream_text_chunk("Retried title");
    fake_summary_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Retried title".into()));
        assert!(!thread.has_failed_title_generation());
        assert!(!thread.is_generating_title());
    });
}
3222
#[gpui::test]
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    // When a tool call is still pending (here: awaiting permission), the
    // completion request built from the thread must omit that tool use while
    // keeping completed tool uses and their results.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // This call stays pending because its permission prompt is never answered.
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // This call runs to completion immediately.
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    // messages[0] is the system prompt; only the conversation is asserted.
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3302
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    // End-to-end exercise of the ACP `AgentConnection` surface: session
    // creation, model selection, prompting, cancellation, and session close.
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(cx);
        RefreshLlmTokenListener::register(client.clone(), user_store.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = PathList::new(&[Path::new("/test")]);
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Send a prompt through the ACP thread and stream back a reply.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting a closed session must fail with "Session not found".
    let result = cx
        .update(|cx| {
            connection.prompt(
                acp_thread::UserMessageId::new(),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3426
#[gpui::test]
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    // Verifies the ACP event sequence for a streamed tool call: initial
    // ToolCall, raw-input update once streaming finishes, InProgress, then
    // Completed with the tool's output.
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First event: the tool call with its initial (partial) raw input.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Second: fields updated with the complete raw input.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    // Third: the tool starts running.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    // Fourth: completion with the echoed output.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3505
#[gpui::test]
async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
    // The update_plan tool should emit a ToolCall, an InProgress update, a
    // Plan event mirroring the tool input, and a Completed update.
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Make a plan"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let input = json!({
        "plan": [
            {
                "step": "Inspect the code",
                "status": "completed",
            },
            {
                "step": "Implement the tool",
                "status": "in_progress"
            },
            {
                "step": "Run tests",
                "status": "pending",
            }
        ]
    });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "plan_1".into(),
            name: UpdatePlanTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("plan_1", "Update plan")
            .kind(acp::ToolKind::Think)
            .raw_input(json!({
                "plan": [
                    {
                        "step": "Inspect the code",
                        "status": "completed",
                    },
                    {
                        "step": "Implement the tool",
                        "status": "in_progress"
                    },
                    {
                        "step": "Run tests",
                        "status": "pending",
                    }
                ]
            }))
            .meta(acp::Meta::from_iter([(
                "tool_name".into(),
                "update_plan".into()
            )]))
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );

    // The plan event carries each step with its status; priority defaults to
    // Medium since the input did not specify one.
    let plan = expect_plan(&mut events).await;
    assert_eq!(
        plan,
        acp::Plan::new(vec![
            acp::PlanEntry::new(
                "Inspect the code",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::Completed,
            ),
            acp::PlanEntry::new(
                "Implement the tool",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::InProgress,
            ),
            acp::PlanEntry::new(
                "Run tests",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::Pending,
            ),
        ])
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Plan updated")
        )
    );
}
3617
#[gpui::test]
async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
    // A completion that succeeds on the first attempt must not emit any
    // Retry events.
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();

    // Drain events until the turn stops, collecting any Retry events.
    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 0);
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey!
            "}
        )
    });
}
3660
#[gpui::test]
async fn test_send_retry_on_error(cx: &mut TestAppContext) {
    // A ServerOverloaded error mid-stream should trigger exactly one retry
    // after the provider's `retry_after` delay, resuming the turn.
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Stream part of a reply, then fail with a retryable error.
    fake_model.send_last_completion_stream_text_chunk("Hey,");
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so the retry request is issued.
    cx.executor().advance_clock(Duration::from_secs(3));
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("there!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 1);
    assert!(matches!(
        retry_events[0],
        acp_thread::RetryStatus { attempt: 1, .. }
    ));
    // The transcript records the partial reply, the resume marker, and the
    // continuation as a separate assistant message.
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey,

                [resume]

                ## Assistant

                there!
            "}
        )
    });
}
3724
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    // If a retryable error arrives after a tool use was streamed, the tool
    // should still run to completion and its result should be included in the
    // retried request.
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the retry delay; the retried request must contain the
    // completed tool call and its result (messages[0] is the system prompt).
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retried round and verify the final assistant message.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3804
3805#[gpui::test]
3806async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
3807 let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
3808 let fake_model = model.as_fake();
3809
3810 let mut events = thread
3811 .update(cx, |thread, cx| {
3812 thread.send(UserMessageId::new(), ["Hello!"], cx)
3813 })
3814 .unwrap();
3815 cx.run_until_parked();
3816
3817 for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
3818 fake_model.send_last_completion_stream_error(
3819 LanguageModelCompletionError::ServerOverloaded {
3820 provider: LanguageModelProviderName::new("Anthropic"),
3821 retry_after: Some(Duration::from_secs(3)),
3822 },
3823 );
3824 fake_model.end_last_completion_stream();
3825 cx.executor().advance_clock(Duration::from_secs(3));
3826 cx.run_until_parked();
3827 }
3828
3829 let mut errors = Vec::new();
3830 let mut retry_events = Vec::new();
3831 while let Some(event) = events.next().await {
3832 match event {
3833 Ok(ThreadEvent::Retry(retry_status)) => {
3834 retry_events.push(retry_status);
3835 }
3836 Ok(ThreadEvent::Stop(..)) => break,
3837 Err(error) => errors.push(error),
3838 _ => {}
3839 }
3840 }
3841
3842 assert_eq!(
3843 retry_events.len(),
3844 crate::thread::MAX_RETRY_ATTEMPTS as usize
3845 );
3846 for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
3847 assert_eq!(retry_events[i].attempt, i + 1);
3848 }
3849 assert_eq!(errors.len(), 1);
3850 let error = errors[0]
3851 .downcast_ref::<LanguageModelCompletionError>()
3852 .unwrap();
3853 assert!(matches!(
3854 error,
3855 LanguageModelCompletionError::ServerOverloaded { .. }
3856 ));
3857}
3858
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    // Regression test: a streaming tool whose input never completes must be
    // terminated with an error result when the LLM stream ends, rather than
    // deadlocking the turn.
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3965
#[gpui::test]
async fn test_streaming_tool_json_parse_error_is_forwarded_to_running_tool(
    cx: &mut TestAppContext,
) {
    // When tool-input JSON fails to parse, the parse error should be forwarded
    // to the already-running streaming tool so it can enrich the error with the
    // partial input it saw, instead of producing a generic "orphaned sender"
    // error.
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingJsonErrorContextTool);
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_json_error_context tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Start the tool with a partial (truncated) JSON input.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: StreamingJsonErrorContextTool::NAME.into(),
        raw_input: r#"{"text": "partial"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use));
    cx.run_until_parked();

    // The model then reports that the accumulated input never became valid JSON.
    fake_model.send_last_completion_stream_event(
        LanguageModelCompletionEvent::ToolUseJsonParseError {
            id: "tool_1".into(),
            tool_name: StreamingJsonErrorContextTool::NAME.into(),
            raw_input: r#"{"text": "partial"#.into(),
            json_parse_error: "EOF while parsing a string at line 1 column 17".into(),
        },
    );
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Advance past any retry delay so the follow-up request is issued.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");

    // Collect every tool result for tool_1 across the request messages.
    let tool_results: Vec<_> = completion
        .messages
        .iter()
        .flat_map(|message| &message.content)
        .filter_map(|content| match content {
            MessageContent::ToolResult(result)
                if result.tool_use_id == language_model::LanguageModelToolUseId::from("tool_1") =>
            {
                Some(result)
            }
            _ => None,
        })
        .collect();

    assert_eq!(
        tool_results.len(),
        1,
        "Expected exactly 1 tool result for tool_1, got {}: {:#?}",
        tool_results.len(),
        tool_results
    );

    let result = tool_results[0];
    assert!(result.is_error);
    let content_text = match &result.content {
        language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
        other => panic!("Expected text content, got {:?}", other),
    };
    // The tool enriched the error with the partial input it had received...
    assert!(
        content_text.contains("Saw partial text 'partial' before invalid JSON"),
        "Expected tool-enriched partial context, got: {content_text}"
    );
    // ...and the original JSON parse error was forwarded verbatim...
    assert!(
        content_text
            .contains("Error parsing input JSON: EOF while parsing a string at line 1 column 17"),
        "Expected forwarded JSON parse error, got: {content_text}"
    );
    // ...without falling back to the generic orphaned-sender error.
    assert!(
        !content_text.contains("tool input was not fully received"),
        "Should not contain orphaned sender error, got: {content_text}"
    );

    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
4076
4077/// Filters out the stop events for asserting against in tests
4078fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
4079 result_events
4080 .into_iter()
4081 .filter_map(|event| match event.unwrap() {
4082 ThreadEvent::Stop(stop_reason) => Some(stop_reason),
4083 _ => None,
4084 })
4085 .collect()
4086}
4087
/// Everything a test needs after `setup`: the model driving completions, the
/// thread under test, and the supporting project state.
struct ThreadTest {
    // Active completion model (a `FakeLanguageModel` for `TestModel::Fake`).
    model: Arc<dyn LanguageModel>,
    // The thread under test.
    thread: Entity<Thread>,
    // Project context the thread was constructed with.
    project_context: Entity<ProjectContext>,
    // Context-server store belonging to the test project.
    context_server_store: Entity<ContextServerStore>,
    // Fake filesystem backing both the project and the settings file.
    fs: Arc<FakeFs>,
}
4095
/// Which language model a test runs against.
enum TestModel {
    /// A real Anthropic model resolved from the registry (live runs only).
    Sonnet4,
    /// An in-process `FakeLanguageModel` driven manually by the test.
    Fake,
}

impl TestModel {
    // Registry id for real models. `Fake` never goes through the registry
    // lookup (see `setup`), hence `unreachable!`.
    fn id(&self) -> LanguageModelId {
        match self {
            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
            TestModel::Fake => unreachable!(),
        }
    }
}
4109
/// Builds a `Thread` backed by a fake filesystem, with a settings profile that
/// enables all of the test tools, and resolves the requested model.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    cx.executor().allow_parking();

    // Write a settings file enabling every test tool under "test-profile".
    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingJsonErrorContextTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                            UpdatePlanTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        match model {
            TestModel::Fake => {}
            // Real-network setup, used only when running against a live model.
            TestModel::Sonnet4 => {
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(cx);
                RefreshLlmTokenListener::register(client.clone(), user_store.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        // Keep the global SettingsStore in sync with the settings file above.
        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Fake models are constructed directly; real models are looked up in the
    // registry and authenticated first.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
4217
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    // Logging is opt-in: only install env_logger when RUST_LOG is set.
    if std::env::var("RUST_LOG").is_err() {
        return;
    }
    env_logger::init();
}
4225
4226fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
4227 let fs = fs.clone();
4228 cx.spawn({
4229 async move |cx| {
4230 let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
4231 cx.background_executor(),
4232 fs,
4233 paths::settings_file().clone(),
4234 );
4235 let _watcher_task = watcher_task;
4236
4237 while let Some(new_settings_content) = new_settings_content_rx.next().await {
4238 cx.update(|cx| {
4239 SettingsStore::update_global(cx, |settings, cx| {
4240 settings.set_user_settings(&new_settings_content, cx)
4241 })
4242 })
4243 .ok();
4244 }
4245 }
4246 })
4247 .detach();
4248}
4249
4250fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4251 completion
4252 .tools
4253 .iter()
4254 .map(|tool| tool.name.clone())
4255 .collect()
4256}
4257
/// Registers and starts a fake MCP context server named `name` that exposes
/// `tools`.
///
/// Returns a channel yielding each incoming `CallTool` request paired with a
/// oneshot sender the test must use to supply that tool call's response.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in project settings. The command path is a
    // placeholder: the fake transport below intercepts all protocol traffic,
    // so no binary is actually spawned.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        // Handshake: advertise tool support (with list_changed notifications).
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        // Report the caller-provided tool list.
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        // Forward each tool call to the test via the channel and block the
        // response on the paired oneshot, letting the test script replies.
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    // Let the server finish its startup handshake before returning.
    cx.run_until_parked();
    mcp_tool_calls_rx
}
4337
/// Verifies `Thread::tokens_before_message`: each user message reports the
/// cumulative input-token count from the request that preceded it (None for
/// the first message), and earlier messages keep their values as the
/// conversation grows.
#[gpui::test]
async fn test_tokens_before_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First message
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["First message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Before any response, tokens_before_message should return None for first message
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should have no tokens before it"
        );
    });

    // Complete first message with usage
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First message still has no tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it after response"
        );
    });

    // Second message
    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Second message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Second message should have first message's input tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should have 100 tokens before it (from first request)"
        );
    });

    // Complete second message
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250, // Total for this request (includes previous context)
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Third message
    let message_3_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_3_id.clone(), ["Third message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Third message should have second message's input tokens (250) before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_3_id),
            Some(250),
            "Third message should have 250 tokens before it (from second request)"
        );
        // Second message should still have 100
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should still have 100 tokens before it"
        );
        // First message still has none
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it"
        );
    });
}
4444
/// Verifies that truncating a thread at a message removes that message's
/// token bookkeeping: looking up the truncated message's id afterwards
/// yields None, while surviving messages are unaffected.
#[gpui::test]
async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up two messages with responses carrying usage updates.
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250,
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Verify initial state
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
    });

    // Truncate at message 2 (removes message 2 and everything after)
    thread
        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
        .unwrap();
    cx.run_until_parked();

    // After truncation, message_2_id no longer exists, so lookup should return None
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            None,
            "After truncation, message 2 no longer exists"
        );
        // Message 1 still exists but has no tokens before it
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message still has no tokens before it"
        );
    });
}
4515
/// Exercises the terminal tool's permission-rule precedence across four
/// scenarios:
/// 1. an `always_deny` pattern blocks a matching command;
/// 2. an `always_allow` pattern runs a matching command without
///    confirmation, even when the tool default is Deny;
/// 3. an `always_confirm` pattern still prompts even when the global
///    default is Allow;
/// 4. a tool-specific Deny default blocks the tool even when the global
///    default is Allow.
#[gpui::test]
async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({})).await;
    let project = Project::test(fs, ["/root".as_ref()], cx).await;

    // Test 1: Deny rule blocks command
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Confirm),
                    always_allow: vec![],
                    always_deny: vec![
                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
                    ],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "rm -rf /".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by deny rule"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("blocked"),
            "error should mention the command was blocked"
        );
    }

    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![
                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
                    ],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // Terminal content arriving (rather than an authorization request)
        // shows the command started without a confirmation prompt.
        let update = rx.expect_update_fields().await;
        assert!(
            update.content.iter().any(|blocks| {
                blocks
                    .iter()
                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
            }),
            "expected terminal content (allow rule should skip confirmation and override default deny)"
        );

        let result = task.await;
        assert!(
            result.is_ok(),
            "expected command to succeed without confirmation"
        );
    }

    // Test 3: global default: allow does NOT override always_confirm patterns
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Allow),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![
                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
                    ],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let _task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "sudo rm file".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // With global default: allow, confirm patterns are still respected
        // The expect_authorization() call will panic if no authorization is requested,
        // which validates that the confirm pattern still triggers confirmation
        let _auth = rx.expect_authorization().await;

        drop(_task);
    }

    // Test 4: tool-specific default: deny is respected even with global default: allow
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // tool-specific default: deny is respected even with global default: allow
        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by tool-specific deny default"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("disabled"),
            "error should mention the tool is disabled, got: {err_msg}"
        );
    }
}
4733
/// End-to-end subagent flow: the parent thread's model issues a
/// `SpawnAgentTool` call, the spawned subagent thread runs its task prompt
/// to completion, and the subagent's response surfaces as the completed
/// tool-call output in the parent transcript.
#[gpui::test]
async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are gated behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Ensure empty threads are not saved, even if they get mutated.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Parent turn: stream some text, then a SpawnAgentTool use with a fresh
    // session (session_id: None), then end the parent completion.
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // The tool call should have spawned a running subagent session.
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // The "last" completion stream now belongs to the subagent's request.
    model.send_last_completion_stream_text_chunk("subagent task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response

        "}
    );

    // Parent model's follow-up completion after the tool call finishes.
    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response

            ## Assistant

            Response

        "#},
    );
}
4868
/// Verifies that a subagent's thinking output stays private: the subagent's
/// own transcript shows the `<thinking>` block, but the tool-call output
/// surfaced in the parent thread contains only the plain text chunks.
#[gpui::test]
async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are gated behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Ensure empty threads are not saved, even if they get mutated.
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Parent turn: text, then a SpawnAgentTool use, then end of stream.
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Subagent response interleaves plain text with a Thinking event.
    model.send_last_completion_stream_text_chunk("subagent task response 1");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "thinking more about the subagent task".into(),
        signature: None,
    });
    model.send_last_completion_stream_text_chunk("subagent task response 2");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // The subagent's own transcript includes the thinking block.
    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response 1

            <thinking>
            thinking more about the subagent task
            </thinking>

            subagent task response 2

        "}
    );

    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // The parent's tool-call output omits the thinking block entirely.
    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response 1

            subagent task response 2

            ## Assistant

            Response

        "#},
    );
}
5016
5017#[gpui::test]
5018async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
5019 init_test(cx);
5020 cx.update(|cx| {
5021 LanguageModelRegistry::test(cx);
5022 });
5023 cx.update(|cx| {
5024 cx.update_flags(true, vec!["subagents".to_string()]);
5025 });
5026
5027 let fs = FakeFs::new(cx.executor());
5028 fs.insert_tree(
5029 "/",
5030 json!({
5031 "a": {
5032 "b.md": "Lorem"
5033 }
5034 }),
5035 )
5036 .await;
5037 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5038 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5039 let agent = cx.update(|cx| {
5040 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5041 });
5042 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5043
5044 let acp_thread = cx
5045 .update(|cx| {
5046 connection
5047 .clone()
5048 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5049 })
5050 .await
5051 .unwrap();
5052 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5053 let thread = agent.read_with(cx, |agent, _| {
5054 agent.sessions.get(&session_id).unwrap().thread.clone()
5055 });
5056 let model = Arc::new(FakeLanguageModel::default());
5057
5058 // Ensure empty threads are not saved, even if they get mutated.
5059 thread.update(cx, |thread, cx| {
5060 thread.set_model(model.clone(), cx);
5061 });
5062 cx.run_until_parked();
5063
5064 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5065 cx.run_until_parked();
5066 model.send_last_completion_stream_text_chunk("spawning subagent");
5067 let subagent_tool_input = SpawnAgentToolInput {
5068 label: "label".to_string(),
5069 message: "subagent task prompt".to_string(),
5070 session_id: None,
5071 };
5072 let subagent_tool_use = LanguageModelToolUse {
5073 id: "subagent_1".into(),
5074 name: SpawnAgentTool::NAME.into(),
5075 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5076 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5077 is_input_complete: true,
5078 thought_signature: None,
5079 };
5080 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5081 subagent_tool_use,
5082 ));
5083 model.end_last_completion_stream();
5084
5085 cx.run_until_parked();
5086
5087 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5088 thread
5089 .running_subagent_ids(cx)
5090 .get(0)
5091 .expect("subagent thread should be running")
5092 .clone()
5093 });
5094 let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
5095 agent
5096 .sessions
5097 .get(&subagent_session_id)
5098 .expect("subagent session should exist")
5099 .acp_thread
5100 .clone()
5101 });
5102
5103 // model.send_last_completion_stream_text_chunk("subagent task response");
5104 // model.end_last_completion_stream();
5105
5106 // cx.run_until_parked();
5107
5108 acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
5109
5110 cx.run_until_parked();
5111
5112 send.await.unwrap();
5113
5114 acp_thread.read_with(cx, |thread, cx| {
5115 assert_eq!(thread.status(), ThreadStatus::Idle);
5116 assert_eq!(
5117 thread.to_markdown(cx),
5118 indoc! {"
5119 ## User
5120
5121 Prompt
5122
5123 ## Assistant
5124
5125 spawning subagent
5126
5127 **Tool Call: label**
5128 Status: Canceled
5129
5130 "}
5131 );
5132 });
5133 subagent_acp_thread.read_with(cx, |thread, cx| {
5134 assert_eq!(thread.status(), ThreadStatus::Idle);
5135 assert_eq!(
5136 thread.to_markdown(cx),
5137 indoc! {"
5138 ## User
5139
5140 subagent task prompt
5141
5142 "}
5143 );
5144 });
5145}
5146
/// Verifies that a `SpawnAgentTool` call carrying an existing `session_id`
/// resumes the prior subagent session instead of creating a new one: the
/// same session runs again, and the subagent transcript accumulates both
/// conversation turns.
#[gpui::test]
async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are gated behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // === First turn: create subagent ===
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // session_id: None spawns a brand-new subagent session.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "initial task".to_string(),
        message: "do the first task".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Subagent responds
    model.send_last_completion_stream_text_chunk("first task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete first turn
    model.send_last_completion_stream_text_chunk("First response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after completion"
        );
    });

    // === Second turn: resume subagent with session_id ===
    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("resuming subagent");
    // Passing the first turn's session_id should resume that session.
    let resume_tool_input = SpawnAgentToolInput {
        label: "follow-up task".to_string(),
        message: "do the follow-up task".to_string(),
        session_id: Some(subagent_session_id.clone()),
    };
    let resume_tool_use = LanguageModelToolUse {
        id: "subagent_2".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
        input: serde_json::to_value(&resume_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Subagent should be running again with the same session
    thread.read_with(cx, |thread, cx| {
        let running = thread.running_subagent_ids(cx);
        assert_eq!(running.len(), 1, "subagent should be running");
        assert_eq!(running[0], subagent_session_id, "should be same session");
    });

    // Subagent responds to follow-up
    model.send_last_completion_stream_text_chunk("follow-up task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete second turn
    model.send_last_completion_stream_text_chunk("Second response");
    model.end_last_completion_stream();

    send2.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after resume completion"
        );
    });

    // Verify the subagent's acp thread has both conversation turns
    assert_eq!(
        subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            do the first task

            ## Assistant

            first task response

            ## User

            do the follow-up task

            ## Assistant

            follow-up task response

        "}
    );
}
5326
5327#[gpui::test]
5328async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5329 init_test(cx);
5330
5331 cx.update(|cx| {
5332 cx.update_flags(true, vec!["subagents".to_string()]);
5333 });
5334
5335 let fs = FakeFs::new(cx.executor());
5336 fs.insert_tree(path!("/test"), json!({})).await;
5337 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5338 let project_context = cx.new(|_cx| ProjectContext::default());
5339 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5340 let context_server_registry =
5341 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5342 let model = Arc::new(FakeLanguageModel::default());
5343
5344 let parent_thread = cx.new(|cx| {
5345 Thread::new(
5346 project.clone(),
5347 project_context,
5348 context_server_registry,
5349 Templates::new(),
5350 Some(model.clone()),
5351 cx,
5352 )
5353 });
5354
5355 let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5356 subagent_thread.read_with(cx, |subagent_thread, cx| {
5357 assert!(subagent_thread.is_subagent());
5358 assert_eq!(subagent_thread.depth(), 1);
5359 assert_eq!(
5360 subagent_thread.model().map(|model| model.id()),
5361 Some(model.id())
5362 );
5363 assert_eq!(
5364 subagent_thread.parent_thread_id(),
5365 Some(parent_thread.read(cx).id().clone())
5366 );
5367
5368 let request = subagent_thread
5369 .build_completion_request(CompletionIntent::UserPrompt, cx)
5370 .unwrap();
5371 assert_eq!(request.intent, Some(CompletionIntent::Subagent));
5372 });
5373}
5374
5375#[gpui::test]
5376async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5377 init_test(cx);
5378
5379 cx.update(|cx| {
5380 cx.update_flags(true, vec!["subagents".to_string()]);
5381 });
5382
5383 let fs = FakeFs::new(cx.executor());
5384 fs.insert_tree(path!("/test"), json!({})).await;
5385 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5386 let project_context = cx.new(|_cx| ProjectContext::default());
5387 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5388 let context_server_registry =
5389 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5390 let model = Arc::new(FakeLanguageModel::default());
5391 let environment = Rc::new(cx.update(|cx| {
5392 FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5393 }));
5394
5395 let deep_parent_thread = cx.new(|cx| {
5396 let mut thread = Thread::new(
5397 project.clone(),
5398 project_context,
5399 context_server_registry,
5400 Templates::new(),
5401 Some(model.clone()),
5402 cx,
5403 );
5404 thread.set_subagent_context(SubagentContext {
5405 parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5406 depth: MAX_SUBAGENT_DEPTH - 1,
5407 });
5408 thread
5409 });
5410 let deep_subagent_thread = cx.new(|cx| {
5411 let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5412 thread.add_default_tools(environment, cx);
5413 thread
5414 });
5415
5416 deep_subagent_thread.read_with(cx, |thread, _| {
5417 assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5418 assert!(
5419 !thread.has_registered_tool(SpawnAgentTool::NAME),
5420 "subagent tool should not be present at max depth"
5421 );
5422 });
5423}
5424
// Verifies that cancelling a parent thread also cancels any subagent it has
// registered as running.
#[gpui::test]
async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
    init_test(cx);

    // Subagent creation is gated behind the "subagents" feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let model = Arc::new(FakeLanguageModel::default());

    let parent = cx.new(|cx| {
        Thread::new(
            project.clone(),
            project_context.clone(),
            context_server_registry.clone(),
            Templates::new(),
            Some(model.clone()),
            cx,
        )
    });

    let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));

    // Register the subagent as running on the parent directly, mirroring what
    // the spawn-agent tool does when it starts a subagent.
    parent.update(cx, |thread, _cx| {
        thread.register_running_subagent(subagent.downgrade());
    });

    // Kick off a turn on the subagent so it is mid-flight when the parent
    // cancels.
    subagent
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
        })
        .unwrap();
    cx.run_until_parked();

    subagent.read_with(cx, |thread, _| {
        assert!(!thread.is_turn_complete(), "subagent should be running");
    });

    // Cancelling the parent must propagate to the registered subagent.
    parent.update(cx, |thread, cx| {
        thread.cancel(cx).detach();
    });

    // NOTE(review): no run_until_parked here — cancellation appears to take
    // effect synchronously; confirm against Thread::cancel's implementation.
    subagent.read_with(cx, |thread, _| {
        assert!(
            thread.is_turn_complete(),
            "subagent should be cancelled when parent cancels"
        );
    });
}
5481
5482#[gpui::test]
5483async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5484 init_test(cx);
5485 cx.update(|cx| {
5486 LanguageModelRegistry::test(cx);
5487 });
5488 cx.update(|cx| {
5489 cx.update_flags(true, vec!["subagents".to_string()]);
5490 });
5491
5492 let fs = FakeFs::new(cx.executor());
5493 fs.insert_tree(
5494 "/",
5495 json!({
5496 "a": {
5497 "b.md": "Lorem"
5498 }
5499 }),
5500 )
5501 .await;
5502 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5503 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5504 let agent = cx.update(|cx| {
5505 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5506 });
5507 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5508
5509 let acp_thread = cx
5510 .update(|cx| {
5511 connection
5512 .clone()
5513 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5514 })
5515 .await
5516 .unwrap();
5517 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5518 let thread = agent.read_with(cx, |agent, _| {
5519 agent.sessions.get(&session_id).unwrap().thread.clone()
5520 });
5521 let model = Arc::new(FakeLanguageModel::default());
5522
5523 thread.update(cx, |thread, cx| {
5524 thread.set_model(model.clone(), cx);
5525 });
5526 cx.run_until_parked();
5527
5528 // Start the parent turn
5529 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5530 cx.run_until_parked();
5531 model.send_last_completion_stream_text_chunk("spawning subagent");
5532 let subagent_tool_input = SpawnAgentToolInput {
5533 label: "label".to_string(),
5534 message: "subagent task prompt".to_string(),
5535 session_id: None,
5536 };
5537 let subagent_tool_use = LanguageModelToolUse {
5538 id: "subagent_1".into(),
5539 name: SpawnAgentTool::NAME.into(),
5540 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5541 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5542 is_input_complete: true,
5543 thought_signature: None,
5544 };
5545 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5546 subagent_tool_use,
5547 ));
5548 model.end_last_completion_stream();
5549
5550 cx.run_until_parked();
5551
5552 // Verify subagent is running
5553 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5554 thread
5555 .running_subagent_ids(cx)
5556 .get(0)
5557 .expect("subagent thread should be running")
5558 .clone()
5559 });
5560
5561 // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5562 model.send_last_completion_stream_text_chunk("partial work");
5563 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5564 TokenUsage {
5565 input_tokens: 850_000,
5566 output_tokens: 0,
5567 cache_creation_input_tokens: 0,
5568 cache_read_input_tokens: 0,
5569 },
5570 ));
5571
5572 cx.run_until_parked();
5573
5574 // The subagent should no longer be running
5575 thread.read_with(cx, |thread, cx| {
5576 assert!(
5577 thread.running_subagent_ids(cx).is_empty(),
5578 "subagent should be stopped after context window warning"
5579 );
5580 });
5581
5582 // The parent model should get a new completion request to respond to the tool error
5583 model.send_last_completion_stream_text_chunk("Response after warning");
5584 model.end_last_completion_stream();
5585
5586 send.await.unwrap();
5587
5588 // Verify the parent thread shows the warning error in the tool call
5589 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5590 assert!(
5591 markdown.contains("nearing the end of its context window"),
5592 "tool output should contain context window warning message, got:\n{markdown}"
5593 );
5594 assert!(
5595 markdown.contains("Status: Failed"),
5596 "tool call should have Failed status, got:\n{markdown}"
5597 );
5598
5599 // Verify the subagent session still exists (can be resumed)
5600 agent.read_with(cx, |agent, _cx| {
5601 assert!(
5602 agent.sessions.contains_key(&subagent_session_id),
5603 "subagent session should still exist for potential resume"
5604 );
5605 });
5606}
5607
5608#[gpui::test]
5609async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5610 init_test(cx);
5611 cx.update(|cx| {
5612 LanguageModelRegistry::test(cx);
5613 });
5614 cx.update(|cx| {
5615 cx.update_flags(true, vec!["subagents".to_string()]);
5616 });
5617
5618 let fs = FakeFs::new(cx.executor());
5619 fs.insert_tree(
5620 "/",
5621 json!({
5622 "a": {
5623 "b.md": "Lorem"
5624 }
5625 }),
5626 )
5627 .await;
5628 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5629 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5630 let agent = cx.update(|cx| {
5631 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5632 });
5633 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5634
5635 let acp_thread = cx
5636 .update(|cx| {
5637 connection
5638 .clone()
5639 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5640 })
5641 .await
5642 .unwrap();
5643 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5644 let thread = agent.read_with(cx, |agent, _| {
5645 agent.sessions.get(&session_id).unwrap().thread.clone()
5646 });
5647 let model = Arc::new(FakeLanguageModel::default());
5648
5649 thread.update(cx, |thread, cx| {
5650 thread.set_model(model.clone(), cx);
5651 });
5652 cx.run_until_parked();
5653
5654 // === First turn: create subagent, trigger context window warning ===
5655 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5656 cx.run_until_parked();
5657 model.send_last_completion_stream_text_chunk("spawning subagent");
5658 let subagent_tool_input = SpawnAgentToolInput {
5659 label: "initial task".to_string(),
5660 message: "do the first task".to_string(),
5661 session_id: None,
5662 };
5663 let subagent_tool_use = LanguageModelToolUse {
5664 id: "subagent_1".into(),
5665 name: SpawnAgentTool::NAME.into(),
5666 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5667 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5668 is_input_complete: true,
5669 thought_signature: None,
5670 };
5671 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5672 subagent_tool_use,
5673 ));
5674 model.end_last_completion_stream();
5675
5676 cx.run_until_parked();
5677
5678 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5679 thread
5680 .running_subagent_ids(cx)
5681 .get(0)
5682 .expect("subagent thread should be running")
5683 .clone()
5684 });
5685
5686 // Subagent sends a usage update that crosses the warning threshold.
5687 // This triggers Normal→Warning, stopping the subagent.
5688 model.send_last_completion_stream_text_chunk("partial work");
5689 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5690 TokenUsage {
5691 input_tokens: 850_000,
5692 output_tokens: 0,
5693 cache_creation_input_tokens: 0,
5694 cache_read_input_tokens: 0,
5695 },
5696 ));
5697
5698 cx.run_until_parked();
5699
5700 // Verify the first turn was stopped with a context window warning
5701 thread.read_with(cx, |thread, cx| {
5702 assert!(
5703 thread.running_subagent_ids(cx).is_empty(),
5704 "subagent should be stopped after context window warning"
5705 );
5706 });
5707
5708 // Parent model responds to complete first turn
5709 model.send_last_completion_stream_text_chunk("First response");
5710 model.end_last_completion_stream();
5711
5712 send.await.unwrap();
5713
5714 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5715 assert!(
5716 markdown.contains("nearing the end of its context window"),
5717 "first turn should have context window warning, got:\n{markdown}"
5718 );
5719
5720 // === Second turn: resume the same subagent (now at Warning level) ===
5721 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5722 cx.run_until_parked();
5723 model.send_last_completion_stream_text_chunk("resuming subagent");
5724 let resume_tool_input = SpawnAgentToolInput {
5725 label: "follow-up task".to_string(),
5726 message: "do the follow-up task".to_string(),
5727 session_id: Some(subagent_session_id.clone()),
5728 };
5729 let resume_tool_use = LanguageModelToolUse {
5730 id: "subagent_2".into(),
5731 name: SpawnAgentTool::NAME.into(),
5732 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5733 input: serde_json::to_value(&resume_tool_input).unwrap(),
5734 is_input_complete: true,
5735 thought_signature: None,
5736 };
5737 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5738 model.end_last_completion_stream();
5739
5740 cx.run_until_parked();
5741
5742 // Subagent responds with tokens still at warning level (no worse).
5743 // Since ratio_before_prompt was already Warning, this should NOT
5744 // trigger the context window warning again.
5745 model.send_last_completion_stream_text_chunk("follow-up task response");
5746 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5747 TokenUsage {
5748 input_tokens: 870_000,
5749 output_tokens: 0,
5750 cache_creation_input_tokens: 0,
5751 cache_read_input_tokens: 0,
5752 },
5753 ));
5754 model.end_last_completion_stream();
5755
5756 cx.run_until_parked();
5757
5758 // Parent model responds to complete second turn
5759 model.send_last_completion_stream_text_chunk("Second response");
5760 model.end_last_completion_stream();
5761
5762 send2.await.unwrap();
5763
5764 // The resumed subagent should have completed normally since the ratio
5765 // didn't transition (it was Warning before and stayed at Warning)
5766 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5767 assert!(
5768 markdown.contains("follow-up task response"),
5769 "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5770 );
5771 // The second tool call should NOT have a context window warning
5772 let second_tool_pos = markdown
5773 .find("follow-up task")
5774 .expect("should find follow-up tool call");
5775 let after_second_tool = &markdown[second_tool_pos..];
5776 assert!(
5777 !after_second_tool.contains("nearing the end of its context window"),
5778 "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5779 );
5780}
5781
// Verifies that a non-retryable model error inside a subagent stops the
// subagent and surfaces as a Failed tool call on the parent thread.
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent creation is gated behind the "subagents" feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    // The same fake model serves both the parent and subagent completions;
    // "last completion stream" targets whichever request is most recent.
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5888
5889#[gpui::test]
5890async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5891 init_test(cx);
5892
5893 let fs = FakeFs::new(cx.executor());
5894 fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5895 .await;
5896 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5897
5898 cx.update(|cx| {
5899 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5900 settings.tool_permissions.tools.insert(
5901 EditFileTool::NAME.into(),
5902 agent_settings::ToolRules {
5903 default: Some(settings::ToolPermissionMode::Allow),
5904 always_allow: vec![],
5905 always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5906 always_confirm: vec![],
5907 invalid_patterns: vec![],
5908 },
5909 );
5910 agent_settings::AgentSettings::override_global(settings, cx);
5911 });
5912
5913 let context_server_registry =
5914 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5915 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5916 let templates = crate::Templates::new();
5917 let thread = cx.new(|cx| {
5918 crate::Thread::new(
5919 project.clone(),
5920 cx.new(|_cx| prompt_store::ProjectContext::default()),
5921 context_server_registry,
5922 templates.clone(),
5923 None,
5924 cx,
5925 )
5926 });
5927
5928 #[allow(clippy::arc_with_non_send_sync)]
5929 let tool = Arc::new(crate::EditFileTool::new(
5930 project.clone(),
5931 thread.downgrade(),
5932 language_registry,
5933 templates,
5934 ));
5935 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5936
5937 let task = cx.update(|cx| {
5938 tool.run(
5939 ToolInput::resolved(crate::EditFileToolInput {
5940 display_description: "Edit sensitive file".to_string(),
5941 path: "root/sensitive_config.txt".into(),
5942 mode: crate::EditFileMode::Edit,
5943 }),
5944 event_stream,
5945 cx,
5946 )
5947 });
5948
5949 let result = task.await;
5950 assert!(result.is_err(), "expected edit to be blocked");
5951 assert!(
5952 result.unwrap_err().to_string().contains("blocked"),
5953 "error should mention the edit was blocked"
5954 );
5955}
5956
5957#[gpui::test]
5958async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5959 init_test(cx);
5960
5961 let fs = FakeFs::new(cx.executor());
5962 fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5963 .await;
5964 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5965
5966 cx.update(|cx| {
5967 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5968 settings.tool_permissions.tools.insert(
5969 DeletePathTool::NAME.into(),
5970 agent_settings::ToolRules {
5971 default: Some(settings::ToolPermissionMode::Allow),
5972 always_allow: vec![],
5973 always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5974 always_confirm: vec![],
5975 invalid_patterns: vec![],
5976 },
5977 );
5978 agent_settings::AgentSettings::override_global(settings, cx);
5979 });
5980
5981 let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5982
5983 #[allow(clippy::arc_with_non_send_sync)]
5984 let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5985 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5986
5987 let task = cx.update(|cx| {
5988 tool.run(
5989 ToolInput::resolved(crate::DeletePathToolInput {
5990 path: "root/important_data.txt".to_string(),
5991 }),
5992 event_stream,
5993 cx,
5994 )
5995 });
5996
5997 let result = task.await;
5998 assert!(result.is_err(), "expected deletion to be blocked");
5999 assert!(
6000 result.unwrap_err().contains("blocked"),
6001 "error should mention the deletion was blocked"
6002 );
6003}
6004
6005#[gpui::test]
6006async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
6007 init_test(cx);
6008
6009 let fs = FakeFs::new(cx.executor());
6010 fs.insert_tree(
6011 "/root",
6012 json!({
6013 "safe.txt": "content",
6014 "protected": {}
6015 }),
6016 )
6017 .await;
6018 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6019
6020 cx.update(|cx| {
6021 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6022 settings.tool_permissions.tools.insert(
6023 MovePathTool::NAME.into(),
6024 agent_settings::ToolRules {
6025 default: Some(settings::ToolPermissionMode::Allow),
6026 always_allow: vec![],
6027 always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
6028 always_confirm: vec![],
6029 invalid_patterns: vec![],
6030 },
6031 );
6032 agent_settings::AgentSettings::override_global(settings, cx);
6033 });
6034
6035 #[allow(clippy::arc_with_non_send_sync)]
6036 let tool = Arc::new(crate::MovePathTool::new(project));
6037 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6038
6039 let task = cx.update(|cx| {
6040 tool.run(
6041 ToolInput::resolved(crate::MovePathToolInput {
6042 source_path: "root/safe.txt".to_string(),
6043 destination_path: "root/protected/safe.txt".to_string(),
6044 }),
6045 event_stream,
6046 cx,
6047 )
6048 });
6049
6050 let result = task.await;
6051 assert!(
6052 result.is_err(),
6053 "expected move to be blocked due to destination path"
6054 );
6055 assert!(
6056 result.unwrap_err().contains("blocked"),
6057 "error should mention the move was blocked"
6058 );
6059}
6060
6061#[gpui::test]
6062async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
6063 init_test(cx);
6064
6065 let fs = FakeFs::new(cx.executor());
6066 fs.insert_tree(
6067 "/root",
6068 json!({
6069 "secret.txt": "secret content",
6070 "public": {}
6071 }),
6072 )
6073 .await;
6074 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6075
6076 cx.update(|cx| {
6077 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6078 settings.tool_permissions.tools.insert(
6079 MovePathTool::NAME.into(),
6080 agent_settings::ToolRules {
6081 default: Some(settings::ToolPermissionMode::Allow),
6082 always_allow: vec![],
6083 always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
6084 always_confirm: vec![],
6085 invalid_patterns: vec![],
6086 },
6087 );
6088 agent_settings::AgentSettings::override_global(settings, cx);
6089 });
6090
6091 #[allow(clippy::arc_with_non_send_sync)]
6092 let tool = Arc::new(crate::MovePathTool::new(project));
6093 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6094
6095 let task = cx.update(|cx| {
6096 tool.run(
6097 ToolInput::resolved(crate::MovePathToolInput {
6098 source_path: "root/secret.txt".to_string(),
6099 destination_path: "root/public/not_secret.txt".to_string(),
6100 }),
6101 event_stream,
6102 cx,
6103 )
6104 });
6105
6106 let result = task.await;
6107 assert!(
6108 result.is_err(),
6109 "expected move to be blocked due to source path"
6110 );
6111 assert!(
6112 result.unwrap_err().contains("blocked"),
6113 "error should mention the move was blocked"
6114 );
6115}
6116
6117#[gpui::test]
6118async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
6119 init_test(cx);
6120
6121 let fs = FakeFs::new(cx.executor());
6122 fs.insert_tree(
6123 "/root",
6124 json!({
6125 "confidential.txt": "confidential data",
6126 "dest": {}
6127 }),
6128 )
6129 .await;
6130 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6131
6132 cx.update(|cx| {
6133 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6134 settings.tool_permissions.tools.insert(
6135 CopyPathTool::NAME.into(),
6136 agent_settings::ToolRules {
6137 default: Some(settings::ToolPermissionMode::Allow),
6138 always_allow: vec![],
6139 always_deny: vec![
6140 agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
6141 ],
6142 always_confirm: vec![],
6143 invalid_patterns: vec![],
6144 },
6145 );
6146 agent_settings::AgentSettings::override_global(settings, cx);
6147 });
6148
6149 #[allow(clippy::arc_with_non_send_sync)]
6150 let tool = Arc::new(crate::CopyPathTool::new(project));
6151 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6152
6153 let task = cx.update(|cx| {
6154 tool.run(
6155 ToolInput::resolved(crate::CopyPathToolInput {
6156 source_path: "root/confidential.txt".to_string(),
6157 destination_path: "root/dest/copy.txt".to_string(),
6158 }),
6159 event_stream,
6160 cx,
6161 )
6162 });
6163
6164 let result = task.await;
6165 assert!(result.is_err(), "expected copy to be blocked");
6166 assert!(
6167 result.unwrap_err().contains("blocked"),
6168 "error should mention the copy was blocked"
6169 );
6170}
6171
6172#[gpui::test]
6173async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
6174 init_test(cx);
6175
6176 let fs = FakeFs::new(cx.executor());
6177 fs.insert_tree(
6178 "/root",
6179 json!({
6180 "normal.txt": "normal content",
6181 "readonly": {
6182 "config.txt": "readonly content"
6183 }
6184 }),
6185 )
6186 .await;
6187 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6188
6189 cx.update(|cx| {
6190 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6191 settings.tool_permissions.tools.insert(
6192 SaveFileTool::NAME.into(),
6193 agent_settings::ToolRules {
6194 default: Some(settings::ToolPermissionMode::Allow),
6195 always_allow: vec![],
6196 always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6197 always_confirm: vec![],
6198 invalid_patterns: vec![],
6199 },
6200 );
6201 agent_settings::AgentSettings::override_global(settings, cx);
6202 });
6203
6204 #[allow(clippy::arc_with_non_send_sync)]
6205 let tool = Arc::new(crate::SaveFileTool::new(project));
6206 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6207
6208 let task = cx.update(|cx| {
6209 tool.run(
6210 ToolInput::resolved(crate::SaveFileToolInput {
6211 paths: vec![
6212 std::path::PathBuf::from("root/normal.txt"),
6213 std::path::PathBuf::from("root/readonly/config.txt"),
6214 ],
6215 }),
6216 event_stream,
6217 cx,
6218 )
6219 });
6220
6221 let result = task.await;
6222 assert!(
6223 result.is_err(),
6224 "expected save to be blocked due to denied path"
6225 );
6226 assert!(
6227 result.unwrap_err().contains("blocked"),
6228 "error should mention the save was blocked"
6229 );
6230}
6231
6232#[gpui::test]
6233async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6234 init_test(cx);
6235
6236 let fs = FakeFs::new(cx.executor());
6237 fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6238 .await;
6239 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6240
6241 cx.update(|cx| {
6242 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6243 settings.tool_permissions.tools.insert(
6244 SaveFileTool::NAME.into(),
6245 agent_settings::ToolRules {
6246 default: Some(settings::ToolPermissionMode::Allow),
6247 always_allow: vec![],
6248 always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6249 always_confirm: vec![],
6250 invalid_patterns: vec![],
6251 },
6252 );
6253 agent_settings::AgentSettings::override_global(settings, cx);
6254 });
6255
6256 #[allow(clippy::arc_with_non_send_sync)]
6257 let tool = Arc::new(crate::SaveFileTool::new(project));
6258 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6259
6260 let task = cx.update(|cx| {
6261 tool.run(
6262 ToolInput::resolved(crate::SaveFileToolInput {
6263 paths: vec![std::path::PathBuf::from("root/config.secret")],
6264 }),
6265 event_stream,
6266 cx,
6267 )
6268 });
6269
6270 let result = task.await;
6271 assert!(result.is_err(), "expected save to be blocked");
6272 assert!(
6273 result.unwrap_err().contains("blocked"),
6274 "error should mention the save was blocked"
6275 );
6276}
6277
6278#[gpui::test]
6279async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6280 init_test(cx);
6281
6282 cx.update(|cx| {
6283 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6284 settings.tool_permissions.tools.insert(
6285 WebSearchTool::NAME.into(),
6286 agent_settings::ToolRules {
6287 default: Some(settings::ToolPermissionMode::Allow),
6288 always_allow: vec![],
6289 always_deny: vec![
6290 agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6291 ],
6292 always_confirm: vec![],
6293 invalid_patterns: vec![],
6294 },
6295 );
6296 agent_settings::AgentSettings::override_global(settings, cx);
6297 });
6298
6299 #[allow(clippy::arc_with_non_send_sync)]
6300 let tool = Arc::new(crate::WebSearchTool);
6301 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6302
6303 let input: crate::WebSearchToolInput =
6304 serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6305
6306 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6307
6308 let result = task.await;
6309 assert!(result.is_err(), "expected search to be blocked");
6310 match result.unwrap_err() {
6311 crate::WebSearchToolOutput::Error { error } => {
6312 assert!(
6313 error.contains("blocked"),
6314 "error should mention the search was blocked"
6315 );
6316 }
6317 other => panic!("expected Error variant, got: {other:?}"),
6318 }
6319}
6320
#[gpui::test]
async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
    // An `always_allow` pattern that matches the edited path should suppress
    // the authorization prompt even though the tool's default mode is Confirm.
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({"README.md": "# Hello"}))
        .await;
    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;

    // Default the edit tool to Confirm, but always allow paths ending in `.md`.
    cx.update(|cx| {
        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
        settings.tool_permissions.tools.insert(
            EditFileTool::NAME.into(),
            agent_settings::ToolRules {
                default: Some(settings::ToolPermissionMode::Confirm),
                always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
                always_deny: vec![],
                always_confirm: vec![],
                invalid_patterns: vec![],
            },
        );
        agent_settings::AgentSettings::override_global(settings, cx);
    });

    // EditFileTool requires a Thread to run against, so build a minimal one
    // backed by the test project.
    let context_server_registry =
        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
    let templates = crate::Templates::new();
    let thread = cx.new(|cx| {
        crate::Thread::new(
            project.clone(),
            cx.new(|_cx| prompt_store::ProjectContext::default()),
            context_server_registry,
            templates.clone(),
            None,
            cx,
        )
    });

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::EditFileTool::new(
        project,
        thread.downgrade(),
        language_registry,
        templates,
    ));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Edit a path that matches the `.md` allow rule.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::EditFileToolInput {
                display_description: "Edit README".to_string(),
                path: "root/README.md".into(),
                mode: crate::EditFileMode::Edit,
            }),
            event_stream,
            cx,
        )
    });

    cx.run_until_parked();

    // NOTE(review): only the first queued event is inspected; presumably an
    // authorization request would be the first event emitted if one were
    // required — confirm against ToolCallEventStream's event ordering.
    let event = rx.try_recv();
    assert!(
        !matches!(event, Ok(Ok(ThreadEvent::ToolCallAuthorization(_)))),
        "expected no authorization request for allowed .md file"
    );
}
6389
#[gpui::test]
async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
    // Even with a blanket global "allow" default for all tools, edits to
    // project-local settings under `.zed/` must still require confirmation.
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/root",
        json!({
            ".zed": {
                "settings.json": "{}"
            },
            "README.md": "# Hello"
        }),
    )
    .await;
    let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;

    // Globally allow every tool by default — the strongest possible "allow".
    cx.update(|cx| {
        let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
        settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
        agent_settings::AgentSettings::override_global(settings, cx);
    });

    // EditFileTool requires a Thread to run against, so build a minimal one
    // backed by the test project.
    let context_server_registry =
        cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
    let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
    let templates = crate::Templates::new();
    let thread = cx.new(|cx| {
        crate::Thread::new(
            project.clone(),
            cx.new(|_cx| prompt_store::ProjectContext::default()),
            context_server_registry,
            templates.clone(),
            None,
            cx,
        )
    });

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::EditFileTool::new(
        project,
        thread.downgrade(),
        language_registry,
        templates,
    ));

    // Editing a file inside .zed/ should still prompt even with global default: allow,
    // because local settings paths are sensitive and require confirmation regardless.
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::EditFileToolInput {
                display_description: "Edit local settings".to_string(),
                path: "root/.zed/settings.json".into(),
                mode: crate::EditFileMode::Edit,
            }),
            event_stream,
            cx,
        )
    });

    // The tool should emit field updates followed by an authorization request;
    // awaiting the authorization is the assertion that a prompt occurred.
    let _update = rx.expect_update_fields().await;
    let _auth = rx.expect_authorization().await;
}
6454
6455#[gpui::test]
6456async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6457 init_test(cx);
6458
6459 cx.update(|cx| {
6460 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6461 settings.tool_permissions.tools.insert(
6462 FetchTool::NAME.into(),
6463 agent_settings::ToolRules {
6464 default: Some(settings::ToolPermissionMode::Allow),
6465 always_allow: vec![],
6466 always_deny: vec![
6467 agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6468 ],
6469 always_confirm: vec![],
6470 invalid_patterns: vec![],
6471 },
6472 );
6473 agent_settings::AgentSettings::override_global(settings, cx);
6474 });
6475
6476 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6477
6478 #[allow(clippy::arc_with_non_send_sync)]
6479 let tool = Arc::new(crate::FetchTool::new(http_client));
6480 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6481
6482 let input: crate::FetchToolInput =
6483 serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6484
6485 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6486
6487 let result = task.await;
6488 assert!(result.is_err(), "expected fetch to be blocked");
6489 assert!(
6490 result.unwrap_err().contains("blocked"),
6491 "error should mention the fetch was blocked"
6492 );
6493}
6494
6495#[gpui::test]
6496async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6497 init_test(cx);
6498
6499 cx.update(|cx| {
6500 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6501 settings.tool_permissions.tools.insert(
6502 FetchTool::NAME.into(),
6503 agent_settings::ToolRules {
6504 default: Some(settings::ToolPermissionMode::Confirm),
6505 always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6506 always_deny: vec![],
6507 always_confirm: vec![],
6508 invalid_patterns: vec![],
6509 },
6510 );
6511 agent_settings::AgentSettings::override_global(settings, cx);
6512 });
6513
6514 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6515
6516 #[allow(clippy::arc_with_non_send_sync)]
6517 let tool = Arc::new(crate::FetchTool::new(http_client));
6518 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6519
6520 let input: crate::FetchToolInput =
6521 serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6522
6523 let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6524
6525 cx.run_until_parked();
6526
6527 let event = rx.try_recv();
6528 assert!(
6529 !matches!(event, Ok(Ok(ThreadEvent::ToolCallAuthorization(_)))),
6530 "expected no authorization request for allowed docs.rs URL"
6531 );
6532}
6533
#[gpui::test]
async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
    // When a user message is queued mid-turn, the agent should finish the
    // in-flight tool call and then end the turn at the tool-call boundary
    // instead of sending another request to the model.
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Add a tool so we can simulate tool calls
    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
    });

    // Start a turn by sending a message
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate the model making a tool call
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text": "hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));

    // Signal that a message is queued before ending the stream
    thread.update(cx, |thread, _cx| {
        thread.set_has_queued_message(true);
    });

    // Now end the stream - tool will run, and the boundary check should see the queue
    fake_model.end_last_completion_stream();

    // Collect all events until the turn stops
    let all_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we received the tool call event
    let tool_call_ids: Vec<_> = all_events
        .iter()
        .filter_map(|e| match e {
            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
            _ => None,
        })
        .collect();
    assert_eq!(
        tool_call_ids,
        vec!["tool_1"],
        "Should have received a tool call event for our echo tool"
    );

    // The turn should have stopped with EndTurn
    let stop_reasons = stop_events(all_events);
    assert_eq!(
        stop_reasons,
        vec![acp::StopReason::EndTurn],
        "Turn should have ended after tool completion due to queued message"
    );

    // Verify the queued message flag is still set — ending the turn must not
    // consume the queue; the queued message is sent by a later turn.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.has_queued_message(),
            "Should still have queued message flag set"
        );
    });

    // Thread should be idle now
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be running after turn ends"
        );
    });
}
6618
#[gpui::test]
async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
    // A tool that fails while its input is still streaming in
    // (`is_input_complete: false`) should abort the stream loop right away and
    // send the model a follow-up request containing the error tool result,
    // without waiting for the completion stream to end.
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // This tool errors after receiving a single input chunk.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Input is deliberately incomplete so the tool runs in streaming mode and
    // receives this event as its first (and failing) chunk.
    let tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({}),
        is_input_complete: false,
        thought_signature: None,
    };

    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));

    cx.run_until_parked();

    // The follow-up request must already exist even though the original
    // completion stream was never ended — that is the "breaks immediately".
    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // NOTE(review): messages[0] is skipped — presumably the system prompt;
    // confirm against the thread's request construction.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_failing_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6693
#[gpui::test]
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    // When one streaming tool fails, the thread must still wait for other
    // in-flight tool calls to finish before sending the follow-up request, so
    // the model sees a complete set of tool results.
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // This oneshot gates completion of the first (successful) tool so we can
    // hold it open past the second tool's failure.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // First tool call arrives as a partial chunk, then a complete one.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // Second tool call streams in incomplete, which makes the failing tool
    // error on its first chunk while the first tool is still running.
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    cx.run_until_parked();

    // Release the first tool so the turn can proceed to the follow-up request.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // Both tool results appear in a single user message: the error result for
    // the failing tool, then the success result for the gated tool.
    // NOTE(review): messages[0] is skipped — presumably the system prompt.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6811
#[gpui::test]
async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
    // Changes to the active profile, model, and thinking mode made mid-turn
    // should be picked up by the next loop iteration, so the follow-up request
    // after a tool call reflects all three.
    //
    // NOTE(review): unlike sibling tests, this one calls neither `init_test(cx)`
    // nor `always_allow_tools(cx)` — presumably `setup` installs the settings
    // store and the fake model auto-allows tools; confirm.
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model_a = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
    });

    // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "profile-a": {
                        "name": "Profile A",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "profile-b": {
                        "name": "Profile B",
                        "tools": {
                            DelayTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-a".into()), cx);
        thread.set_thinking_enabled(false, cx);
    });

    // Send a message — first iteration starts with model A, profile-a, thinking off.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Verify first request has both tools and thinking disabled.
    let completions = fake_model_a.pending_completions();
    assert_eq!(completions.len(), 1);
    let first_tools = tool_names_for_completion(&completions[0]);
    assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
    assert!(!completions[0].thinking_allowed);

    // Model A responds with an echo tool call.
    fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text":"hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model_a.end_last_completion_stream();

    // Before the next iteration runs, switch to profile-b (only DelayTool),
    // swap in a new model, and enable thinking.
    let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "test-provider",
        "model-b",
        "Model B",
        true,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-b".into()), cx);
        thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
        thread.set_thinking_enabled(true, cx);
    });

    // Run until parked — processes the echo tool call, loops back, picks up
    // the new model/profile/thinking, and makes a second request to model B.
    cx.run_until_parked();

    // The second request should have gone to model B.
    let model_b_completions = fake_model_b.pending_completions();
    assert_eq!(
        model_b_completions.len(),
        1,
        "second request should go to model B"
    );

    // Profile-b only has DelayTool, so echo should be gone.
    let second_tools = tool_names_for_completion(&model_b_completions[0]);
    assert_eq!(second_tools, vec![DelayTool::NAME]);

    // Thinking should now be enabled.
    assert!(model_b_completions[0].thinking_allowed);
}