1use super::*;
2use acp_thread::{
3 AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
4 UserMessageId,
5};
6use agent_client_protocol::{self as acp};
7use agent_settings::AgentProfileId;
8use anyhow::Result;
9use client::{Client, UserStore};
10use cloud_llm_client::CompletionIntent;
11use collections::IndexMap;
12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
13use feature_flags::FeatureFlagAppExt as _;
14use fs::{FakeFs, Fs};
15use futures::{
16 FutureExt as _, StreamExt,
17 channel::{
18 mpsc::{self, UnboundedReceiver},
19 oneshot,
20 },
21 future::{Fuse, Shared},
22};
23use gpui::{
24 App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
25 http_client::FakeHttpClient,
26};
27use indoc::indoc;
28use language_model::{
29 LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
30 LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
31 LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
32 LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
33 fake_provider::FakeLanguageModel,
34};
35use pretty_assertions::assert_eq;
36use project::{
37 Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
38};
39use prompt_store::ProjectContext;
40use reqwest_client::ReqwestClient;
41use schemars::JsonSchema;
42use serde::{Deserialize, Serialize};
43use serde_json::json;
44use settings::{Settings, SettingsStore};
45use std::{
46 path::Path,
47 pin::Pin,
48 rc::Rc,
49 sync::{
50 Arc,
51 atomic::{AtomicBool, AtomicUsize, Ordering},
52 },
53 time::Duration,
54};
55use util::path;
56
57mod edit_file_thread_test;
58mod test_tools;
59use test_tools::*;
60
61pub(crate) fn init_test(cx: &mut TestAppContext) {
62 cx.update(|cx| {
63 let settings_store = SettingsStore::test(cx);
64 cx.set_global(settings_store);
65 });
66}
67
/// Test double for a terminal attached to a tool call.
pub(crate) struct FakeTerminalHandle {
    // Set to true once kill() has been invoked.
    killed: Arc<AtomicBool>,
    // Flag reported by was_stopped_by_user(); toggled via set_stopped_by_user().
    stopped_by_user: Arc<AtomicBool>,
    // One-shot sender consumed by signal_exit() to resolve wait_for_exit.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    // Shared task that resolves with the terminal's exit status.
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    // Canned output returned by current_output().
    output: acp::TerminalOutputResponse,
    // Fixed terminal id returned by id().
    id: acp::TerminalId,
}
76
77impl FakeTerminalHandle {
78 pub(crate) fn new_never_exits(cx: &mut App) -> Self {
79 let killed = Arc::new(AtomicBool::new(false));
80 let stopped_by_user = Arc::new(AtomicBool::new(false));
81
82 let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
83
84 let wait_for_exit = cx
85 .spawn(async move |_cx| {
86 // Wait for the exit signal (sent when kill() is called)
87 let _ = exit_receiver.await;
88 acp::TerminalExitStatus::new()
89 })
90 .shared();
91
92 Self {
93 killed,
94 stopped_by_user,
95 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
96 wait_for_exit,
97 output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
98 id: acp::TerminalId::new("fake_terminal".to_string()),
99 }
100 }
101
102 pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
103 let killed = Arc::new(AtomicBool::new(false));
104 let stopped_by_user = Arc::new(AtomicBool::new(false));
105 let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
106
107 let wait_for_exit = cx
108 .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
109 .shared();
110
111 Self {
112 killed,
113 stopped_by_user,
114 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
115 wait_for_exit,
116 output: acp::TerminalOutputResponse::new("command output".to_string(), false),
117 id: acp::TerminalId::new("fake_terminal".to_string()),
118 }
119 }
120
121 pub(crate) fn was_killed(&self) -> bool {
122 self.killed.load(Ordering::SeqCst)
123 }
124
125 pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
126 self.stopped_by_user.store(stopped, Ordering::SeqCst);
127 }
128
129 pub(crate) fn signal_exit(&self) {
130 if let Some(sender) = self.exit_sender.borrow_mut().take() {
131 let _ = sender.send(());
132 }
133 }
134}
135
136impl crate::TerminalHandle for FakeTerminalHandle {
137 fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
138 Ok(self.id.clone())
139 }
140
141 fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
142 Ok(self.output.clone())
143 }
144
145 fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
146 Ok(self.wait_for_exit.clone())
147 }
148
149 fn kill(&self, _cx: &AsyncApp) -> Result<()> {
150 self.killed.store(true, Ordering::SeqCst);
151 self.signal_exit();
152 Ok(())
153 }
154
155 fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
156 Ok(self.stopped_by_user.load(Ordering::SeqCst))
157 }
158}
159
/// Test double for a subagent session.
struct FakeSubagentHandle {
    // Session id reported by id().
    session_id: acp::SessionId,
    // Shared task whose output send() resolves with.
    send_task: Shared<Task<String>>,
}
164
165impl SubagentHandle for FakeSubagentHandle {
166 fn id(&self) -> acp::SessionId {
167 self.session_id.clone()
168 }
169
170 fn num_entries(&self, _cx: &App) -> usize {
171 unimplemented!()
172 }
173
174 fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
175 let task = self.send_task.clone();
176 cx.background_spawn(async move { Ok(task.await) })
177 }
178}
179
/// Thread-environment test double holding at most one canned terminal handle
/// and one canned subagent handle.
#[derive(Default)]
pub(crate) struct FakeThreadEnvironment {
    // Handle handed out by create_terminal(); that method panics if None.
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    // Handle handed out by create_subagent(); that method panics if None.
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
    // Incremented on every create_terminal() call.
    terminal_creations: Arc<AtomicUsize>,
}
186
187impl FakeThreadEnvironment {
188 pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
189 Self {
190 terminal_handle: Some(terminal_handle.into()),
191 ..self
192 }
193 }
194
195 pub(crate) fn terminal_creation_count(&self) -> usize {
196 self.terminal_creations.load(Ordering::SeqCst)
197 }
198}
199
200impl crate::ThreadEnvironment for FakeThreadEnvironment {
201 fn create_terminal(
202 &self,
203 _command: String,
204 _cwd: Option<std::path::PathBuf>,
205 _output_byte_limit: Option<u64>,
206 _cx: &mut AsyncApp,
207 ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
208 self.terminal_creations.fetch_add(1, Ordering::SeqCst);
209 let handle = self
210 .terminal_handle
211 .clone()
212 .expect("Terminal handle not available on FakeThreadEnvironment");
213 Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
214 }
215
216 fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
217 Ok(self
218 .subagent_handle
219 .clone()
220 .expect("Subagent handle not available on FakeThreadEnvironment")
221 as Rc<dyn SubagentHandle>)
222 }
223}
224
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    // Every handle created via create_terminal(), in creation order.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
229
230impl MultiTerminalEnvironment {
231 fn new() -> Self {
232 Self {
233 handles: std::cell::RefCell::new(Vec::new()),
234 }
235 }
236
237 fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
238 self.handles.borrow().clone()
239 }
240}
241
242impl crate::ThreadEnvironment for MultiTerminalEnvironment {
243 fn create_terminal(
244 &self,
245 _command: String,
246 _cwd: Option<std::path::PathBuf>,
247 _output_byte_limit: Option<u64>,
248 cx: &mut AsyncApp,
249 ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
250 let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
251 self.handles.borrow_mut().push(handle.clone());
252 Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
253 }
254
255 fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
256 unimplemented!()
257 }
258}
259
260fn always_allow_tools(cx: &mut TestAppContext) {
261 cx.update(|cx| {
262 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
263 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
264 agent_settings::AgentSettings::override_global(settings, cx);
265 });
266}
267
268#[gpui::test]
269async fn test_echo(cx: &mut TestAppContext) {
270 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
271 let fake_model = model.as_fake();
272
273 let events = thread
274 .update(cx, |thread, cx| {
275 thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
276 })
277 .unwrap();
278 cx.run_until_parked();
279 fake_model.send_last_completion_stream_text_chunk("Hello");
280 fake_model
281 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
282 fake_model.end_last_completion_stream();
283
284 let events = events.collect().await;
285 thread.update(cx, |thread, _cx| {
286 assert_eq!(
287 thread.last_received_or_pending_message().unwrap().role(),
288 Role::Assistant
289 );
290 assert_eq!(
291 thread
292 .last_received_or_pending_message()
293 .unwrap()
294 .to_markdown(),
295 "Hello\n"
296 )
297 });
298 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
299}
300
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    // A terminal command run with a timeout should have its handle killed once
    // the timeout elapses, and the tool result should still carry the output
    // produced so far.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    // The fake terminal never exits on its own, so only the timeout can end it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Run with a tiny (5ms) timeout so the kill path triggers quickly.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    // The tool should first surface the terminal in its tool-call content.
    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // Poll the task to completion, advancing timers between polls; bail out
    // after 500ms of wall-clock time to avoid hanging the test on regression.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
366
#[gpui::test]
#[ignore]
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    // When no timeout is supplied, the terminal tool must leave the command
    // running: the handle should never be killed while the command is alive.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    // The fake terminal never exits, so the tool task below never completes.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Keep the task alive (but unpolled to completion) for the whole test.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    // The tool should still surface the terminal in its tool-call content.
    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give any (incorrect) kill path time to fire; the flag must stay false.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
416
417#[gpui::test]
418async fn test_thinking(cx: &mut TestAppContext) {
419 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
420 let fake_model = model.as_fake();
421
422 let events = thread
423 .update(cx, |thread, cx| {
424 thread.send(
425 UserMessageId::new(),
426 [indoc! {"
427 Testing:
428
429 Generate a thinking step where you just think the word 'Think',
430 and have your final answer be 'Hello'
431 "}],
432 cx,
433 )
434 })
435 .unwrap();
436 cx.run_until_parked();
437 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
438 text: "Think".to_string(),
439 signature: None,
440 });
441 fake_model.send_last_completion_stream_text_chunk("Hello");
442 fake_model
443 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
444 fake_model.end_last_completion_stream();
445
446 let events = events.collect().await;
447 thread.update(cx, |thread, _cx| {
448 assert_eq!(
449 thread.last_received_or_pending_message().unwrap().role(),
450 Role::Assistant
451 );
452 assert_eq!(
453 thread
454 .last_received_or_pending_message()
455 .unwrap()
456 .to_markdown(),
457 indoc! {"
458 <think>Think</think>
459 Hello
460 "}
461 )
462 });
463 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
464}
465
466#[gpui::test]
467async fn test_system_prompt(cx: &mut TestAppContext) {
468 let ThreadTest {
469 model,
470 thread,
471 project_context,
472 ..
473 } = setup(cx, TestModel::Fake).await;
474 let fake_model = model.as_fake();
475
476 project_context.update(cx, |project_context, _cx| {
477 project_context.shell = "test-shell".into()
478 });
479 thread.update(cx, |thread, _| thread.add_tool(EchoTool));
480 thread
481 .update(cx, |thread, cx| {
482 thread.send(UserMessageId::new(), ["abc"], cx)
483 })
484 .unwrap();
485 cx.run_until_parked();
486 let mut pending_completions = fake_model.pending_completions();
487 assert_eq!(
488 pending_completions.len(),
489 1,
490 "unexpected pending completions: {:?}",
491 pending_completions
492 );
493
494 let pending_completion = pending_completions.pop().unwrap();
495 assert_eq!(pending_completion.messages[0].role, Role::System);
496
497 let system_message = &pending_completion.messages[0];
498 let system_prompt = system_message.content[0].to_str().unwrap();
499 assert!(
500 system_prompt.contains("test-shell"),
501 "unexpected system message: {:?}",
502 system_message
503 );
504 assert!(
505 system_prompt.contains("## Fixing Diagnostics"),
506 "unexpected system message: {:?}",
507 system_message
508 );
509}
510
511#[gpui::test]
512async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
513 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
514 let fake_model = model.as_fake();
515
516 thread
517 .update(cx, |thread, cx| {
518 thread.send(UserMessageId::new(), ["abc"], cx)
519 })
520 .unwrap();
521 cx.run_until_parked();
522 let mut pending_completions = fake_model.pending_completions();
523 assert_eq!(
524 pending_completions.len(),
525 1,
526 "unexpected pending completions: {:?}",
527 pending_completions
528 );
529
530 let pending_completion = pending_completions.pop().unwrap();
531 assert_eq!(pending_completion.messages[0].role, Role::System);
532
533 let system_message = &pending_completion.messages[0];
534 let system_prompt = system_message.content[0].to_str().unwrap();
535 assert!(
536 !system_prompt.contains("## Tool Use"),
537 "unexpected system message: {:?}",
538 system_message
539 );
540 assert!(
541 !system_prompt.contains("## Fixing Diagnostics"),
542 "unexpected system message: {:?}",
543 system_message
544 );
545}
546
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    // Verifies that exactly one request message carries `cache: true`, and
    // that it is always the most recent user message (or tool result).
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    // messages[0] is the system prompt; only compare the conversation tail.
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    // The cache flag has moved to the tool-result message at the end.
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
692
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test (runs against a real Sonnet 4 model, only with the
    // "e2e" feature): exercises tool calls that finish both before and after
    // the model stops streaming.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool calls that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    // The assistant's final text should echo the tool's output (contains "Ding").
    thread.update(cx, |thread, _cx| {
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
752
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test (real Sonnet 4 model, "e2e" feature only): asserts that
    // tool-call input streams incrementally, i.e. we observe at least one tool
    // use before its input is complete.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, input may still be streaming; a missing
                        // final key ("g") marks a partially streamed tool use.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
804
#[gpui::test]
async fn test_tool_authorization(cx: &mut TestAppContext) {
    // Covers the permission outcomes in sequence: allow once, deny,
    // always-allow, and a follow-up call that runs without prompting after
    // always-allow was chosen.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Stream two tool calls in the same turn; each must request authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_2".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;

    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
    tool_call_auth_1
        .response
        .send(acp::PermissionOptionId::new("allow").into())
        .unwrap();
    cx.run_until_parked();

    // Reject the second - send "deny" option_id directly since Deny is now a button
    tool_call_auth_2
        .response
        .send(acp::PermissionOptionId::new("deny").into())
        .unwrap();
    cx.run_until_parked();

    // The next request should carry one successful and one error tool result.
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }),
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: true,
                content: "Permission to run tool denied by user".into(),
                output: Some("Permission to run tool denied by user".into())
            })
        ]
    );

    // Simulate yet another tool call.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_3".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Respond by always allowing tools - send transformed option_id
    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
    tool_call_auth_3
        .response
        .send(acp::PermissionOptionId::new("always_allow:tool_requiring_permission").into())
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );

    // Simulate a final tool call, ensuring we don't trigger authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_4".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: "tool_id_4".into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );
}
941
942#[gpui::test]
943async fn test_tool_hallucination(cx: &mut TestAppContext) {
944 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
945 let fake_model = model.as_fake();
946
947 let mut events = thread
948 .update(cx, |thread, cx| {
949 thread.send(UserMessageId::new(), ["abc"], cx)
950 })
951 .unwrap();
952 cx.run_until_parked();
953 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
954 LanguageModelToolUse {
955 id: "tool_id_1".into(),
956 name: "nonexistent_tool".into(),
957 raw_input: "{}".into(),
958 input: json!({}),
959 is_input_complete: true,
960 thought_signature: None,
961 },
962 ));
963 fake_model.end_last_completion_stream();
964
965 let tool_call = expect_tool_call(&mut events).await;
966 assert_eq!(tool_call.title, "nonexistent_tool");
967 assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
968 let update = expect_tool_call_update_fields(&mut events).await;
969 assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
970}
971
972async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
973 let event = events
974 .next()
975 .await
976 .expect("no tool call authorization event received")
977 .unwrap();
978 match event {
979 ThreadEvent::ToolCall(tool_call) => tool_call,
980 event => {
981 panic!("Unexpected event {event:?}");
982 }
983 }
984}
985
986async fn expect_tool_call_update_fields(
987 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
988) -> acp::ToolCallUpdate {
989 let event = events
990 .next()
991 .await
992 .expect("no tool call authorization event received")
993 .unwrap();
994 match event {
995 ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
996 event => {
997 panic!("Unexpected event {event:?}");
998 }
999 }
1000}
1001
1002async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1003 let event = events
1004 .next()
1005 .await
1006 .expect("no plan event received")
1007 .unwrap();
1008 match event {
1009 ThreadEvent::Plan(plan) => plan,
1010 event => {
1011 panic!("Unexpected event {event:?}");
1012 }
1013 }
1014}
1015
1016async fn next_tool_call_authorization(
1017 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1018) -> ToolCallAuthorization {
1019 loop {
1020 let event = events
1021 .next()
1022 .await
1023 .expect("no tool call authorization event received")
1024 .unwrap();
1025 if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1026 let permission_kinds = tool_call_authorization
1027 .options
1028 .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1029 .map(|option| option.kind);
1030 let allow_once = tool_call_authorization
1031 .options
1032 .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1033 .map(|option| option.kind);
1034
1035 assert_eq!(
1036 permission_kinds,
1037 Some(acp::PermissionOptionKind::AllowAlways)
1038 );
1039 assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1040 return tool_call_authorization;
1041 }
1042 }
1043}
1044
#[test]
fn test_permission_options_terminal_with_pattern() {
    // A multi-word cargo invocation should offer a `cargo build`-scoped option.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let has_label =
        |needle: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == needle);
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `cargo build` commands"));
    assert!(has_label("Only this time"));
}
1066
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    // When the second token is a flag, the pattern scopes to the command alone.
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
        .build_permission_options();

    let PermissionOptions::Dropdown(choices) = options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let has_label =
        |needle: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == needle);
    assert!(has_label("Always for terminal"));
    assert!(has_label("Always for `ls` commands"));
    assert!(has_label("Only this time"));
}
1086
#[test]
fn test_permission_options_terminal_single_word_command() {
    // A one-word command still gets a per-command "always" choice.
    let permission_options =
        ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
            .build_permission_options();

    let choices = match permission_options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let allow_labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(allow_labels.contains(&"Always for terminal"));
    assert!(allow_labels.contains(&"Always for `whoami` commands"));
    assert!(allow_labels.contains(&"Only this time"));
}
1106
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Editing a file nested in a directory should offer an "always" choice
    // scoped to that directory prefix.
    let permission_options =
        ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
            .build_permission_options();

    let choices = match permission_options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let allow_labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(allow_labels.contains(&"Always for edit file"));
    assert!(allow_labels.contains(&"Always for `src/`"));
}
1124
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetching a URL should offer an "always" choice scoped to its domain.
    let permission_options =
        ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
            .build_permission_options();

    let choices = match permission_options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let allow_labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(allow_labels.contains(&"Always for fetch"));
    assert!(allow_labels.contains(&"Always for `docs.rs`"));
}
1142
#[test]
fn test_permission_options_without_pattern() {
    // `./deploy.sh` is not a recognized command, so no per-command "always"
    // choice should be generated — only the tool-wide and one-time choices.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let choices = match permission_options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 2);
    let allow_labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(allow_labels.contains(&"Always for terminal"));
    assert!(allow_labels.contains(&"Only this time"));
    assert!(!allow_labels.iter().any(|label| label.contains("commands")));
}
1164
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Symlink-target authorizations must not offer "always" choices: only a
    // flat, one-time allow/deny pair.
    let permission_options =
        ToolPermissionContext::symlink_target(EditFileTool::NAME, vec!["/outside/file.txt".into()])
            .build_permission_options();

    let PermissionOptions::Flat(options) = permission_options else {
        panic!("Expected flat permission options for symlink target authorization");
    };

    assert_eq!(options.len(), 2);
    // Check that an option with the given id and kind exists.
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1185
#[test]
fn test_permission_option_ids_for_terminal() {
    // For a recognized terminal command, both "always" choices reuse the
    // tool-level option IDs (only the second carries a sub-pattern), while
    // the final choice uses the plain one-time allow/deny IDs.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    // Expect 3 choices: always-tool, always-pattern, once.
    assert_eq!(choices.len(), 3);

    // The first two choices both use the tool-level option IDs.
    for (index, choice) in choices[..2].iter().enumerate() {
        assert_eq!(
            choice.allow.option_id.0.as_ref(),
            "always_allow:terminal",
            "choice {index}"
        );
        assert_eq!(
            choice.deny.option_id.0.as_ref(),
            "always_deny:terminal",
            "choice {index}"
        );
    }
    assert!(choices[0].sub_patterns.is_empty());
    assert_eq!(choices[1].sub_patterns, vec!["^cargo\\s+build(\\s|$)"]);

    // The third choice is the one-time allow/deny.
    assert_eq!(choices[2].allow.option_id.0.as_ref(), "allow");
    assert_eq!(choices[2].deny.option_id.0.as_ref(), "deny");
    assert!(choices[2].sub_patterns.is_empty());
}
1221
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    // A pipeline can't be covered by a single always-allow pattern, so the
    // options should carry one selectable pattern per pipeline stage.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo test 2>&1 | tail".to_string()],
    )
    .build_permission_options();

    let (choices, patterns, tool_name) = match permission_options {
        PermissionOptions::DropdownWithPatterns {
            choices,
            patterns,
            tool_name,
        } => (choices, patterns, tool_name),
        _ => panic!("Expected DropdownWithPatterns permission options for pipeline command"),
    };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Only the tool-wide and one-time choices remain at the top level.
    assert_eq!(choices.len(), 2);
    let allow_labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(allow_labels.contains(&"Always for terminal"));
    assert!(allow_labels.contains(&"Only this time"));

    // One per-command pattern for each stage: "cargo test" and "tail".
    assert_eq!(patterns.len(), 2);
    let display_names = patterns
        .iter()
        .map(|command_pattern| command_pattern.display_name.as_str())
        .collect::<Vec<_>>();
    assert!(display_names.contains(&"cargo test"));
    assert!(display_names.contains(&"tail"));

    // Each stage's regex is anchored to the start of a command.
    let regex_sources = patterns
        .iter()
        .map(|command_pattern| command_pattern.pattern.as_str())
        .collect::<Vec<_>>();
    assert!(regex_sources.contains(&"^cargo\\s+test(\\s|$)"));
    assert!(regex_sources.contains(&"^tail\\b"));
}
1261
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    // `&&` chaining combined with a pipe: each distinct (sub)command in the
    // line gets its own pattern.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["npm install && npm test | tail".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::DropdownWithPatterns { patterns, .. } = permission_options else {
        panic!("Expected DropdownWithPatterns for chained pipeline command");
    };

    // With subcommand-aware patterns, "npm install" and "npm test" are
    // distinct entries alongside "tail".
    assert_eq!(patterns.len(), 3);
    for expected in ["npm install", "npm test", "tail"] {
        assert!(
            patterns
                .iter()
                .any(|command_pattern| command_pattern.display_name.as_str() == expected),
            "missing pattern for {expected:?}"
        );
    }
}
1281
1282#[gpui::test]
1283#[cfg_attr(not(feature = "e2e"), ignore)]
1284async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
1285 let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1286
1287 // Test concurrent tool calls with different delay times
1288 let events = thread
1289 .update(cx, |thread, cx| {
1290 thread.add_tool(DelayTool);
1291 thread.send(
1292 UserMessageId::new(),
1293 [
1294 "Call the delay tool twice in the same message.",
1295 "Once with 100ms. Once with 300ms.",
1296 "When both timers are complete, describe the outputs.",
1297 ],
1298 cx,
1299 )
1300 })
1301 .unwrap()
1302 .collect()
1303 .await;
1304
1305 let stop_reasons = stop_events(events);
1306 assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);
1307
1308 thread.update(cx, |thread, _cx| {
1309 let last_message = thread.last_received_or_pending_message().unwrap();
1310 let agent_message = last_message.as_agent_message().unwrap();
1311 let text = agent_message
1312 .content
1313 .iter()
1314 .filter_map(|content| {
1315 if let AgentMessageContent::Text(text) = content {
1316 Some(text.as_str())
1317 } else {
1318 None
1319 }
1320 })
1321 .collect::<String>();
1322
1323 assert!(text.contains("Ding"));
1324 });
1325}
1326
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    // Verifies that the active agent profile controls which tools are
    // advertised to the model on each completion request.
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register all three tools; the selected profile filters what is sent.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Inspect the completion request captured by the fake model: only the
    // profile's enabled tools should be present.
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1406
1407#[gpui::test]
1408async fn test_mcp_tools(cx: &mut TestAppContext) {
1409 let ThreadTest {
1410 model,
1411 thread,
1412 context_server_store,
1413 fs,
1414 ..
1415 } = setup(cx, TestModel::Fake).await;
1416 let fake_model = model.as_fake();
1417
1418 // Override profiles and wait for settings to be loaded.
1419 fs.insert_file(
1420 paths::settings_file(),
1421 json!({
1422 "agent": {
1423 "tool_permissions": { "default": "allow" },
1424 "profiles": {
1425 "test": {
1426 "name": "Test Profile",
1427 "enable_all_context_servers": true,
1428 "tools": {
1429 EchoTool::NAME: true,
1430 }
1431 },
1432 }
1433 }
1434 })
1435 .to_string()
1436 .into_bytes(),
1437 )
1438 .await;
1439 cx.run_until_parked();
1440 thread.update(cx, |thread, cx| {
1441 thread.set_profile(AgentProfileId("test".into()), cx)
1442 });
1443
1444 let mut mcp_tool_calls = setup_context_server(
1445 "test_server",
1446 vec![context_server::types::Tool {
1447 name: "echo".into(),
1448 description: None,
1449 input_schema: serde_json::to_value(EchoTool::input_schema(
1450 LanguageModelToolSchemaFormat::JsonSchema,
1451 ))
1452 .unwrap(),
1453 output_schema: None,
1454 annotations: None,
1455 }],
1456 &context_server_store,
1457 cx,
1458 );
1459
1460 let events = thread.update(cx, |thread, cx| {
1461 thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
1462 });
1463 cx.run_until_parked();
1464
1465 // Simulate the model calling the MCP tool.
1466 let completion = fake_model.pending_completions().pop().unwrap();
1467 assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1468 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1469 LanguageModelToolUse {
1470 id: "tool_1".into(),
1471 name: "echo".into(),
1472 raw_input: json!({"text": "test"}).to_string(),
1473 input: json!({"text": "test"}),
1474 is_input_complete: true,
1475 thought_signature: None,
1476 },
1477 ));
1478 fake_model.end_last_completion_stream();
1479 cx.run_until_parked();
1480
1481 let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1482 assert_eq!(tool_call_params.name, "echo");
1483 assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
1484 tool_call_response
1485 .send(context_server::types::CallToolResponse {
1486 content: vec![context_server::types::ToolResponseContent::Text {
1487 text: "test".into(),
1488 }],
1489 is_error: None,
1490 meta: None,
1491 structured_content: None,
1492 })
1493 .unwrap();
1494 cx.run_until_parked();
1495
1496 assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
1497 fake_model.send_last_completion_stream_text_chunk("Done!");
1498 fake_model.end_last_completion_stream();
1499 events.collect::<Vec<_>>().await;
1500
1501 // Send again after adding the echo tool, ensuring the name collision is resolved.
1502 let events = thread.update(cx, |thread, cx| {
1503 thread.add_tool(EchoTool);
1504 thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
1505 });
1506 cx.run_until_parked();
1507 let completion = fake_model.pending_completions().pop().unwrap();
1508 assert_eq!(
1509 tool_names_for_completion(&completion),
1510 vec!["echo", "test_server_echo"]
1511 );
1512 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1513 LanguageModelToolUse {
1514 id: "tool_2".into(),
1515 name: "test_server_echo".into(),
1516 raw_input: json!({"text": "mcp"}).to_string(),
1517 input: json!({"text": "mcp"}),
1518 is_input_complete: true,
1519 thought_signature: None,
1520 },
1521 ));
1522 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
1523 LanguageModelToolUse {
1524 id: "tool_3".into(),
1525 name: "echo".into(),
1526 raw_input: json!({"text": "native"}).to_string(),
1527 input: json!({"text": "native"}),
1528 is_input_complete: true,
1529 thought_signature: None,
1530 },
1531 ));
1532 fake_model.end_last_completion_stream();
1533 cx.run_until_parked();
1534
1535 let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
1536 assert_eq!(tool_call_params.name, "echo");
1537 assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
1538 tool_call_response
1539 .send(context_server::types::CallToolResponse {
1540 content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
1541 is_error: None,
1542 meta: None,
1543 structured_content: None,
1544 })
1545 .unwrap();
1546 cx.run_until_parked();
1547
1548 // Ensure the tool results were inserted with the correct names.
1549 let completion = fake_model.pending_completions().pop().unwrap();
1550 assert_eq!(
1551 completion.messages.last().unwrap().content,
1552 vec![
1553 MessageContent::ToolResult(LanguageModelToolResult {
1554 tool_use_id: "tool_3".into(),
1555 tool_name: "echo".into(),
1556 is_error: false,
1557 content: "native".into(),
1558 output: Some("native".into()),
1559 },),
1560 MessageContent::ToolResult(LanguageModelToolResult {
1561 tool_use_id: "tool_2".into(),
1562 tool_name: "test_server_echo".into(),
1563 is_error: false,
1564 content: "mcp".into(),
1565 output: Some("mcp".into()),
1566 },),
1567 ]
1568 );
1569 fake_model.end_last_completion_stream();
1570 events.collect::<Vec<_>>().await;
1571}
1572
#[gpui::test]
async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
    // Regression test: tool results saved in a thread must still be emitted
    // during replay even when the originating MCP server is no longer
    // connected (e.g. after an app restart).
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Setup settings to allow MCP tools
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "always_allow_tool_actions": true,
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {}
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Setup a context server with a tool
    let mut mcp_tool_calls = setup_context_server(
        "github_server",
        vec![context_server::types::Tool {
            name: "issue_read".into(),
            description: Some("Read a GitHub issue".into()),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "issue_url": { "type": "string" }
                }
            }),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    // Send a message and have the model call the MCP tool
    let events = thread.update(cx, |thread, cx| {
        thread
            .send(UserMessageId::new(), ["Read issue #47404"], cx)
            .unwrap()
    });
    cx.run_until_parked();

    // Verify the MCP tool is available to the model
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["issue_read"],
        "MCP tool should be available"
    );

    // Simulate the model calling the MCP tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "issue_read".into(),
            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
                .to_string(),
            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The MCP server receives the tool call and responds with content
    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "issue_read");
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: expected_tool_output.into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // After tool completes, the model continues with a new completion request
    // that includes the tool results. We need to respond to this.
    let _completion = fake_model.pending_completions().pop().unwrap();
    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Verify the tool result is stored in the thread by checking the markdown output.
    // The tool result is in the first assistant message (not the last one, which is
    // the model's response after the tool completed).
    thread.update(cx, |thread, _cx| {
        let markdown = thread.to_markdown();
        assert!(
            markdown.contains("**Tool Result**: issue_read"),
            "Thread should contain tool result header"
        );
        assert!(
            markdown.contains(expected_tool_output),
            "Thread should contain tool output: {}",
            expected_tool_output
        );
    });

    // Simulate app restart: disconnect the MCP server.
    // After restart, the MCP server won't be connected yet when the thread is replayed.
    context_server_store.update(cx, |store, cx| {
        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
    });
    cx.run_until_parked();

    // Replay the thread (this is what happens when loading a saved thread)
    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));

    // Scan the replayed event stream for the original tool call and for an
    // update that carries its saved raw output.
    let mut found_tool_call = None;
    let mut found_tool_call_update_with_output = None;

    while let Some(event) = replay_events.next().await {
        let event = event.unwrap();
        match &event {
            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
                found_tool_call = Some(tc.clone());
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
                if update.tool_call_id.to_string() == "tool_1" =>
            {
                if update.fields.raw_output.is_some() {
                    found_tool_call_update_with_output = Some(update.clone());
                }
            }
            _ => {}
        }
    }

    // The tool call should be found
    assert!(
        found_tool_call.is_some(),
        "Tool call should be emitted during replay"
    );

    assert!(
        found_tool_call_update_with_output.is_some(),
        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
    );

    let update = found_tool_call_update_with_output.unwrap();
    assert_eq!(
        update.fields.raw_output,
        Some(expected_tool_output.into()),
        "raw_output should contain the saved tool result"
    );

    // Also verify the status is correct (completed, not failed)
    assert_eq!(
        update.fields.status,
        Some(acp::ToolCallStatus::Completed),
        "Tool call status should reflect the original completion status"
    );
}
1754
#[gpui::test]
async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
    // Exercises MCP tool-name disambiguation and length handling:
    // - tools whose names collide with a native tool get a server-name prefix
    //   (snake_cased, e.g. "Azure DevOps" -> "azure_dev_ops_echo"),
    // - names at or over MAX_TOOL_NAME_LENGTH are shortened — see the
    //   expected list at the end for the exact resulting names.
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up a profile with all tools enabled
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx);
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
        thread.add_tool(WordListTool);
        thread.add_tool(ToolRequiringPermission);
        thread.add_tool(InfiniteTool);
    });

    // Set up multiple context servers with some overlapping tool names
    let _server1_calls = setup_context_server(
        "xxx",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_1".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server "yyy" also carries names just under the length limit, which can
    // no longer fit a full server prefix when disambiguation is needed.
    let _server2_calls = setup_context_server(
        "yyy",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Also conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_2".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );
    // Server "zzz" duplicates the long names from "yyy" and adds one that
    // exceeds the limit outright.
    let _server3_calls = setup_context_server(
        "zzz",
        vec![
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server with spaces in name - tests snake_case conversion for API compatibility
    let _server4_calls = setup_context_server(
        "Azure DevOps",
        vec![context_server::types::Tool {
            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Go"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // The expected names below are sorted; note the single-letter server
    // prefixes ("y_", "z_") used for the long "a…" names where the full
    // server prefix would overflow MAX_TOOL_NAME_LENGTH.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec![
            "azure_dev_ops_echo",
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
            "delay",
            "echo",
            "infinite",
            "tool_requiring_permission",
            "unique_tool_1",
            "unique_tool_2",
            "word_list",
            "xxx_echo",
            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "yyy_echo",
            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        ]
    );
}
1938
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    // Cancelling mid-turn must close the event stream with a Cancelled stop
    // reason — even while a tool is still running — and leave the thread
    // usable for subsequent sends.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            ThreadEvent::ToolCall(tool_call) => {
                // Tool calls must arrive in the prompted order.
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            // Watch for the Echo tool's completion update; Infinite never
            // completes on its own.
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
2024
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    // When a running terminal tool is cancelled, the terminal must be killed
    // and the tool result must preserve the output captured so far rather
    // than only a "canceled" marker.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A fake terminal whose command never exits, so cancellation is the only
    // way the tool call ends.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2121
2122#[gpui::test]
2123async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
2124 // This test verifies that tools which properly handle cancellation via
2125 // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
2126 // to cancellation and report that they were cancelled.
2127 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2128 always_allow_tools(cx);
2129 let fake_model = model.as_fake();
2130
2131 let (tool, was_cancelled) = CancellationAwareTool::new();
2132
2133 let mut events = thread
2134 .update(cx, |thread, cx| {
2135 thread.add_tool(tool);
2136 thread.send(
2137 UserMessageId::new(),
2138 ["call the cancellation aware tool"],
2139 cx,
2140 )
2141 })
2142 .unwrap();
2143
2144 cx.run_until_parked();
2145
2146 // Simulate the model calling the cancellation-aware tool
2147 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2148 LanguageModelToolUse {
2149 id: "cancellation_aware_1".into(),
2150 name: "cancellation_aware".into(),
2151 raw_input: r#"{}"#.into(),
2152 input: json!({}),
2153 is_input_complete: true,
2154 thought_signature: None,
2155 },
2156 ));
2157 fake_model.end_last_completion_stream();
2158
2159 cx.run_until_parked();
2160
2161 // Wait for the tool call to be reported
2162 let mut tool_started = false;
2163 let deadline = cx.executor().num_cpus() * 100;
2164 for _ in 0..deadline {
2165 cx.run_until_parked();
2166
2167 while let Some(Some(event)) = events.next().now_or_never() {
2168 if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
2169 if tool_call.title == "Cancellation Aware Tool" {
2170 tool_started = true;
2171 break;
2172 }
2173 }
2174 }
2175
2176 if tool_started {
2177 break;
2178 }
2179
2180 cx.background_executor
2181 .timer(Duration::from_millis(10))
2182 .await;
2183 }
2184 assert!(tool_started, "expected cancellation aware tool to start");
2185
2186 // Cancel the thread and wait for it to complete
2187 let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));
2188
2189 // The cancel task should complete promptly because the tool handles cancellation
2190 let timeout = cx.background_executor.timer(Duration::from_secs(5));
2191 futures::select! {
2192 _ = cancel_task.fuse() => {}
2193 _ = timeout.fuse() => {
2194 panic!("cancel task timed out - tool did not respond to cancellation");
2195 }
2196 }
2197
2198 // Verify the tool detected cancellation via its flag
2199 assert!(
2200 was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
2201 "tool should have detected cancellation via event_stream.cancelled_by_user()"
2202 );
2203
2204 // Collect remaining events
2205 let remaining_events = collect_events_until_stop(&mut events, cx).await;
2206
2207 // Verify we got a cancellation stop event
2208 assert_eq!(
2209 stop_events(remaining_events),
2210 vec![acp::StopReason::Cancelled],
2211 );
2212
2213 // Verify we can send a new message after cancellation
2214 verify_thread_recovery(&thread, &fake_model, cx).await;
2215}
2216
2217/// Helper to verify thread can recover after cancellation by sending a simple message.
2218async fn verify_thread_recovery(
2219 thread: &Entity<Thread>,
2220 fake_model: &FakeLanguageModel,
2221 cx: &mut TestAppContext,
2222) {
2223 let events = thread
2224 .update(cx, |thread, cx| {
2225 thread.send(
2226 UserMessageId::new(),
2227 ["Testing: reply with 'Hello' then stop."],
2228 cx,
2229 )
2230 })
2231 .unwrap();
2232 cx.run_until_parked();
2233 fake_model.send_last_completion_stream_text_chunk("Hello");
2234 fake_model
2235 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2236 fake_model.end_last_completion_stream();
2237
2238 let events = events.collect::<Vec<_>>().await;
2239 thread.update(cx, |thread, _cx| {
2240 let message = thread.last_received_or_pending_message().unwrap();
2241 let agent_message = message.as_agent_message().unwrap();
2242 assert_eq!(
2243 agent_message.content,
2244 vec![AgentMessageContent::Text("Hello".to_string())]
2245 );
2246 });
2247 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2248}
2249
2250/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2251async fn wait_for_terminal_tool_started(
2252 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2253 cx: &mut TestAppContext,
2254) {
2255 let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2256 for _ in 0..deadline {
2257 cx.run_until_parked();
2258
2259 while let Some(Some(event)) = events.next().now_or_never() {
2260 if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2261 update,
2262 ))) = &event
2263 {
2264 if update.fields.content.as_ref().is_some_and(|content| {
2265 content
2266 .iter()
2267 .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2268 }) {
2269 return;
2270 }
2271 }
2272 }
2273
2274 cx.background_executor
2275 .timer(Duration::from_millis(10))
2276 .await;
2277 }
2278 panic!("terminal tool did not start within the expected time");
2279}
2280
2281/// Collects events until a Stop event is received, driving the executor to completion.
2282async fn collect_events_until_stop(
2283 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2284 cx: &mut TestAppContext,
2285) -> Vec<Result<ThreadEvent>> {
2286 let mut collected = Vec::new();
2287 let deadline = cx.executor().num_cpus() * 200;
2288
2289 for _ in 0..deadline {
2290 cx.executor().advance_clock(Duration::from_millis(10));
2291 cx.run_until_parked();
2292
2293 while let Some(Some(event)) = events.next().now_or_never() {
2294 let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2295 collected.push(event);
2296 if is_stop {
2297 return collected;
2298 }
2299 }
2300 }
2301 panic!(
2302 "did not receive Stop event within the expected time; collected {} events",
2303 collected.len()
2304 );
2305}
2306
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    // Truncating the thread while a terminal tool is still running should kill
    // the terminal, empty the thread, and leave it usable for new messages.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // The fake terminal never exits on its own, so the tool stays "running"
    // until something kills it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate back to it later.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2373
2374#[gpui::test]
2375async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
2376 // Tests that cancellation properly kills all running terminal tools when multiple are active.
2377 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2378 always_allow_tools(cx);
2379 let fake_model = model.as_fake();
2380
2381 let environment = Rc::new(MultiTerminalEnvironment::new());
2382
2383 let mut events = thread
2384 .update(cx, |thread, cx| {
2385 thread.add_tool(crate::TerminalTool::new(
2386 thread.project().clone(),
2387 environment.clone(),
2388 ));
2389 thread.send(UserMessageId::new(), ["run multiple commands"], cx)
2390 })
2391 .unwrap();
2392
2393 cx.run_until_parked();
2394
2395 // Simulate the model calling two terminal tools
2396 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2397 LanguageModelToolUse {
2398 id: "terminal_tool_1".into(),
2399 name: TerminalTool::NAME.into(),
2400 raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
2401 input: json!({"command": "sleep 1000", "cd": "."}),
2402 is_input_complete: true,
2403 thought_signature: None,
2404 },
2405 ));
2406 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2407 LanguageModelToolUse {
2408 id: "terminal_tool_2".into(),
2409 name: TerminalTool::NAME.into(),
2410 raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
2411 input: json!({"command": "sleep 2000", "cd": "."}),
2412 is_input_complete: true,
2413 thought_signature: None,
2414 },
2415 ));
2416 fake_model.end_last_completion_stream();
2417
2418 // Wait for both terminal tools to start by counting terminal content updates
2419 let mut terminals_started = 0;
2420 let deadline = cx.executor().num_cpus() * 100;
2421 for _ in 0..deadline {
2422 cx.run_until_parked();
2423
2424 while let Some(Some(event)) = events.next().now_or_never() {
2425 if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2426 update,
2427 ))) = &event
2428 {
2429 if update.fields.content.as_ref().is_some_and(|content| {
2430 content
2431 .iter()
2432 .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2433 }) {
2434 terminals_started += 1;
2435 if terminals_started >= 2 {
2436 break;
2437 }
2438 }
2439 }
2440 }
2441 if terminals_started >= 2 {
2442 break;
2443 }
2444
2445 cx.background_executor
2446 .timer(Duration::from_millis(10))
2447 .await;
2448 }
2449 assert!(
2450 terminals_started >= 2,
2451 "expected 2 terminal tools to start, got {terminals_started}"
2452 );
2453
2454 // Cancel the thread while both terminals are running
2455 thread.update(cx, |thread, cx| thread.cancel(cx)).detach();
2456
2457 // Collect remaining events
2458 let remaining_events = collect_events_until_stop(&mut events, cx).await;
2459
2460 // Verify both terminal handles were killed
2461 let handles = environment.handles();
2462 assert_eq!(
2463 handles.len(),
2464 2,
2465 "expected 2 terminal handles to be created"
2466 );
2467 assert!(
2468 handles[0].was_killed(),
2469 "expected first terminal handle to be killed on cancellation"
2470 );
2471 assert!(
2472 handles[1].was_killed(),
2473 "expected second terminal handle to be killed on cancellation"
2474 );
2475
2476 // Verify we got a cancellation stop event
2477 assert_eq!(
2478 stop_events(remaining_events),
2479 vec![acp::StopReason::Cancelled],
2480 );
2481}
2482
#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal that never exits on its own: it only finishes when the
    // simulated "stop" below signals exit.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the tool use recorded in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and its corresponding result.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2577
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // The fake terminal never exits, so only the configured timeout can end
    // the command.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the timeout (timeout_ms above is 100)
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the recorded tool use and its result.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2676
2677#[gpui::test]
2678async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2679 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2680 let fake_model = model.as_fake();
2681
2682 let events_1 = thread
2683 .update(cx, |thread, cx| {
2684 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2685 })
2686 .unwrap();
2687 cx.run_until_parked();
2688 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2689 cx.run_until_parked();
2690
2691 let events_2 = thread
2692 .update(cx, |thread, cx| {
2693 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2694 })
2695 .unwrap();
2696 cx.run_until_parked();
2697 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2698 fake_model
2699 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2700 fake_model.end_last_completion_stream();
2701
2702 let events_1 = events_1.collect::<Vec<_>>().await;
2703 assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2704 let events_2 = events_2.collect::<Vec<_>>().await;
2705 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2706}
2707
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    // NOTE(review): judging by the 1 -> 0 transition asserted here,
    // completion_count() tracks pending (unconsumed) requests rather than a
    // running total — confirm against FakeLanguageModel.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // Old turn reports Cancelled; the new one completes with EndTurn.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2785
2786#[gpui::test]
2787async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2788 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2789 let fake_model = model.as_fake();
2790
2791 let events_1 = thread
2792 .update(cx, |thread, cx| {
2793 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2794 })
2795 .unwrap();
2796 cx.run_until_parked();
2797 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2798 fake_model
2799 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2800 fake_model.end_last_completion_stream();
2801 let events_1 = events_1.collect::<Vec<_>>().await;
2802
2803 let events_2 = thread
2804 .update(cx, |thread, cx| {
2805 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2806 })
2807 .unwrap();
2808 cx.run_until_parked();
2809 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2810 fake_model
2811 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2812 fake_model.end_last_completion_stream();
2813 let events_2 = events_2.collect::<Vec<_>>().await;
2814
2815 assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2816 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2817}
2818
#[gpui::test]
async fn test_refusal(cx: &mut TestAppContext) {
    // Verifies thread rollback on StopReason::Refusal; the final assert shows
    // the thread ends up completely empty.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // The user message is recorded before any model output arrives.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    cx.run_until_parked();
    // Streamed text shows up as an assistant message.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
    });

    // If the model refuses to continue, the thread should remove all the messages after the last user message.
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
    let events = events.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
    });
}
2867
#[gpui::test]
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    // Truncating at the first (only) message clears the whole thread, resets
    // token usage, and leaves the thread usable for new messages.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        // No usage has been reported yet.
        assert_eq!(thread.latest_token_usage(), None);
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        // Usage from the UsageUpdate event is surfaced via latest_token_usage().
        // NOTE(review): the 1_000_000 max_tokens presumably comes from the fake
        // model's configuration — confirm.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncating at the first message clears both the content and the usage.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // The new turn's usage fully replaces the pre-truncation numbers.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
2989
#[gpui::test]
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    // Truncating at the second message must restore the thread to exactly the
    // state it had after the first exchange, including token usage.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Snapshot of the post-first-exchange state; checked both before sending
    // the second message and again after truncating it away.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Keep the second message's id so we can truncate back to it.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Both exchanges (and the newer usage numbers) are present pre-truncation.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncate the second exchange and verify the first-exchange state returns.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3104
#[gpui::test]
async fn test_title_generation(cx: &mut TestAppContext) {
    // The summarization model generates the thread title once, after the first
    // turn; later turns must not regenerate it.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Separate fake model dedicated to summarization/title generation.
    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // No title until the summary model has responded.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), None));

    // Ensure the summary model has been invoked to generate a title.
    // Only the first line of the streamed summary becomes the title, as the
    // assertion below shows ("Goodnight Moon" is dropped).
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // The summary model received no new request for the second turn.
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.title(), Some("Hello world".into()))
    });
}
3154
/// Ensures that a tool use whose result is still pending is omitted when
/// building a completion request, while a completed tool call and its result
/// are included.
#[gpui::test]
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // This tool use remains unresolved for the duration of the test (its
    // permission request is never answered), so it should be skipped below.
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // This tool runs to completion immediately and produces a result.
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    // messages[0] is skipped here; only the conversation turns are asserted.
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3234
/// End-to-end exercise of `NativeAgentConnection`: session creation, model
/// listing and selection, prompting, cancellation, and session teardown
/// (prompting a closed session must fail with "Session not found").
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(user_store.clone(), client.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = PathList::new(&[Path::new("/test")]);
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Send a prompt and stream back a response through the fake model.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting the now-closed session must fail with "Session not found".
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3357
/// Verifies the sequence of thread events emitted for a streamed tool use:
/// an initial `ToolCall`, an update with the completed raw input, an
/// `InProgress` status update, and finally a `Completed` update carrying the
/// tool's raw output.
#[gpui::test]
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The initial tool call carries the partial (empty) input and records the
    // underlying tool name in its metadata.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Once input is complete, an update carries the full raw input.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3436
/// Verifies that invoking `UpdatePlanTool` emits a `Plan` thread event whose
/// entries mirror the tool input (an omitted priority maps to `Medium`, per
/// the assertion below), bracketed by the usual tool-call status updates.
#[gpui::test]
async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Make a plan"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Three plan entries; the middle one deliberately omits "priority".
    let input = json!({
        "plan": [
            {
                "step": "Inspect the code",
                "status": "completed",
                "priority": "high"
            },
            {
                "step": "Implement the tool",
                "status": "in_progress"
            },
            {
                "step": "Run tests",
                "status": "pending",
                "priority": "low"
            }
        ]
    });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "plan_1".into(),
            name: UpdatePlanTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("plan_1", "Update plan")
            .kind(acp::ToolKind::Think)
            .raw_input(json!({
                "plan": [
                    {
                        "step": "Inspect the code",
                        "status": "completed",
                        "priority": "high"
                    },
                    {
                        "step": "Implement the tool",
                        "status": "in_progress"
                    },
                    {
                        "step": "Run tests",
                        "status": "pending",
                        "priority": "low"
                    }
                ]
            }))
            .meta(acp::Meta::from_iter([(
                "tool_name".into(),
                "update_plan".into()
            )]))
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );

    // The plan event mirrors the input; the missing priority became Medium.
    let plan = expect_plan(&mut events).await;
    assert_eq!(
        plan,
        acp::Plan::new(vec![
            acp::PlanEntry::new(
                "Inspect the code",
                acp::PlanEntryPriority::High,
                acp::PlanEntryStatus::Completed,
            ),
            acp::PlanEntry::new(
                "Implement the tool",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::InProgress,
            ),
            acp::PlanEntry::new(
                "Run tests",
                acp::PlanEntryPriority::Low,
                acp::PlanEntryStatus::Pending,
            ),
        ])
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Plan updated")
        )
    );
}
3552
/// Verifies that a completion which streams successfully to the end emits no
/// `Retry` events and leaves the expected conversation in the thread.
#[gpui::test]
async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();

    // Drain events until the turn stops, collecting any Retry events.
    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 0);
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey!
            "}
        )
    });
}
3595
/// Verifies that a `ServerOverloaded` stream error triggers a single retry
/// (after the provider-supplied delay) which resumes the turn; the thread
/// markdown records the resume point with a "[resume]" marker.
#[gpui::test]
async fn test_send_retry_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Stream partial text, then fail with a retryable overload error.
    fake_model.send_last_completion_stream_text_chunk("Hey,");
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the retry_after delay so the retry request is issued.
    cx.executor().advance_clock(Duration::from_secs(3));
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("there!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 1);
    assert!(matches!(
        retry_events[0],
        acp_thread::RetryStatus { attempt: 1, .. }
    ));
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey,

                [resume]

                ## Assistant

                there!
            "}
        )
    });
}
3659
/// Verifies that when a stream error interrupts a turn mid-tool-call, the
/// in-flight tool still runs to completion and its result is included in the
/// retry request (as the cached final message) before the turn resumes.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    // Emit the tool use, then fail the stream with a retryable error.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // After the retry delay, the new request must already contain the echo
    // tool's result, proving the tool finished despite the error.
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Complete the retried turn and check the final agent message.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3739
/// Verifies that after MAX_RETRY_ATTEMPTS consecutive retryable failures
/// (plus the initial attempt), the turn gives up: exactly MAX_RETRY_ATTEMPTS
/// `Retry` events with increasing attempt numbers are emitted, and the final
/// `ServerOverloaded` error is surfaced to the event stream.
#[gpui::test]
async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Fail the initial attempt and every retry with a retryable error.
    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
        fake_model.send_last_completion_stream_error(
            LanguageModelCompletionError::ServerOverloaded {
                provider: LanguageModelProviderName::new("Anthropic"),
                retry_after: Some(Duration::from_secs(3)),
            },
        );
        fake_model.end_last_completion_stream();
        cx.executor().advance_clock(Duration::from_secs(3));
        cx.run_until_parked();
    }

    let mut errors = Vec::new();
    let mut retry_events = Vec::new();
    while let Some(event) = events.next().await {
        match event {
            Ok(ThreadEvent::Retry(retry_status)) => {
                retry_events.push(retry_status);
            }
            Ok(ThreadEvent::Stop(..)) => break,
            Err(error) => errors.push(error),
            _ => {}
        }
    }

    assert_eq!(
        retry_events.len(),
        crate::thread::MAX_RETRY_ATTEMPTS as usize
    );
    // Attempt numbers are 1-based and strictly increasing.
    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
        assert_eq!(retry_events[i].attempt, i + 1);
    }
    assert_eq!(errors.len(), 1);
    let error = errors[0]
        .downcast_ref::<LanguageModelCompletionError>()
        .unwrap();
    assert!(matches!(
        error,
        LanguageModelCompletionError::ServerOverloaded { .. }
    ));
}
3793
/// Regression test: if the LLM stream errors out before a streaming tool ever
/// receives `is_input_complete = true`, the tool must terminate with an error
/// result (instead of deadlocking the turn) so the retry can proceed.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3900
3901/// Filters out the stop events for asserting against in tests
3902fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3903 result_events
3904 .into_iter()
3905 .filter_map(|event| match event.unwrap() {
3906 ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3907 _ => None,
3908 })
3909 .collect()
3910}
3911
/// Fixture bundle produced by [`setup`], giving tests handles to the thread
/// under test and its collaborators.
struct ThreadTest {
    // The language model backing the thread (fake or real, per `TestModel`).
    model: Arc<dyn LanguageModel>,
    // The `Thread` entity under test.
    thread: Entity<Thread>,
    // Project context passed to the thread at construction.
    project_context: Entity<ProjectContext>,
    // Store used by tests that spin up fake MCP context servers.
    context_server_store: Entity<ContextServerStore>,
    // The fake filesystem backing the project and the settings file.
    fs: Arc<FakeFs>,
}
3919
/// Which language model [`setup`] should wire into the thread: a real
/// Anthropic Sonnet 4 model (requires authentication) or the fake test model.
enum TestModel {
    Sonnet4,
    Fake,
}
3924
3925impl TestModel {
3926 fn id(&self) -> LanguageModelId {
3927 match self {
3928 TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
3929 TestModel::Fake => unreachable!(),
3930 }
3931 }
3932}
3933
/// Builds a [`ThreadTest`] fixture: a fake filesystem with a settings file
/// enabling every test tool under "test-profile", a test project, and a
/// `Thread` backed by either the fake model or a real, authenticated one.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    cx.executor().allow_parking();

    let fs = FakeFs::new(cx.background_executor.clone());
    // Write a settings file whose default profile enables all test tools.
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                            UpdatePlanTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        // Only the real-model path needs an HTTP client and provider init.
        match model {
            TestModel::Fake => {}
            TestModel::Sonnet4 => {
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(user_store.clone(), client.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        // Keep the global SettingsStore in sync with the settings file above.
        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Resolve the model: the fake one is ready immediately; a real one is
    // looked up in the registry and its provider authenticated first.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
4039
/// Runs before tests (via `ctor`); enables env_logger only when the caller
/// opted in by setting RUST_LOG.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    if std::env::var("RUST_LOG").is_err() {
        return;
    }
    env_logger::init();
}
4047
/// Mirrors changes to the settings file on `fs` into the global
/// `SettingsStore`, emulating the app's settings watcher for tests.
fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
    let fs = fs.clone();
    cx.spawn({
        async move |cx| {
            let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
                cx.background_executor(),
                fs,
                paths::settings_file().clone(),
            );
            // Keep the file watcher alive for the lifetime of this task.
            let _watcher_task = watcher_task;

            while let Some(new_settings_content) = new_settings_content_rx.next().await {
                cx.update(|cx| {
                    SettingsStore::update_global(cx, |settings, cx| {
                        settings.set_user_settings(&new_settings_content, cx)
                    })
                })
                // Ignore failures (e.g. the app context going away at shutdown).
                .ok();
            }
        }
    })
    .detach();
}
4071
4072fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4073 completion
4074 .tools
4075 .iter()
4076 .map(|tool| tool.name.clone())
4077 .collect()
4078}
4079
/// Registers a fake stdio MCP context server named `name` that advertises
/// `tools`, and starts it in `context_server_store`.
///
/// Returns a receiver yielding each incoming tool call's parameters together
/// with a oneshot sender the test must use to supply that call's response.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in project settings so the store knows about it.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    // Fake transport: answer Initialize/ListTools directly, and forward
    // CallTool requests to the test through the channel created above.
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                // Hand the call to the test and block until it responds.
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    cx.run_until_parked();
    mcp_tool_calls_rx
}
4159
/// Verifies `tokens_before_message`: each user message reports the
/// `input_tokens` of the request that *preceded* it, while the very first
/// message (with nothing before it) always reports `None`.
#[gpui::test]
async fn test_tokens_before_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First message
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["First message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Before any response, tokens_before_message should return None for first message
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should have no tokens before it"
        );
    });

    // Complete first message with usage. The input_tokens reported here become
    // the "tokens before" value of the *next* user message.
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First message still has no tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it after response"
        );
    });

    // Second message
    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Second message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Second message should have first message's input tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should have 100 tokens before it (from first request)"
        );
    });

    // Complete second message
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250, // Total for this request (includes previous context)
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Third message
    let message_3_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_3_id.clone(), ["Third message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Third message should have second message's input tokens (250) before it.
    // Earlier messages keep their originally recorded values.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_3_id),
            Some(250),
            "Third message should have 250 tokens before it (from second request)"
        );
        // Second message should still have 100
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should still have 100 tokens before it"
        );
        // First message still has none
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it"
        );
    });
}
4266
/// Verifies that truncating a thread at a message removes that message (and
/// everything after it), so `tokens_before_message` no longer resolves the
/// truncated message id.
#[gpui::test]
async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up three messages with responses
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250,
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Verify initial state: message 2 sees the first request's input tokens.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
    });

    // Truncate at message 2 (removes message 2 and everything after)
    thread
        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
        .unwrap();
    cx.run_until_parked();

    // After truncation, message_2_id no longer exists, so lookup should return None
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            None,
            "After truncation, message 2 no longer exists"
        );
        // Message 1 still exists but has no tokens before it
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message still has no tokens before it"
        );
    });
}
4337
/// Exercises the terminal tool's permission-rule matrix end to end:
/// 1. an `always_deny` pattern blocks a matching command,
/// 2. an `always_allow` pattern runs a matching command without confirmation,
///    even when the tool-level default is `Deny`,
/// 3. an `always_confirm` pattern still prompts despite `Allow` defaults, and
/// 4. a tool-specific `Deny` default wins over the global `Allow` default.
#[gpui::test]
async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({})).await;
    let project = Project::test(fs, ["/root".as_ref()], cx).await;

    // Test 1: Deny rule blocks command
    {
        // Terminal handle never exits; the command should be blocked before
        // it ever runs, so the tool must fail rather than await the terminal.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Confirm),
                    always_allow: vec![],
                    always_deny: vec![
                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
                    ],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "rm -rf /".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by deny rule"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("blocked"),
            "error should mention the command was blocked"
        );
    }

    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![
                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
                    ],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // Seeing terminal content in the first update proves the command ran
        // without an intervening authorization request.
        let update = rx.expect_update_fields().await;
        assert!(
            update.content.iter().any(|blocks| {
                blocks
                    .iter()
                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
            }),
            "expected terminal content (allow rule should skip confirmation and override default deny)"
        );

        let result = task.await;
        assert!(
            result.is_ok(),
            "expected command to succeed without confirmation"
        );
    }

    // Test 3: global default: allow does NOT override always_confirm patterns
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Allow),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![
                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
                    ],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let _task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "sudo rm file".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // With global default: allow, confirm patterns are still respected
        // The expect_authorization() call will panic if no authorization is requested,
        // which validates that the confirm pattern still triggers confirmation
        let _auth = rx.expect_authorization().await;

        drop(_task);
    }

    // Test 4: tool-specific default: deny is respected even with global default: allow
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // tool-specific default: deny is respected even with global default: allow
        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by tool-specific deny default"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("disabled"),
            "error should mention the tool is disabled, got: {err_msg}"
        );
    }
}
4555
/// End-to-end subagent flow: the parent model emits a `SpawnAgentTool` call,
/// a subagent session is created and runs its own completion, and the
/// subagent's response is surfaced as the parent's tool-call output.
#[gpui::test]
async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Ensure empty threads are not saved, even if they get mutated.
    // (Setting the model mutates the still-empty thread.)
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Parent model streams some text, then a SpawnAgentTool tool use.
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // The tool call should have spawned a running subagent session.
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // The "last" completion stream now belongs to the subagent's request.
    model.send_last_completion_stream_text_chunk("subagent task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response

        "}
    );

    // Parent model completes its turn after the tool result comes back.
    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // The subagent's response appears as the completed tool call's content.
    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response

            ## Assistant

            Response

        "#},
    );
}
4690
/// Verifies that a subagent's `Thinking` events stay in the subagent's own
/// transcript but are stripped from the tool-call output surfaced to the
/// parent thread.
#[gpui::test]
async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Ensure empty threads are not saved, even if they get mutated.
    // (Setting the model mutates the still-empty thread.)
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Parent model streams some text, then a SpawnAgentTool tool use.
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Subagent streams text, thinking, then more text.
    model.send_last_completion_stream_text_chunk("subagent task response 1");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "thinking more about the subagent task".into(),
        signature: None,
    });
    model.send_last_completion_stream_text_chunk("subagent task response 2");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // The subagent's own transcript keeps the <thinking> block...
    assert_eq!(
        subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            subagent task prompt

            ## Assistant

            subagent task response 1

            <thinking>
            thinking more about the subagent task
            </thinking>

            subagent task response 2

        "}
    );

    model.send_last_completion_stream_text_chunk("Response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // ...but the parent's tool-call output contains only the response text.
    assert_eq!(
        acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {r#"
            ## User

            Prompt

            ## Assistant

            spawning subagent

            **Tool Call: label**
            Status: Completed

            subagent task response 1

            subagent task response 2

            ## Assistant

            Response

        "#},
    );
}
4838
/// Cancelling the parent thread while a subagent is still awaiting its first
/// completion must cancel the tool call (Status: Canceled), leave both
/// threads Idle, and leave the subagent transcript with only the task prompt.
#[gpui::test]
async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    // Ensure empty threads are not saved, even if they get mutated.
    // (Setting the model mutates the still-empty thread.)
    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Parent model streams some text, then a SpawnAgentTool tool use.
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });
    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Cancel while the subagent's first completion is still in flight — no
    // subagent response has been streamed yet.
    acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;

    cx.run_until_parked();

    send.await.unwrap();

    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(thread.status(), ThreadStatus::Idle);
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                Prompt

                ## Assistant

                spawning subagent

                **Tool Call: label**
                Status: Canceled

            "}
        );
    });
    subagent_acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(thread.status(), ThreadStatus::Idle);
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                subagent task prompt

            "}
        );
    });
}
4968
/// Verifies that passing a previous subagent `session_id` to `SpawnAgentTool`
/// resumes the same subagent session: the second tool call reuses the session
/// and the subagent transcript accumulates both conversation turns.
#[gpui::test]
async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagents are behind a feature flag; enable it for this test.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // === First turn: create subagent ===
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // `session_id: None` asks for a brand-new subagent session.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "initial task".to_string(),
        message: "do the first task".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
        agent
            .sessions
            .get(&subagent_session_id)
            .expect("subagent session should exist")
            .acp_thread
            .clone()
    });

    // Subagent responds
    model.send_last_completion_stream_text_chunk("first task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete first turn
    model.send_last_completion_stream_text_chunk("First response");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after completion"
        );
    });

    // === Second turn: resume subagent with session_id ===
    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("resuming subagent");
    // Passing the first turn's session id should resume that session rather
    // than spawn a fresh one.
    let resume_tool_input = SpawnAgentToolInput {
        label: "follow-up task".to_string(),
        message: "do the follow-up task".to_string(),
        session_id: Some(subagent_session_id.clone()),
    };
    let resume_tool_use = LanguageModelToolUse {
        id: "subagent_2".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
        input: serde_json::to_value(&resume_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Subagent should be running again with the same session
    thread.read_with(cx, |thread, cx| {
        let running = thread.running_subagent_ids(cx);
        assert_eq!(running.len(), 1, "subagent should be running");
        assert_eq!(running[0], subagent_session_id, "should be same session");
    });

    // Subagent responds to follow-up
    model.send_last_completion_stream_text_chunk("follow-up task response");
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete second turn
    model.send_last_completion_stream_text_chunk("Second response");
    model.end_last_completion_stream();

    send2.await.unwrap();

    // Verify subagent is no longer running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after resume completion"
        );
    });

    // Verify the subagent's acp thread has both conversation turns
    assert_eq!(
        subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
        indoc! {"
            ## User

            do the first task

            ## Assistant

            first task response

            ## User

            do the follow-up task

            ## Assistant

            follow-up task response

        "}
    );
}
5148
5149#[gpui::test]
5150async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5151 init_test(cx);
5152
5153 cx.update(|cx| {
5154 cx.update_flags(true, vec!["subagents".to_string()]);
5155 });
5156
5157 let fs = FakeFs::new(cx.executor());
5158 fs.insert_tree(path!("/test"), json!({})).await;
5159 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5160 let project_context = cx.new(|_cx| ProjectContext::default());
5161 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5162 let context_server_registry =
5163 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5164 let model = Arc::new(FakeLanguageModel::default());
5165
5166 let parent_thread = cx.new(|cx| {
5167 Thread::new(
5168 project.clone(),
5169 project_context,
5170 context_server_registry,
5171 Templates::new(),
5172 Some(model.clone()),
5173 cx,
5174 )
5175 });
5176
5177 let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5178 subagent_thread.read_with(cx, |subagent_thread, cx| {
5179 assert!(subagent_thread.is_subagent());
5180 assert_eq!(subagent_thread.depth(), 1);
5181 assert_eq!(
5182 subagent_thread.model().map(|model| model.id()),
5183 Some(model.id())
5184 );
5185 assert_eq!(
5186 subagent_thread.parent_thread_id(),
5187 Some(parent_thread.read(cx).id().clone())
5188 );
5189 });
5190}
5191
/// Verifies the recursion guard: a subagent whose depth reaches
/// `MAX_SUBAGENT_DEPTH` must not have `SpawnAgentTool` registered by
/// `add_default_tools`, so it cannot spawn further subagents.
#[gpui::test]
async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
    init_test(cx);

    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let model = Arc::new(FakeLanguageModel::default());
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));

    // Simulate a parent already one level below the limit; its subagent will
    // sit exactly at MAX_SUBAGENT_DEPTH.
    let deep_parent_thread = cx.new(|cx| {
        let mut thread = Thread::new(
            project.clone(),
            project_context,
            context_server_registry,
            Templates::new(),
            Some(model.clone()),
            cx,
        );
        thread.set_subagent_context(SubagentContext {
            parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
            depth: MAX_SUBAGENT_DEPTH - 1,
        });
        thread
    });
    let deep_subagent_thread = cx.new(|cx| {
        let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
        thread.add_default_tools(environment, cx);
        thread
    });

    deep_subagent_thread.read_with(cx, |thread, _| {
        assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
        assert!(
            !thread.has_registered_tool(SpawnAgentTool::NAME),
            "subagent tool should not be present at max depth"
        );
    });
}
5241
/// Verifies that cancelling a parent thread also cancels any subagent it has
/// registered as running: the subagent's turn completes (is cancelled) as a
/// result of the parent's `cancel`.
#[gpui::test]
async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
    init_test(cx);

    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let model = Arc::new(FakeLanguageModel::default());

    let parent = cx.new(|cx| {
        Thread::new(
            project.clone(),
            project_context.clone(),
            context_server_registry.clone(),
            Templates::new(),
            Some(model.clone()),
            cx,
        )
    });

    let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));

    // Manually register the subagent as running under the parent, bypassing
    // the SpawnAgentTool flow.
    parent.update(cx, |thread, _cx| {
        thread.register_running_subagent(subagent.downgrade());
    });

    // Start a turn on the subagent so it has something to cancel.
    subagent
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
        })
        .unwrap();
    cx.run_until_parked();

    subagent.read_with(cx, |thread, _| {
        assert!(!thread.is_turn_complete(), "subagent should be running");
    });

    parent.update(cx, |thread, cx| {
        thread.cancel(cx).detach();
    });

    subagent.read_with(cx, |thread, _| {
        assert!(
            thread.is_turn_complete(),
            "subagent should be cancelled when parent cancels"
        );
    });
}
5298
5299#[gpui::test]
5300async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5301 init_test(cx);
5302 cx.update(|cx| {
5303 LanguageModelRegistry::test(cx);
5304 });
5305 cx.update(|cx| {
5306 cx.update_flags(true, vec!["subagents".to_string()]);
5307 });
5308
5309 let fs = FakeFs::new(cx.executor());
5310 fs.insert_tree(
5311 "/",
5312 json!({
5313 "a": {
5314 "b.md": "Lorem"
5315 }
5316 }),
5317 )
5318 .await;
5319 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5320 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5321 let agent = cx.update(|cx| {
5322 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5323 });
5324 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5325
5326 let acp_thread = cx
5327 .update(|cx| {
5328 connection
5329 .clone()
5330 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5331 })
5332 .await
5333 .unwrap();
5334 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5335 let thread = agent.read_with(cx, |agent, _| {
5336 agent.sessions.get(&session_id).unwrap().thread.clone()
5337 });
5338 let model = Arc::new(FakeLanguageModel::default());
5339
5340 thread.update(cx, |thread, cx| {
5341 thread.set_model(model.clone(), cx);
5342 });
5343 cx.run_until_parked();
5344
5345 // Start the parent turn
5346 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5347 cx.run_until_parked();
5348 model.send_last_completion_stream_text_chunk("spawning subagent");
5349 let subagent_tool_input = SpawnAgentToolInput {
5350 label: "label".to_string(),
5351 message: "subagent task prompt".to_string(),
5352 session_id: None,
5353 };
5354 let subagent_tool_use = LanguageModelToolUse {
5355 id: "subagent_1".into(),
5356 name: SpawnAgentTool::NAME.into(),
5357 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5358 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5359 is_input_complete: true,
5360 thought_signature: None,
5361 };
5362 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5363 subagent_tool_use,
5364 ));
5365 model.end_last_completion_stream();
5366
5367 cx.run_until_parked();
5368
5369 // Verify subagent is running
5370 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5371 thread
5372 .running_subagent_ids(cx)
5373 .get(0)
5374 .expect("subagent thread should be running")
5375 .clone()
5376 });
5377
5378 // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5379 model.send_last_completion_stream_text_chunk("partial work");
5380 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5381 TokenUsage {
5382 input_tokens: 850_000,
5383 output_tokens: 0,
5384 cache_creation_input_tokens: 0,
5385 cache_read_input_tokens: 0,
5386 },
5387 ));
5388
5389 cx.run_until_parked();
5390
5391 // The subagent should no longer be running
5392 thread.read_with(cx, |thread, cx| {
5393 assert!(
5394 thread.running_subagent_ids(cx).is_empty(),
5395 "subagent should be stopped after context window warning"
5396 );
5397 });
5398
5399 // The parent model should get a new completion request to respond to the tool error
5400 model.send_last_completion_stream_text_chunk("Response after warning");
5401 model.end_last_completion_stream();
5402
5403 send.await.unwrap();
5404
5405 // Verify the parent thread shows the warning error in the tool call
5406 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5407 assert!(
5408 markdown.contains("nearing the end of its context window"),
5409 "tool output should contain context window warning message, got:\n{markdown}"
5410 );
5411 assert!(
5412 markdown.contains("Status: Failed"),
5413 "tool call should have Failed status, got:\n{markdown}"
5414 );
5415
5416 // Verify the subagent session still exists (can be resumed)
5417 agent.read_with(cx, |agent, _cx| {
5418 assert!(
5419 agent.sessions.contains_key(&subagent_session_id),
5420 "subagent session should still exist for potential resume"
5421 );
5422 });
5423}
5424
5425#[gpui::test]
5426async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5427 init_test(cx);
5428 cx.update(|cx| {
5429 LanguageModelRegistry::test(cx);
5430 });
5431 cx.update(|cx| {
5432 cx.update_flags(true, vec!["subagents".to_string()]);
5433 });
5434
5435 let fs = FakeFs::new(cx.executor());
5436 fs.insert_tree(
5437 "/",
5438 json!({
5439 "a": {
5440 "b.md": "Lorem"
5441 }
5442 }),
5443 )
5444 .await;
5445 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5446 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5447 let agent = cx.update(|cx| {
5448 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5449 });
5450 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5451
5452 let acp_thread = cx
5453 .update(|cx| {
5454 connection
5455 .clone()
5456 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
5457 })
5458 .await
5459 .unwrap();
5460 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5461 let thread = agent.read_with(cx, |agent, _| {
5462 agent.sessions.get(&session_id).unwrap().thread.clone()
5463 });
5464 let model = Arc::new(FakeLanguageModel::default());
5465
5466 thread.update(cx, |thread, cx| {
5467 thread.set_model(model.clone(), cx);
5468 });
5469 cx.run_until_parked();
5470
5471 // === First turn: create subagent, trigger context window warning ===
5472 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5473 cx.run_until_parked();
5474 model.send_last_completion_stream_text_chunk("spawning subagent");
5475 let subagent_tool_input = SpawnAgentToolInput {
5476 label: "initial task".to_string(),
5477 message: "do the first task".to_string(),
5478 session_id: None,
5479 };
5480 let subagent_tool_use = LanguageModelToolUse {
5481 id: "subagent_1".into(),
5482 name: SpawnAgentTool::NAME.into(),
5483 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5484 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5485 is_input_complete: true,
5486 thought_signature: None,
5487 };
5488 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5489 subagent_tool_use,
5490 ));
5491 model.end_last_completion_stream();
5492
5493 cx.run_until_parked();
5494
5495 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5496 thread
5497 .running_subagent_ids(cx)
5498 .get(0)
5499 .expect("subagent thread should be running")
5500 .clone()
5501 });
5502
5503 // Subagent sends a usage update that crosses the warning threshold.
5504 // This triggers Normal→Warning, stopping the subagent.
5505 model.send_last_completion_stream_text_chunk("partial work");
5506 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5507 TokenUsage {
5508 input_tokens: 850_000,
5509 output_tokens: 0,
5510 cache_creation_input_tokens: 0,
5511 cache_read_input_tokens: 0,
5512 },
5513 ));
5514
5515 cx.run_until_parked();
5516
5517 // Verify the first turn was stopped with a context window warning
5518 thread.read_with(cx, |thread, cx| {
5519 assert!(
5520 thread.running_subagent_ids(cx).is_empty(),
5521 "subagent should be stopped after context window warning"
5522 );
5523 });
5524
5525 // Parent model responds to complete first turn
5526 model.send_last_completion_stream_text_chunk("First response");
5527 model.end_last_completion_stream();
5528
5529 send.await.unwrap();
5530
5531 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5532 assert!(
5533 markdown.contains("nearing the end of its context window"),
5534 "first turn should have context window warning, got:\n{markdown}"
5535 );
5536
5537 // === Second turn: resume the same subagent (now at Warning level) ===
5538 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5539 cx.run_until_parked();
5540 model.send_last_completion_stream_text_chunk("resuming subagent");
5541 let resume_tool_input = SpawnAgentToolInput {
5542 label: "follow-up task".to_string(),
5543 message: "do the follow-up task".to_string(),
5544 session_id: Some(subagent_session_id.clone()),
5545 };
5546 let resume_tool_use = LanguageModelToolUse {
5547 id: "subagent_2".into(),
5548 name: SpawnAgentTool::NAME.into(),
5549 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5550 input: serde_json::to_value(&resume_tool_input).unwrap(),
5551 is_input_complete: true,
5552 thought_signature: None,
5553 };
5554 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5555 model.end_last_completion_stream();
5556
5557 cx.run_until_parked();
5558
5559 // Subagent responds with tokens still at warning level (no worse).
5560 // Since ratio_before_prompt was already Warning, this should NOT
5561 // trigger the context window warning again.
5562 model.send_last_completion_stream_text_chunk("follow-up task response");
5563 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5564 TokenUsage {
5565 input_tokens: 870_000,
5566 output_tokens: 0,
5567 cache_creation_input_tokens: 0,
5568 cache_read_input_tokens: 0,
5569 },
5570 ));
5571 model.end_last_completion_stream();
5572
5573 cx.run_until_parked();
5574
5575 // Parent model responds to complete second turn
5576 model.send_last_completion_stream_text_chunk("Second response");
5577 model.end_last_completion_stream();
5578
5579 send2.await.unwrap();
5580
5581 // The resumed subagent should have completed normally since the ratio
5582 // didn't transition (it was Warning before and stayed at Warning)
5583 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5584 assert!(
5585 markdown.contains("follow-up task response"),
5586 "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5587 );
5588 // The second tool call should NOT have a context window warning
5589 let second_tool_pos = markdown
5590 .find("follow-up task")
5591 .expect("should find follow-up tool call");
5592 let after_second_tool = &markdown[second_tool_pos..];
5593 assert!(
5594 !after_second_tool.contains("nearing the end of its context window"),
5595 "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5596 );
5597}
5598
// Verifies that a non-retryable model error inside a subagent terminates the
// subagent and surfaces a Failed tool call on the parent thread, which then
// gets a new completion request to respond to the error.
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is gated behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    // Grab the underlying Thread entity backing the new ACP session.
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // The parent model requests a new subagent via the spawn-agent tool.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5705
5706#[gpui::test]
5707async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5708 init_test(cx);
5709
5710 let fs = FakeFs::new(cx.executor());
5711 fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5712 .await;
5713 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5714
5715 cx.update(|cx| {
5716 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5717 settings.tool_permissions.tools.insert(
5718 EditFileTool::NAME.into(),
5719 agent_settings::ToolRules {
5720 default: Some(settings::ToolPermissionMode::Allow),
5721 always_allow: vec![],
5722 always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5723 always_confirm: vec![],
5724 invalid_patterns: vec![],
5725 },
5726 );
5727 agent_settings::AgentSettings::override_global(settings, cx);
5728 });
5729
5730 let context_server_registry =
5731 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5732 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5733 let templates = crate::Templates::new();
5734 let thread = cx.new(|cx| {
5735 crate::Thread::new(
5736 project.clone(),
5737 cx.new(|_cx| prompt_store::ProjectContext::default()),
5738 context_server_registry,
5739 templates.clone(),
5740 None,
5741 cx,
5742 )
5743 });
5744
5745 #[allow(clippy::arc_with_non_send_sync)]
5746 let tool = Arc::new(crate::EditFileTool::new(
5747 project.clone(),
5748 thread.downgrade(),
5749 language_registry,
5750 templates,
5751 ));
5752 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5753
5754 let task = cx.update(|cx| {
5755 tool.run(
5756 ToolInput::resolved(crate::EditFileToolInput {
5757 display_description: "Edit sensitive file".to_string(),
5758 path: "root/sensitive_config.txt".into(),
5759 mode: crate::EditFileMode::Edit,
5760 }),
5761 event_stream,
5762 cx,
5763 )
5764 });
5765
5766 let result = task.await;
5767 assert!(result.is_err(), "expected edit to be blocked");
5768 assert!(
5769 result.unwrap_err().to_string().contains("blocked"),
5770 "error should mention the edit was blocked"
5771 );
5772}
5773
5774#[gpui::test]
5775async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5776 init_test(cx);
5777
5778 let fs = FakeFs::new(cx.executor());
5779 fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5780 .await;
5781 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5782
5783 cx.update(|cx| {
5784 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5785 settings.tool_permissions.tools.insert(
5786 DeletePathTool::NAME.into(),
5787 agent_settings::ToolRules {
5788 default: Some(settings::ToolPermissionMode::Allow),
5789 always_allow: vec![],
5790 always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5791 always_confirm: vec![],
5792 invalid_patterns: vec![],
5793 },
5794 );
5795 agent_settings::AgentSettings::override_global(settings, cx);
5796 });
5797
5798 let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5799
5800 #[allow(clippy::arc_with_non_send_sync)]
5801 let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5802 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5803
5804 let task = cx.update(|cx| {
5805 tool.run(
5806 ToolInput::resolved(crate::DeletePathToolInput {
5807 path: "root/important_data.txt".to_string(),
5808 }),
5809 event_stream,
5810 cx,
5811 )
5812 });
5813
5814 let result = task.await;
5815 assert!(result.is_err(), "expected deletion to be blocked");
5816 assert!(
5817 result.unwrap_err().contains("blocked"),
5818 "error should mention the deletion was blocked"
5819 );
5820}
5821
5822#[gpui::test]
5823async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5824 init_test(cx);
5825
5826 let fs = FakeFs::new(cx.executor());
5827 fs.insert_tree(
5828 "/root",
5829 json!({
5830 "safe.txt": "content",
5831 "protected": {}
5832 }),
5833 )
5834 .await;
5835 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5836
5837 cx.update(|cx| {
5838 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5839 settings.tool_permissions.tools.insert(
5840 MovePathTool::NAME.into(),
5841 agent_settings::ToolRules {
5842 default: Some(settings::ToolPermissionMode::Allow),
5843 always_allow: vec![],
5844 always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5845 always_confirm: vec![],
5846 invalid_patterns: vec![],
5847 },
5848 );
5849 agent_settings::AgentSettings::override_global(settings, cx);
5850 });
5851
5852 #[allow(clippy::arc_with_non_send_sync)]
5853 let tool = Arc::new(crate::MovePathTool::new(project));
5854 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5855
5856 let task = cx.update(|cx| {
5857 tool.run(
5858 ToolInput::resolved(crate::MovePathToolInput {
5859 source_path: "root/safe.txt".to_string(),
5860 destination_path: "root/protected/safe.txt".to_string(),
5861 }),
5862 event_stream,
5863 cx,
5864 )
5865 });
5866
5867 let result = task.await;
5868 assert!(
5869 result.is_err(),
5870 "expected move to be blocked due to destination path"
5871 );
5872 assert!(
5873 result.unwrap_err().contains("blocked"),
5874 "error should mention the move was blocked"
5875 );
5876}
5877
5878#[gpui::test]
5879async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5880 init_test(cx);
5881
5882 let fs = FakeFs::new(cx.executor());
5883 fs.insert_tree(
5884 "/root",
5885 json!({
5886 "secret.txt": "secret content",
5887 "public": {}
5888 }),
5889 )
5890 .await;
5891 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5892
5893 cx.update(|cx| {
5894 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5895 settings.tool_permissions.tools.insert(
5896 MovePathTool::NAME.into(),
5897 agent_settings::ToolRules {
5898 default: Some(settings::ToolPermissionMode::Allow),
5899 always_allow: vec![],
5900 always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5901 always_confirm: vec![],
5902 invalid_patterns: vec![],
5903 },
5904 );
5905 agent_settings::AgentSettings::override_global(settings, cx);
5906 });
5907
5908 #[allow(clippy::arc_with_non_send_sync)]
5909 let tool = Arc::new(crate::MovePathTool::new(project));
5910 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5911
5912 let task = cx.update(|cx| {
5913 tool.run(
5914 ToolInput::resolved(crate::MovePathToolInput {
5915 source_path: "root/secret.txt".to_string(),
5916 destination_path: "root/public/not_secret.txt".to_string(),
5917 }),
5918 event_stream,
5919 cx,
5920 )
5921 });
5922
5923 let result = task.await;
5924 assert!(
5925 result.is_err(),
5926 "expected move to be blocked due to source path"
5927 );
5928 assert!(
5929 result.unwrap_err().contains("blocked"),
5930 "error should mention the move was blocked"
5931 );
5932}
5933
5934#[gpui::test]
5935async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5936 init_test(cx);
5937
5938 let fs = FakeFs::new(cx.executor());
5939 fs.insert_tree(
5940 "/root",
5941 json!({
5942 "confidential.txt": "confidential data",
5943 "dest": {}
5944 }),
5945 )
5946 .await;
5947 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5948
5949 cx.update(|cx| {
5950 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5951 settings.tool_permissions.tools.insert(
5952 CopyPathTool::NAME.into(),
5953 agent_settings::ToolRules {
5954 default: Some(settings::ToolPermissionMode::Allow),
5955 always_allow: vec![],
5956 always_deny: vec![
5957 agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5958 ],
5959 always_confirm: vec![],
5960 invalid_patterns: vec![],
5961 },
5962 );
5963 agent_settings::AgentSettings::override_global(settings, cx);
5964 });
5965
5966 #[allow(clippy::arc_with_non_send_sync)]
5967 let tool = Arc::new(crate::CopyPathTool::new(project));
5968 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5969
5970 let task = cx.update(|cx| {
5971 tool.run(
5972 ToolInput::resolved(crate::CopyPathToolInput {
5973 source_path: "root/confidential.txt".to_string(),
5974 destination_path: "root/dest/copy.txt".to_string(),
5975 }),
5976 event_stream,
5977 cx,
5978 )
5979 });
5980
5981 let result = task.await;
5982 assert!(result.is_err(), "expected copy to be blocked");
5983 assert!(
5984 result.unwrap_err().contains("blocked"),
5985 "error should mention the copy was blocked"
5986 );
5987}
5988
5989#[gpui::test]
5990async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
5991 init_test(cx);
5992
5993 let fs = FakeFs::new(cx.executor());
5994 fs.insert_tree(
5995 "/root",
5996 json!({
5997 "normal.txt": "normal content",
5998 "readonly": {
5999 "config.txt": "readonly content"
6000 }
6001 }),
6002 )
6003 .await;
6004 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6005
6006 cx.update(|cx| {
6007 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6008 settings.tool_permissions.tools.insert(
6009 SaveFileTool::NAME.into(),
6010 agent_settings::ToolRules {
6011 default: Some(settings::ToolPermissionMode::Allow),
6012 always_allow: vec![],
6013 always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6014 always_confirm: vec![],
6015 invalid_patterns: vec![],
6016 },
6017 );
6018 agent_settings::AgentSettings::override_global(settings, cx);
6019 });
6020
6021 #[allow(clippy::arc_with_non_send_sync)]
6022 let tool = Arc::new(crate::SaveFileTool::new(project));
6023 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6024
6025 let task = cx.update(|cx| {
6026 tool.run(
6027 ToolInput::resolved(crate::SaveFileToolInput {
6028 paths: vec![
6029 std::path::PathBuf::from("root/normal.txt"),
6030 std::path::PathBuf::from("root/readonly/config.txt"),
6031 ],
6032 }),
6033 event_stream,
6034 cx,
6035 )
6036 });
6037
6038 let result = task.await;
6039 assert!(
6040 result.is_err(),
6041 "expected save to be blocked due to denied path"
6042 );
6043 assert!(
6044 result.unwrap_err().contains("blocked"),
6045 "error should mention the save was blocked"
6046 );
6047}
6048
6049#[gpui::test]
6050async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6051 init_test(cx);
6052
6053 let fs = FakeFs::new(cx.executor());
6054 fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6055 .await;
6056 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6057
6058 cx.update(|cx| {
6059 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6060 settings.tool_permissions.tools.insert(
6061 SaveFileTool::NAME.into(),
6062 agent_settings::ToolRules {
6063 default: Some(settings::ToolPermissionMode::Allow),
6064 always_allow: vec![],
6065 always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6066 always_confirm: vec![],
6067 invalid_patterns: vec![],
6068 },
6069 );
6070 agent_settings::AgentSettings::override_global(settings, cx);
6071 });
6072
6073 #[allow(clippy::arc_with_non_send_sync)]
6074 let tool = Arc::new(crate::SaveFileTool::new(project));
6075 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6076
6077 let task = cx.update(|cx| {
6078 tool.run(
6079 ToolInput::resolved(crate::SaveFileToolInput {
6080 paths: vec![std::path::PathBuf::from("root/config.secret")],
6081 }),
6082 event_stream,
6083 cx,
6084 )
6085 });
6086
6087 let result = task.await;
6088 assert!(result.is_err(), "expected save to be blocked");
6089 assert!(
6090 result.unwrap_err().contains("blocked"),
6091 "error should mention the save was blocked"
6092 );
6093}
6094
6095#[gpui::test]
6096async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6097 init_test(cx);
6098
6099 cx.update(|cx| {
6100 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6101 settings.tool_permissions.tools.insert(
6102 WebSearchTool::NAME.into(),
6103 agent_settings::ToolRules {
6104 default: Some(settings::ToolPermissionMode::Allow),
6105 always_allow: vec![],
6106 always_deny: vec![
6107 agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6108 ],
6109 always_confirm: vec![],
6110 invalid_patterns: vec![],
6111 },
6112 );
6113 agent_settings::AgentSettings::override_global(settings, cx);
6114 });
6115
6116 #[allow(clippy::arc_with_non_send_sync)]
6117 let tool = Arc::new(crate::WebSearchTool);
6118 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6119
6120 let input: crate::WebSearchToolInput =
6121 serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6122
6123 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6124
6125 let result = task.await;
6126 assert!(result.is_err(), "expected search to be blocked");
6127 match result.unwrap_err() {
6128 crate::WebSearchToolOutput::Error { error } => {
6129 assert!(
6130 error.contains("blocked"),
6131 "error should mention the search was blocked"
6132 );
6133 }
6134 other => panic!("expected Error variant, got: {other:?}"),
6135 }
6136}
6137
6138#[gpui::test]
6139async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6140 init_test(cx);
6141
6142 let fs = FakeFs::new(cx.executor());
6143 fs.insert_tree("/root", json!({"README.md": "# Hello"}))
6144 .await;
6145 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6146
6147 cx.update(|cx| {
6148 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6149 settings.tool_permissions.tools.insert(
6150 EditFileTool::NAME.into(),
6151 agent_settings::ToolRules {
6152 default: Some(settings::ToolPermissionMode::Confirm),
6153 always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6154 always_deny: vec![],
6155 always_confirm: vec![],
6156 invalid_patterns: vec![],
6157 },
6158 );
6159 agent_settings::AgentSettings::override_global(settings, cx);
6160 });
6161
6162 let context_server_registry =
6163 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6164 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6165 let templates = crate::Templates::new();
6166 let thread = cx.new(|cx| {
6167 crate::Thread::new(
6168 project.clone(),
6169 cx.new(|_cx| prompt_store::ProjectContext::default()),
6170 context_server_registry,
6171 templates.clone(),
6172 None,
6173 cx,
6174 )
6175 });
6176
6177 #[allow(clippy::arc_with_non_send_sync)]
6178 let tool = Arc::new(crate::EditFileTool::new(
6179 project,
6180 thread.downgrade(),
6181 language_registry,
6182 templates,
6183 ));
6184 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6185
6186 let _task = cx.update(|cx| {
6187 tool.run(
6188 ToolInput::resolved(crate::EditFileToolInput {
6189 display_description: "Edit README".to_string(),
6190 path: "root/README.md".into(),
6191 mode: crate::EditFileMode::Edit,
6192 }),
6193 event_stream,
6194 cx,
6195 )
6196 });
6197
6198 cx.run_until_parked();
6199
6200 let event = rx.try_next();
6201 assert!(
6202 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6203 "expected no authorization request for allowed .md file"
6204 );
6205}
6206
6207#[gpui::test]
6208async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6209 init_test(cx);
6210
6211 let fs = FakeFs::new(cx.executor());
6212 fs.insert_tree(
6213 "/root",
6214 json!({
6215 ".zed": {
6216 "settings.json": "{}"
6217 },
6218 "README.md": "# Hello"
6219 }),
6220 )
6221 .await;
6222 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6223
6224 cx.update(|cx| {
6225 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6226 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6227 agent_settings::AgentSettings::override_global(settings, cx);
6228 });
6229
6230 let context_server_registry =
6231 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6232 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6233 let templates = crate::Templates::new();
6234 let thread = cx.new(|cx| {
6235 crate::Thread::new(
6236 project.clone(),
6237 cx.new(|_cx| prompt_store::ProjectContext::default()),
6238 context_server_registry,
6239 templates.clone(),
6240 None,
6241 cx,
6242 )
6243 });
6244
6245 #[allow(clippy::arc_with_non_send_sync)]
6246 let tool = Arc::new(crate::EditFileTool::new(
6247 project,
6248 thread.downgrade(),
6249 language_registry,
6250 templates,
6251 ));
6252
6253 // Editing a file inside .zed/ should still prompt even with global default: allow,
6254 // because local settings paths are sensitive and require confirmation regardless.
6255 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6256 let _task = cx.update(|cx| {
6257 tool.run(
6258 ToolInput::resolved(crate::EditFileToolInput {
6259 display_description: "Edit local settings".to_string(),
6260 path: "root/.zed/settings.json".into(),
6261 mode: crate::EditFileMode::Edit,
6262 }),
6263 event_stream,
6264 cx,
6265 )
6266 });
6267
6268 let _update = rx.expect_update_fields().await;
6269 let _auth = rx.expect_authorization().await;
6270}
6271
6272#[gpui::test]
6273async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6274 init_test(cx);
6275
6276 cx.update(|cx| {
6277 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6278 settings.tool_permissions.tools.insert(
6279 FetchTool::NAME.into(),
6280 agent_settings::ToolRules {
6281 default: Some(settings::ToolPermissionMode::Allow),
6282 always_allow: vec![],
6283 always_deny: vec![
6284 agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6285 ],
6286 always_confirm: vec![],
6287 invalid_patterns: vec![],
6288 },
6289 );
6290 agent_settings::AgentSettings::override_global(settings, cx);
6291 });
6292
6293 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6294
6295 #[allow(clippy::arc_with_non_send_sync)]
6296 let tool = Arc::new(crate::FetchTool::new(http_client));
6297 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6298
6299 let input: crate::FetchToolInput =
6300 serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6301
6302 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6303
6304 let result = task.await;
6305 assert!(result.is_err(), "expected fetch to be blocked");
6306 assert!(
6307 result.unwrap_err().contains("blocked"),
6308 "error should mention the fetch was blocked"
6309 );
6310}
6311
6312#[gpui::test]
6313async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6314 init_test(cx);
6315
6316 cx.update(|cx| {
6317 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6318 settings.tool_permissions.tools.insert(
6319 FetchTool::NAME.into(),
6320 agent_settings::ToolRules {
6321 default: Some(settings::ToolPermissionMode::Confirm),
6322 always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6323 always_deny: vec![],
6324 always_confirm: vec![],
6325 invalid_patterns: vec![],
6326 },
6327 );
6328 agent_settings::AgentSettings::override_global(settings, cx);
6329 });
6330
6331 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6332
6333 #[allow(clippy::arc_with_non_send_sync)]
6334 let tool = Arc::new(crate::FetchTool::new(http_client));
6335 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6336
6337 let input: crate::FetchToolInput =
6338 serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6339
6340 let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6341
6342 cx.run_until_parked();
6343
6344 let event = rx.try_next();
6345 assert!(
6346 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6347 "expected no authorization request for allowed docs.rs URL"
6348 );
6349}
6350
6351#[gpui::test]
6352async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
6353 init_test(cx);
6354 always_allow_tools(cx);
6355
6356 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6357 let fake_model = model.as_fake();
6358
6359 // Add a tool so we can simulate tool calls
6360 thread.update(cx, |thread, _cx| {
6361 thread.add_tool(EchoTool);
6362 });
6363
6364 // Start a turn by sending a message
6365 let mut events = thread
6366 .update(cx, |thread, cx| {
6367 thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
6368 })
6369 .unwrap();
6370 cx.run_until_parked();
6371
6372 // Simulate the model making a tool call
6373 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6374 LanguageModelToolUse {
6375 id: "tool_1".into(),
6376 name: "echo".into(),
6377 raw_input: r#"{"text": "hello"}"#.into(),
6378 input: json!({"text": "hello"}),
6379 is_input_complete: true,
6380 thought_signature: None,
6381 },
6382 ));
6383 fake_model
6384 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
6385
6386 // Signal that a message is queued before ending the stream
6387 thread.update(cx, |thread, _cx| {
6388 thread.set_has_queued_message(true);
6389 });
6390
6391 // Now end the stream - tool will run, and the boundary check should see the queue
6392 fake_model.end_last_completion_stream();
6393
6394 // Collect all events until the turn stops
6395 let all_events = collect_events_until_stop(&mut events, cx).await;
6396
6397 // Verify we received the tool call event
6398 let tool_call_ids: Vec<_> = all_events
6399 .iter()
6400 .filter_map(|e| match e {
6401 Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
6402 _ => None,
6403 })
6404 .collect();
6405 assert_eq!(
6406 tool_call_ids,
6407 vec!["tool_1"],
6408 "Should have received a tool call event for our echo tool"
6409 );
6410
6411 // The turn should have stopped with EndTurn
6412 let stop_reasons = stop_events(all_events);
6413 assert_eq!(
6414 stop_reasons,
6415 vec![acp::StopReason::EndTurn],
6416 "Turn should have ended after tool completion due to queued message"
6417 );
6418
6419 // Verify the queued message flag is still set
6420 thread.update(cx, |thread, _cx| {
6421 assert!(
6422 thread.has_queued_message(),
6423 "Should still have queued message flag set"
6424 );
6425 });
6426
6427 // Thread should be idle now
6428 thread.update(cx, |thread, _cx| {
6429 assert!(
6430 thread.is_turn_complete(),
6431 "Thread should not be running after turn ends"
6432 );
6433 });
6434}
6435
6436#[gpui::test]
6437async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
6438 init_test(cx);
6439 always_allow_tools(cx);
6440
6441 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6442 let fake_model = model.as_fake();
6443
6444 thread.update(cx, |thread, _cx| {
6445 thread.add_tool(StreamingFailingEchoTool {
6446 receive_chunks_until_failure: 1,
6447 });
6448 });
6449
6450 let _events = thread
6451 .update(cx, |thread, cx| {
6452 thread.send(
6453 UserMessageId::new(),
6454 ["Use the streaming_failing_echo tool"],
6455 cx,
6456 )
6457 })
6458 .unwrap();
6459 cx.run_until_parked();
6460
6461 let tool_use = LanguageModelToolUse {
6462 id: "call_1".into(),
6463 name: StreamingFailingEchoTool::NAME.into(),
6464 raw_input: "hello".into(),
6465 input: json!({}),
6466 is_input_complete: false,
6467 thought_signature: None,
6468 };
6469
6470 fake_model
6471 .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
6472
6473 cx.run_until_parked();
6474
6475 let completions = fake_model.pending_completions();
6476 let last_completion = completions.last().unwrap();
6477
6478 assert_eq!(
6479 last_completion.messages[1..],
6480 vec![
6481 LanguageModelRequestMessage {
6482 role: Role::User,
6483 content: vec!["Use the streaming_failing_echo tool".into()],
6484 cache: false,
6485 reasoning_details: None,
6486 },
6487 LanguageModelRequestMessage {
6488 role: Role::Assistant,
6489 content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
6490 cache: false,
6491 reasoning_details: None,
6492 },
6493 LanguageModelRequestMessage {
6494 role: Role::User,
6495 content: vec![language_model::MessageContent::ToolResult(
6496 LanguageModelToolResult {
6497 tool_use_id: tool_use.id.clone(),
6498 tool_name: tool_use.name,
6499 is_error: true,
6500 content: "failed".into(),
6501 output: Some("failed".into()),
6502 }
6503 )],
6504 cache: true,
6505 reasoning_details: None,
6506 },
6507 ]
6508 );
6509}
6510
// When a streaming tool fails mid-turn, the thread must not issue the
// follow-up completion until all other in-flight tool calls have finished;
// their results are then reported together in a single tool-result message.
#[gpui::test]
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Gate that holds the StreamingEchoTool open until we explicitly release it,
    // so the failing tool errors while the echo tool is still running.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        // Fails after receiving its first streamed input chunk.
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Stream the first tool call in two chunks: a partial input, then the
    // completed input for "call_1" (the gated echo tool).
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // Second tool call ("call_2") stays partial — the failing tool errors on
    // its first chunk while call_1 is still blocked on the oneshot gate.
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    cx.run_until_parked();

    // Release the gated echo tool so it can finish; only after this should the
    // follow-up completion be sent.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // The follow-up request carries both tool results in one user message:
    // the failed call_2 result first, then the successful call_1 result.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6628
// Verifies that when the profile, model, and thinking setting change between
// tool-call iterations of a single turn, the next request reflects all three
// changes: it goes to the new model, with the new profile's tool set, and the
// new thinking flag.
#[gpui::test]
async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model_a = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
    });

    // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "profile-a": {
                        "name": "Profile A",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "profile-b": {
                        "name": "Profile B",
                        "tools": {
                            DelayTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    // Let the settings-file write propagate before selecting a profile.
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-a".into()), cx);
        thread.set_thinking_enabled(false, cx);
    });

    // Send a message — first iteration starts with model A, profile-a, thinking off.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Verify first request has both tools and thinking disabled.
    let completions = fake_model_a.pending_completions();
    assert_eq!(completions.len(), 1);
    let first_tools = tool_names_for_completion(&completions[0]);
    assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
    assert!(!completions[0].thinking_allowed);

    // Model A responds with an echo tool call, which forces a second iteration.
    fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text":"hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model_a.end_last_completion_stream();

    // Before the next iteration runs, switch to profile-b (only DelayTool),
    // swap in a new model, and enable thinking.
    let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "test-provider",
        "model-b",
        "Model B",
        true,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-b".into()), cx);
        thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
        thread.set_thinking_enabled(true, cx);
    });

    // Run until parked — processes the echo tool call, loops back, picks up
    // the new model/profile/thinking, and makes a second request to model B.
    cx.run_until_parked();

    // The second request should have gone to model B.
    let model_b_completions = fake_model_b.pending_completions();
    assert_eq!(
        model_b_completions.len(),
        1,
        "second request should go to model B"
    );

    // Profile-b only has DelayTool, so echo should be gone.
    let second_tools = tool_names_for_completion(&model_b_completions[0]);
    assert_eq!(second_tools, vec![DelayTool::NAME]);

    // Thinking should now be enabled.
    assert!(model_b_completions[0].thinking_allowed);
}