1use super::*;
2use acp_thread::{
3 AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
4 UserMessageId,
5};
6use agent_client_protocol::{self as acp};
7use agent_settings::AgentProfileId;
8use anyhow::Result;
9use client::{Client, UserStore};
10use cloud_llm_client::CompletionIntent;
11use collections::IndexMap;
12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
13use feature_flags::FeatureFlagAppExt as _;
14use fs::{FakeFs, Fs};
15use futures::{
16 FutureExt as _, StreamExt,
17 channel::{
18 mpsc::{self, UnboundedReceiver},
19 oneshot,
20 },
21 future::{Fuse, Shared},
22};
23use gpui::{
24 App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
25 http_client::FakeHttpClient,
26};
27use indoc::indoc;
28use language_model::{
29 LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
30 LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
31 LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
32 LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
33 fake_provider::FakeLanguageModel,
34};
35use pretty_assertions::assert_eq;
36use project::{
37 Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
38};
39use prompt_store::ProjectContext;
40use reqwest_client::ReqwestClient;
41use schemars::JsonSchema;
42use serde::{Deserialize, Serialize};
43use serde_json::json;
44use settings::{Settings, SettingsStore};
45use std::{
46 path::Path,
47 pin::Pin,
48 rc::Rc,
49 sync::{
50 Arc,
51 atomic::{AtomicBool, Ordering},
52 },
53 time::Duration,
54};
55use util::path;
56
57mod edit_file_thread_test;
58mod test_tools;
59use test_tools::*;
60
61fn init_test(cx: &mut TestAppContext) {
62 cx.update(|cx| {
63 let settings_store = SettingsStore::test(cx);
64 cx.set_global(settings_store);
65 });
66}
67
/// Test double for a terminal spawned by the agent's terminal tool.
struct FakeTerminalHandle {
    // Flipped to true once `kill` is called; inspected via `was_killed`.
    killed: Arc<AtomicBool>,
    // Whether the user stopped the terminal; read by `was_stopped_by_user`.
    stopped_by_user: Arc<AtomicBool>,
    // One-shot sender consumed by `signal_exit` to resolve `wait_for_exit`.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    // Shared task that resolves with the terminal's exit status.
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    // Canned response returned from `current_output`.
    output: acp::TerminalOutputResponse,
    // Fixed id returned from `id`.
    id: acp::TerminalId,
}
76
77impl FakeTerminalHandle {
78 fn new_never_exits(cx: &mut App) -> Self {
79 let killed = Arc::new(AtomicBool::new(false));
80 let stopped_by_user = Arc::new(AtomicBool::new(false));
81
82 let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
83
84 let wait_for_exit = cx
85 .spawn(async move |_cx| {
86 // Wait for the exit signal (sent when kill() is called)
87 let _ = exit_receiver.await;
88 acp::TerminalExitStatus::new()
89 })
90 .shared();
91
92 Self {
93 killed,
94 stopped_by_user,
95 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
96 wait_for_exit,
97 output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
98 id: acp::TerminalId::new("fake_terminal".to_string()),
99 }
100 }
101
102 fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
103 let killed = Arc::new(AtomicBool::new(false));
104 let stopped_by_user = Arc::new(AtomicBool::new(false));
105 let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
106
107 let wait_for_exit = cx
108 .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
109 .shared();
110
111 Self {
112 killed,
113 stopped_by_user,
114 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
115 wait_for_exit,
116 output: acp::TerminalOutputResponse::new("command output".to_string(), false),
117 id: acp::TerminalId::new("fake_terminal".to_string()),
118 }
119 }
120
121 fn was_killed(&self) -> bool {
122 self.killed.load(Ordering::SeqCst)
123 }
124
125 fn set_stopped_by_user(&self, stopped: bool) {
126 self.stopped_by_user.store(stopped, Ordering::SeqCst);
127 }
128
129 fn signal_exit(&self) {
130 if let Some(sender) = self.exit_sender.borrow_mut().take() {
131 let _ = sender.send(());
132 }
133 }
134}
135
// Trait implementation forwarding to the fake's canned state.
impl crate::TerminalHandle for FakeTerminalHandle {
    // Returns the fixed id assigned at construction.
    fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
        Ok(self.id.clone())
    }

    // Returns the canned output regardless of what "command" ran.
    fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
        Ok(self.output.clone())
    }

    // Clones the shared exit task; resolution timing depends on which
    // constructor built this handle.
    fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
        Ok(self.wait_for_exit.clone())
    }

    // Records the kill and resolves `wait_for_exit` so awaiting callers unblock.
    fn kill(&self, _cx: &AsyncApp) -> Result<()> {
        self.killed.store(true, Ordering::SeqCst);
        self.signal_exit();
        Ok(())
    }

    // Reports the flag set via `set_stopped_by_user`.
    fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
        Ok(self.stopped_by_user.load(Ordering::SeqCst))
    }
}
159
/// Test double for a subagent session.
struct FakeSubagentHandle {
    // Session id returned from `id`.
    session_id: acp::SessionId,
    // Shared task whose output is returned from every `send` call.
    send_task: Shared<Task<String>>,
}
164
impl SubagentHandle for FakeSubagentHandle {
    fn id(&self) -> acp::SessionId {
        self.session_id.clone()
    }

    // Not needed by any current test; panics if exercised.
    fn num_entries(&self, _cx: &App) -> usize {
        unimplemented!()
    }

    // Ignores the message and resolves with the canned `send_task` output.
    fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
        let task = self.send_task.clone();
        cx.background_spawn(async move { Ok(task.await) })
    }
}
179
/// Thread environment test double holding at most one pre-built terminal
/// and/or subagent handle; `create_*` panics if the matching handle is absent.
#[derive(Default)]
struct FakeThreadEnvironment {
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
}
185
186impl FakeThreadEnvironment {
187 pub fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
188 Self {
189 terminal_handle: Some(terminal_handle.into()),
190 ..self
191 }
192 }
193}
194
impl crate::ThreadEnvironment for FakeThreadEnvironment {
    // Ignores command/cwd/limit and hands back the single pre-installed
    // terminal handle; panics if `with_terminal` was never called.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        _cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        let handle = self
            .terminal_handle
            .clone()
            .expect("Terminal handle not available on FakeThreadEnvironment");
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Hands back the pre-installed subagent handle; panics if absent.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        Ok(self
            .subagent_handle
            .clone()
            .expect("Subagent handle not available on FakeThreadEnvironment")
            as Rc<dyn SubagentHandle>)
    }
}
218
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    // Every handle created so far, in creation order; RefCell because
    // `create_terminal` takes `&self`.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
223
224impl MultiTerminalEnvironment {
225 fn new() -> Self {
226 Self {
227 handles: std::cell::RefCell::new(Vec::new()),
228 }
229 }
230
231 fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
232 self.handles.borrow().clone()
233 }
234}
235
impl crate::ThreadEnvironment for MultiTerminalEnvironment {
    // Creates a fresh never-exiting fake terminal per call and records it so
    // tests can inspect all handles via `handles()`.
    fn create_terminal(
        &self,
        _command: String,
        _cwd: Option<std::path::PathBuf>,
        _output_byte_limit: Option<u64>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
        let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
        self.handles.borrow_mut().push(handle.clone());
        Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
    }

    // Subagents are not exercised by the multi-terminal tests.
    fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
        unimplemented!()
    }
}
253
254fn always_allow_tools(cx: &mut TestAppContext) {
255 cx.update(|cx| {
256 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
257 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
258 agent_settings::AgentSettings::override_global(settings, cx);
259 });
260}
261
/// Smoke test: a user message streamed back as plain text ends the turn and
/// lands as an assistant message in the thread.
#[gpui::test]
async fn test_echo(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Drive the fake model: one text chunk, then an explicit end-of-turn stop.
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            "Hello\n"
        )
    });
    // The only stop event emitted should be the end-of-turn.
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
294
/// Verifies that when a terminal tool invocation has a timeout, an
/// unresponsive command is killed and its partial output is still reported.
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    // A terminal whose exit only resolves when kill() is called.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // 5ms timeout against a command that never exits on its own.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // Poll the tool task cooperatively (run_until_parked + short timers) until
    // it completes or the 500ms wall-clock deadline passes.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
360
/// Counterpart to the timeout test: without a timeout the terminal must NOT
/// be killed even after time passes. Currently `#[ignore]`d.
#[gpui::test]
#[ignore]
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // No timeout_ms: the tool should wait indefinitely; the task is kept
    // alive (not awaited) so we can observe the handle state.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give the tool ample time; if a spurious timeout existed it would have
    // fired by now.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
410
/// Verifies that a `Thinking` completion event is rendered into the message
/// markdown as a `<think>…</think>` block preceding the answer text.
#[gpui::test]
async fn test_thinking(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                [indoc! {"
                    Testing:

                    Generate a thinking step where you just think the word 'Think',
                    and have your final answer be 'Hello'
                "}],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();
    // Stream a thinking step, then the answer, then end-of-turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "Think".to_string(),
        signature: None,
    });
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            indoc! {"
                <think>Think</think>
                Hello
            "}
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
459
/// Verifies the system prompt reflects project context (shell name) and,
/// because a tool is registered, includes the diagnostics-fixing section.
#[gpui::test]
async fn test_system_prompt(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        project_context,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Inject a recognizable shell name so we can find it in the prompt.
    project_context.update(cx, |project_context, _cx| {
        project_context.shell = "test-shell".into()
    });
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The first request message must be the system prompt.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        system_prompt.contains("test-shell"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
504
/// Counterpart to `test_system_prompt`: with no tools registered, the system
/// prompt must omit the tool-use and diagnostics sections.
#[gpui::test]
async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        !system_prompt.contains("## Tool Use"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        !system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
540
/// Verifies the cache-marker policy on outgoing requests: only the final
/// message of each request carries `cache: true` (the latest user message,
/// or the latest tool result when a tool ran); all earlier history is
/// sent uncached.
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // messages[0] is the system prompt; compare everything after it.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The follow-up request (carrying the tool result) should cache only
    // that final tool-result message.
    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
686
/// End-to-end test (real Sonnet 4 model; only runs with the `e2e` feature):
/// exercises tool calls that finish before and after streaming stops.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool calls that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    // The delay tool's output ("Ding") should have been echoed back into the
    // assistant's final message.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
746
/// End-to-end test (real model; `e2e` feature only): verifies tool-use input
/// is observable in a partially-streamed state before it completes.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, input may still be incomplete; a
                        // missing "g" key marks a partial stream.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
798
/// Exercises the tool-permission flow: allow-once, deny, always-allow (which
/// persists so later calls skip authorization), and the resulting tool
/// results sent back to the model.
#[gpui::test]
async fn test_tool_authorization(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Two tool uses in one turn, each needing its own authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_2".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;

    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
    tool_call_auth_1
        .response
        .send(acp::PermissionOptionId::new("allow"))
        .unwrap();
    cx.run_until_parked();

    // Reject the second - send "deny" option_id directly since Deny is now a button
    tool_call_auth_2
        .response
        .send(acp::PermissionOptionId::new("deny"))
        .unwrap();
    cx.run_until_parked();

    // The next request should carry one successful and one error tool result.
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }),
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: true,
                content: "Permission to run tool denied by user".into(),
                output: Some("Permission to run tool denied by user".into())
            })
        ]
    );

    // Simulate yet another tool call.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_3".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Respond by always allowing tools - send transformed option_id
    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
    tool_call_auth_3
        .response
        .send(acp::PermissionOptionId::new(
            "always_allow:tool_requiring_permission",
        ))
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );

    // Simulate a final tool call, ensuring we don't trigger authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_4".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: "tool_id_4".into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );
}
937
/// Verifies that a model-invented tool name produces a visible tool call
/// that transitions to `Failed` rather than crashing the turn.
#[gpui::test]
async fn test_tool_hallucination(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // No tool named "nonexistent_tool" is registered on the thread.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: "nonexistent_tool".into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(tool_call.title, "nonexistent_tool");
    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
}
967
968async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
969 let event = events
970 .next()
971 .await
972 .expect("no tool call authorization event received")
973 .unwrap();
974 match event {
975 ThreadEvent::ToolCall(tool_call) => tool_call,
976 event => {
977 panic!("Unexpected event {event:?}");
978 }
979 }
980}
981
982async fn expect_tool_call_update_fields(
983 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
984) -> acp::ToolCallUpdate {
985 let event = events
986 .next()
987 .await
988 .expect("no tool call authorization event received")
989 .unwrap();
990 match event {
991 ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
992 event => {
993 panic!("Unexpected event {event:?}");
994 }
995 }
996}
997
998async fn next_tool_call_authorization(
999 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1000) -> ToolCallAuthorization {
1001 loop {
1002 let event = events
1003 .next()
1004 .await
1005 .expect("no tool call authorization event received")
1006 .unwrap();
1007 if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1008 let permission_kinds = tool_call_authorization
1009 .options
1010 .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1011 .map(|option| option.kind);
1012 let allow_once = tool_call_authorization
1013 .options
1014 .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1015 .map(|option| option.kind);
1016
1017 assert_eq!(
1018 permission_kinds,
1019 Some(acp::PermissionOptionKind::AllowAlways)
1020 );
1021 assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1022 return tool_call_authorization;
1023 }
1024 }
1025}
1026
/// A multi-word terminal command should yield three dropdown choices,
/// including a `cargo build`-pattern option.
#[test]
fn test_permission_options_terminal_with_pattern() {
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };
    assert_eq!(choices.len(), 3);

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `cargo build` commands"));
    assert!(labels.contains(&"Only this time"));
}
1048
/// A command whose second token is a flag (`ls -la`) should pattern on the
/// first token only.
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };
    assert_eq!(choices.len(), 3);

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `ls` commands"));
    assert!(labels.contains(&"Only this time"));
}
1068
/// A single-word command (`whoami`) should still produce a per-command
/// pattern option.
#[test]
fn test_permission_options_terminal_single_word_command() {
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };
    assert_eq!(choices.len(), 3);

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `whoami` commands"));
    assert!(labels.contains(&"Only this time"));
}
1088
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Editing `src/main.rs` should offer both a tool-wide option and a
    // directory-scoped (`src/`) option.
    let built = ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
        .build_permission_options();

    let PermissionOptions::Dropdown(choices) = built else {
        panic!("Expected dropdown permission options");
    };

    let has_label =
        |wanted: &str| choices.iter().any(|choice| choice.allow.name.as_ref() == wanted);
    assert!(has_label("Always for edit file"));
    assert!(has_label("Always for `src/`"));
}
1106
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetching a URL should produce a domain-scoped option (`docs.rs`).
    let built =
        ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
            .build_permission_options();

    let choices = match built {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert!(
        choices
            .iter()
            .any(|choice| choice.allow.name.as_ref() == "Always for fetch")
    );
    assert!(
        choices
            .iter()
            .any(|choice| choice.allow.name.as_ref() == "Always for `docs.rs`")
    );
}
1124
#[test]
fn test_permission_options_without_pattern() {
    // A command with no recognizable command-name pattern (`./deploy.sh`)
    // gets only the tool-wide "always" and the one-time option.
    let built = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(entries) = built else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(entries.len(), 2);
    let labels = entries
        .iter()
        .map(|entry| entry.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));
    // No per-command pattern option should have been generated.
    assert!(labels.iter().all(|label| !label.contains("commands")));
}
1146
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Symlink targets outside the project only get one-time allow/deny,
    // presented as flat options rather than a dropdown.
    let built =
        ToolPermissionContext::symlink_target(EditFileTool::NAME, vec!["/outside/file.txt".into()])
            .build_permission_options();

    let PermissionOptions::Flat(options) = built else {
        panic!("Expected flat permission options for symlink target authorization");
    };

    assert_eq!(options.len(), 2);
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        options
            .iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1167
#[test]
fn test_permission_option_ids_for_terminal() {
    // Option ids encode decision scope and tool (and optionally a pattern),
    // on both the allow side and the deny side of each dropdown choice.
    let built = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = built else {
        panic!("Expected dropdown permission options");
    };

    let allow_ids = choices
        .iter()
        .map(|choice| choice.allow.option_id.0.to_string())
        .collect::<Vec<String>>();
    let deny_ids = choices
        .iter()
        .map(|choice| choice.deny.option_id.0.to_string())
        .collect::<Vec<String>>();

    assert!(allow_ids.iter().any(|id| id.as_str() == "always_allow:terminal"));
    assert!(allow_ids.iter().any(|id| id.as_str() == "allow"));
    assert!(
        allow_ids
            .iter()
            .any(|id| id.starts_with("always_allow_pattern:terminal\n")),
        "Missing allow pattern option"
    );

    assert!(deny_ids.iter().any(|id| id.as_str() == "always_deny:terminal"));
    assert!(deny_ids.iter().any(|id| id.as_str() == "deny"));
    assert!(
        deny_ids
            .iter()
            .any(|id| id.starts_with("always_deny_pattern:terminal\n")),
        "Missing deny pattern option"
    );
}
1207
1208#[gpui::test]
1209#[cfg_attr(not(feature = "e2e"), ignore)]
1210async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
1211 let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;
1212
1213 // Test concurrent tool calls with different delay times
1214 let events = thread
1215 .update(cx, |thread, cx| {
1216 thread.add_tool(DelayTool);
1217 thread.send(
1218 UserMessageId::new(),
1219 [
1220 "Call the delay tool twice in the same message.",
1221 "Once with 100ms. Once with 300ms.",
1222 "When both timers are complete, describe the outputs.",
1223 ],
1224 cx,
1225 )
1226 })
1227 .unwrap()
1228 .collect()
1229 .await;
1230
1231 let stop_reasons = stop_events(events);
1232 assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);
1233
1234 thread.update(cx, |thread, _cx| {
1235 let last_message = thread.last_received_or_pending_message().unwrap();
1236 let agent_message = last_message.as_agent_message().unwrap();
1237 let text = agent_message
1238 .content
1239 .iter()
1240 .filter_map(|content| {
1241 if let AgentMessageContent::Text(text) = content {
1242 Some(text.as_str())
1243 } else {
1244 None
1245 }
1246 })
1247 .collect::<String>();
1248
1249 assert!(text.contains("Ding"));
1250 });
1251}
1252
// Verifies that switching agent profiles changes which tools are offered to
// the model: profile "test-1" enables echo + delay, "test-2" only infinite.
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register all three tools on the thread; the active profile decides
    // which subset is actually sent to the model.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1332
// End-to-end check that MCP (context server) tools are exposed to the model,
// invoked over the context server protocol, and that a name collision with a
// native tool is resolved by prefixing the MCP tool with its server name.
#[gpui::test]
async fn test_mcp_tools(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "tool_permissions": { "default": "allow" },
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Fake MCP server exposing a single tool named "echo"; at this point no
    // native tool with that name is registered, so no collision yet.
    let mut mcp_tool_calls = setup_context_server(
        "test_server",
        vec![context_server::types::Tool {
            name: "echo".into(),
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    let events = thread.update(cx, |thread, cx| {
        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
    });
    cx.run_until_parked();

    // Simulate the model calling the MCP tool.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: json!({"text": "test"}).to_string(),
            input: json!({"text": "test"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The fake MCP server receives the call and responds.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: "test".into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // NOTE(review): this re-asserts the `completion` captured *before* the
    // tool result was sent; presumably the intent was to inspect the new
    // pending completion's tools — confirm.
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_text_chunk("Done!");
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Send again after adding the echo tool, ensuring the name collision is resolved.
    let events = thread.update(cx, |thread, cx| {
        thread.add_tool(EchoTool);
        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
    });
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    // The MCP tool is now exposed with its server-name prefix.
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["echo", "test_server_echo"]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_2".into(),
            name: "test_server_echo".into(),
            raw_input: json!({"text": "mcp"}).to_string(),
            input: json!({"text": "mcp"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_3".into(),
            name: "echo".into(),
            raw_input: json!({"text": "native"}).to_string(),
            input: json!({"text": "native"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Only the MCP-prefixed call reaches the fake server; "echo" is handled
    // by the native tool.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // Ensure the tool results were inserted with the correct names.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages.last().unwrap().content,
        vec![
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_3".into(),
                tool_name: "echo".into(),
                is_error: false,
                content: "native".into(),
                output: Some("native".into()),
            },),
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_2".into(),
                tool_name: "test_server_echo".into(),
                is_error: false,
                content: "mcp".into(),
                output: Some("mcp".into()),
            },),
        ]
    );
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;
}
1498
// Regression test: a saved MCP tool result must still be emitted during
// thread replay even when the originating MCP server is no longer connected
// (simulating an app restart before the server reconnects).
#[gpui::test]
async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Setup settings to allow MCP tools
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "always_allow_tool_actions": true,
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {}
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Setup a context server with a tool
    let mut mcp_tool_calls = setup_context_server(
        "github_server",
        vec![context_server::types::Tool {
            name: "issue_read".into(),
            description: Some("Read a GitHub issue".into()),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "issue_url": { "type": "string" }
                }
            }),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    // Send a message and have the model call the MCP tool
    let events = thread.update(cx, |thread, cx| {
        thread
            .send(UserMessageId::new(), ["Read issue #47404"], cx)
            .unwrap()
    });
    cx.run_until_parked();

    // Verify the MCP tool is available to the model
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["issue_read"],
        "MCP tool should be available"
    );

    // Simulate the model calling the MCP tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "issue_read".into(),
            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
                .to_string(),
            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The MCP server receives the tool call and responds with content
    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "issue_read");
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: expected_tool_output.into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // After tool completes, the model continues with a new completion request
    // that includes the tool results. We need to respond to this.
    let _completion = fake_model.pending_completions().pop().unwrap();
    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Verify the tool result is stored in the thread by checking the markdown output.
    // The tool result is in the first assistant message (not the last one, which is
    // the model's response after the tool completed).
    thread.update(cx, |thread, _cx| {
        let markdown = thread.to_markdown();
        assert!(
            markdown.contains("**Tool Result**: issue_read"),
            "Thread should contain tool result header"
        );
        assert!(
            markdown.contains(expected_tool_output),
            "Thread should contain tool output: {}",
            expected_tool_output
        );
    });

    // Simulate app restart: disconnect the MCP server.
    // After restart, the MCP server won't be connected yet when the thread is replayed.
    context_server_store.update(cx, |store, cx| {
        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
    });
    cx.run_until_parked();

    // Replay the thread (this is what happens when loading a saved thread)
    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));

    let mut found_tool_call = None;
    let mut found_tool_call_update_with_output = None;

    // Scan the replay stream for the original tool call and for an update
    // carrying the persisted raw_output.
    while let Some(event) = replay_events.next().await {
        let event = event.unwrap();
        match &event {
            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
                found_tool_call = Some(tc.clone());
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
                if update.tool_call_id.to_string() == "tool_1" =>
            {
                if update.fields.raw_output.is_some() {
                    found_tool_call_update_with_output = Some(update.clone());
                }
            }
            _ => {}
        }
    }

    // The tool call should be found
    assert!(
        found_tool_call.is_some(),
        "Tool call should be emitted during replay"
    );

    assert!(
        found_tool_call_update_with_output.is_some(),
        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
    );

    let update = found_tool_call_update_with_output.unwrap();
    assert_eq!(
        update.fields.raw_output,
        Some(expected_tool_output.into()),
        "raw_output should contain the saved tool result"
    );

    // Also verify the status is correct (completed, not failed)
    assert_eq!(
        update.fields.status,
        Some(acp::ToolCallStatus::Completed),
        "Tool call status should reflect the original completion status"
    );
}
1680
// Checks MCP tool name disambiguation: colliding names get a server-derived
// prefix, server names with spaces are snake_cased, and names at or over
// MAX_TOOL_NAME_LENGTH appear shortened in the final list sent to the model.
#[gpui::test]
async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up a profile with all tools enabled
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx);
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
        thread.add_tool(WordListTool);
        thread.add_tool(ToolRequiringPermission);
        thread.add_tool(InfiniteTool);
    });

    // Set up multiple context servers with some overlapping tool names
    let _server1_calls = setup_context_server(
        "xxx",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_1".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    let _server2_calls = setup_context_server(
        "yyy",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Also conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_2".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            // Near-limit names that also collide across servers yyy and zzz,
            // forcing a prefix that would push them over the length limit.
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );
    let _server3_calls = setup_context_server(
        "zzz",
        vec![
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            // Over-limit name with no collision.
            context_server::types::Tool {
                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server with spaces in name - tests snake_case conversion for API compatibility
    let _server4_calls = setup_context_server(
        "Azure DevOps",
        vec![context_server::types::Tool {
            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Go"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    // Expected final tool list: collisions carry server prefixes (truncated
    // to fit, e.g. "y_"/"z_"), and all entries fit MAX_TOOL_NAME_LENGTH.
    assert_eq!(
        tool_names_for_completion(&completion),
        vec![
            "azure_dev_ops_echo",
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
            "delay",
            "echo",
            "infinite",
            "tool_requiring_permission",
            "unique_tool_1",
            "unique_tool_2",
            "word_list",
            "xxx_echo",
            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "yyy_echo",
            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        ]
    );
}
1864
// E2E: cancelling mid-turn closes the event stream with a Cancelled stop
// reason (even while a tool is still running), and the thread can still
// accept a new message afterwards.
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            ThreadEvent::ToolCall(tool_call) => {
                // Tool calls must arrive in the order the prompt requested.
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
1950
// Cancelling a running terminal tool must kill the terminal AND preserve the
// partial output in the tool result (not just a generic "canceled" message).
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A fake terminal whose command never exits, so cancellation is the only
    // way the tool call can finish.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2047
#[gpui::test]
async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
    // This test verifies that tools which properly handle cancellation via
    // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
    // to cancellation and report that they were cancelled.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // `was_cancelled` is a flag the tool flips when it observes cancellation.
    let (tool, was_cancelled) = CancellationAwareTool::new();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(tool);
            thread.send(
                UserMessageId::new(),
                ["call the cancellation aware tool"],
                cx,
            )
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the cancellation-aware tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "cancellation_aware_1".into(),
            name: "cancellation_aware".into(),
            raw_input: r#"{}"#.into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    cx.run_until_parked();

    // Wait for the tool call to be reported
    let mut tool_started = false;
    // NOTE(review): num_cpus * 100 looks like an arbitrary retry budget
    // rather than anything CPU-related — confirm the intent.
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        // Drain any events that are already ready without blocking.
        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
                if tool_call.title == "Cancellation Aware Tool" {
                    tool_started = true;
                    break;
                }
            }
        }

        if tool_started {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(tool_started, "expected cancellation aware tool to start");

    // Cancel the thread and wait for it to complete
    let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));

    // The cancel task should complete promptly because the tool handles cancellation
    let timeout = cx.background_executor.timer(Duration::from_secs(5));
    futures::select! {
        _ = cancel_task.fuse() => {}
        _ = timeout.fuse() => {
            panic!("cancel task timed out - tool did not respond to cancellation");
        }
    }

    // Verify the tool detected cancellation via its flag
    assert!(
        was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
        "tool should have detected cancellation via event_stream.cancelled_by_user()"
    );

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2142
2143/// Helper to verify thread can recover after cancellation by sending a simple message.
2144async fn verify_thread_recovery(
2145 thread: &Entity<Thread>,
2146 fake_model: &FakeLanguageModel,
2147 cx: &mut TestAppContext,
2148) {
2149 let events = thread
2150 .update(cx, |thread, cx| {
2151 thread.send(
2152 UserMessageId::new(),
2153 ["Testing: reply with 'Hello' then stop."],
2154 cx,
2155 )
2156 })
2157 .unwrap();
2158 cx.run_until_parked();
2159 fake_model.send_last_completion_stream_text_chunk("Hello");
2160 fake_model
2161 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2162 fake_model.end_last_completion_stream();
2163
2164 let events = events.collect::<Vec<_>>().await;
2165 thread.update(cx, |thread, _cx| {
2166 let message = thread.last_received_or_pending_message().unwrap();
2167 let agent_message = message.as_agent_message().unwrap();
2168 assert_eq!(
2169 agent_message.content,
2170 vec![AgentMessageContent::Text("Hello".to_string())]
2171 );
2172 });
2173 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2174}
2175
2176/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2177async fn wait_for_terminal_tool_started(
2178 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2179 cx: &mut TestAppContext,
2180) {
2181 let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2182 for _ in 0..deadline {
2183 cx.run_until_parked();
2184
2185 while let Some(Some(event)) = events.next().now_or_never() {
2186 if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2187 update,
2188 ))) = &event
2189 {
2190 if update.fields.content.as_ref().is_some_and(|content| {
2191 content
2192 .iter()
2193 .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2194 }) {
2195 return;
2196 }
2197 }
2198 }
2199
2200 cx.background_executor
2201 .timer(Duration::from_millis(10))
2202 .await;
2203 }
2204 panic!("terminal tool did not start within the expected time");
2205}
2206
2207/// Collects events until a Stop event is received, driving the executor to completion.
2208async fn collect_events_until_stop(
2209 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2210 cx: &mut TestAppContext,
2211) -> Vec<Result<ThreadEvent>> {
2212 let mut collected = Vec::new();
2213 let deadline = cx.executor().num_cpus() * 200;
2214
2215 for _ in 0..deadline {
2216 cx.executor().advance_clock(Duration::from_millis(10));
2217 cx.run_until_parked();
2218
2219 while let Some(Some(event)) = events.next().now_or_never() {
2220 let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2221 collected.push(event);
2222 if is_stop {
2223 return collected;
2224 }
2225 }
2226 }
2227 panic!(
2228 "did not receive Stop event within the expected time; collected {} events",
2229 collected.len()
2230 );
2231}
2232
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    // Tests that truncating the thread while a terminal tool is mid-execution
    // kills the underlying terminal and leaves the thread empty and usable.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal handle that never exits on its own, so the tool stays
    // "running" until something kills it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate back to it later.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2299
#[gpui::test]
async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
    // Tests that cancellation properly kills all running terminal tools when multiple are active.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Environment that tracks every terminal handle it hands out, so we can
    // verify each one was killed afterwards.
    let environment = Rc::new(MultiTerminalEnvironment::new());

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment.clone(),
            ));
            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling two terminal tools
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_2".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 2000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for both terminal tools to start by counting terminal content updates
    let mut terminals_started = 0;
    let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
    for _ in 0..deadline {
        cx.run_until_parked();

        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            ))) = &event
            {
                if update.fields.content.as_ref().is_some_and(|content| {
                    content
                        .iter()
                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
                }) {
                    terminals_started += 1;
                    if terminals_started >= 2 {
                        break;
                    }
                }
            }
        }
        if terminals_started >= 2 {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(
        terminals_started >= 2,
        "expected 2 terminal tools to start, got {terminals_started}"
    );

    // Cancel the thread while both terminals are running.
    // Detach the cancel task; completion is observed via the event stream below.
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify both terminal handles were killed
    let handles = environment.handles();
    assert_eq!(
        handles.len(),
        2,
        "expected 2 terminal handles to be created"
    );
    assert!(
        handles[0].was_killed(),
        "expected first terminal handle to be killed on cancellation"
    );
    assert!(
        handles[1].was_killed(),
        "expected second terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );
}
2408
#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal handle that never exits on its own; only the simulated stop
    // button press below ends it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    // Mark the handle as killed, then wake anything awaiting the terminal's exit.
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and its corresponding result.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2503
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A terminal handle that never exits on its own, forcing the timeout path.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the 100ms timeout requested in the tool input above
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use in the agent message...
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // ...and its corresponding result.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2602
2603#[gpui::test]
2604async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2605 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2606 let fake_model = model.as_fake();
2607
2608 let events_1 = thread
2609 .update(cx, |thread, cx| {
2610 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2611 })
2612 .unwrap();
2613 cx.run_until_parked();
2614 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2615 cx.run_until_parked();
2616
2617 let events_2 = thread
2618 .update(cx, |thread, cx| {
2619 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2620 })
2621 .unwrap();
2622 cx.run_until_parked();
2623 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2624 fake_model
2625 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2626 fake_model.end_last_completion_stream();
2627
2628 let events_1 = events_1.collect::<Vec<_>>().await;
2629 assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2630 let events_2 = events_2.collect::<Vec<_>>().await;
2631 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2632}
2633
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    // (Distinct model id so its requests are tracked separately from model_a's.)
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // The old turn ends Cancelled; the new turn ends cleanly.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2711
2712#[gpui::test]
2713async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2714 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2715 let fake_model = model.as_fake();
2716
2717 let events_1 = thread
2718 .update(cx, |thread, cx| {
2719 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2720 })
2721 .unwrap();
2722 cx.run_until_parked();
2723 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2724 fake_model
2725 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2726 fake_model.end_last_completion_stream();
2727 let events_1 = events_1.collect::<Vec<_>>().await;
2728
2729 let events_2 = thread
2730 .update(cx, |thread, cx| {
2731 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2732 })
2733 .unwrap();
2734 cx.run_until_parked();
2735 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2736 fake_model
2737 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2738 fake_model.end_last_completion_stream();
2739 let events_2 = events_2.collect::<Vec<_>>().await;
2740
2741 assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2742 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2743}
2744
#[gpui::test]
async fn test_refusal(cx: &mut TestAppContext) {
    // Tests that a Refusal stop reason rolls the thread back to before the
    // last user message (leaving it empty here, since there was only one).
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // The user message is recorded before any model response arrives.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    cx.run_until_parked();
    // The streamed assistant text is visible before the refusal arrives.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
    });

    // If the model refuses to continue, the thread should remove all the messages after the last user message.
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
    let events = events.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
    // The refusal clears the thread back to empty.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
    });
}
2793
#[gpui::test]
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    // Tests that truncating at the first (and only) message clears the whole
    // thread — content and token usage — and that the thread remains usable.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Keep the message id so we can truncate back to it later.
    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Before any model response, there is no token usage yet.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        assert_eq!(thread.latest_token_usage(), None);
    });

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        // used_tokens reflects input + output from the usage update above.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncating at the first message clears both content and token usage.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // The new turn's usage replaces the cleared one entirely.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
2915
#[gpui::test]
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    // Tests that truncating at the second message restores the thread to its
    // exact state after the first exchange, including token usage.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Captures the expected state after the first exchange so the same
    // assertions can be re-run after truncation.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Keep the second message's id so we can truncate back to it.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncating at the second message restores the first-exchange state.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3030
#[gpui::test]
async fn test_title_generation(cx: &mut TestAppContext) {
    // Tests that a title is generated (via the summarization model) after the
    // first exchange only, and not regenerated on subsequent messages.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Separate fake model dedicated to summarization/title generation.
    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // Title generation is asynchronous; the placeholder remains until the
    // summary model responds.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "New Thread"));

    // Ensure the summary model has been invoked to generate a title.
    // Only the first line of the streamed summary becomes the title
    // ("Hello world\nGoodnight Moon" -> "Hello world").
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // The summary model received no new request, and the title is unchanged.
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));
}
3076
#[gpui::test]
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    // Tests that tool uses still awaiting permission are omitted when building
    // a completion request, while completed tool uses are included with their
    // results.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // This tool use stays pending (its permission is never granted here).
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // This tool use completes, producing the "test" result asserted below.
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    // messages[0] is skipped here — presumably the system/context message
    // (TODO confirm); the assertion covers the conversation proper. Note the
    // permission tool use is absent entirely, while the echo tool appears with
    // both its use and its result.
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3156
/// End-to-end exercise of [`NativeAgentConnection`]: creating a session,
/// listing/selecting models, prompting, cancelling, and verifying that a
/// closed session rejects further prompts with "Session not found".
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(user_store.clone(), client.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = Path::new("/test");
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Prompt the agent and stream back a fake response chunk.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting a closed session must fail with a session-lookup error.
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3279
/// Verifies the sequence of ACP tool-call events emitted while a tool's input
/// streams in: an initial `ToolCall`, an update with the final raw input,
/// an `InProgress` status update, and finally a `Completed` update carrying
/// the tool's output.
#[gpui::test]
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Initial tool call, created from the partial input.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Update carrying the completed input.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    // The tool starts running...
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    // ...and completes with the echoed output.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3358
/// A turn that completes successfully must not emit any `Retry` events.
#[gpui::test]
async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();

    // Drain events until the turn stops, collecting any retries along the way.
    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 0);
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey!
            "}
        )
    });
}
3401
/// When the provider reports it is overloaded mid-stream, the thread should
/// retry once after the advertised `retry_after` delay and resume the
/// assistant message, emitting exactly one `Retry` event.
#[gpui::test]
async fn test_send_retry_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Stream part of a response, then fail with a retryable error.
    fake_model.send_last_completion_stream_text_chunk("Hey,");
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the 3s retry_after so the retry request is issued.
    cx.executor().advance_clock(Duration::from_secs(3));
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("there!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 1);
    assert!(matches!(
        retry_events[0],
        acp_thread::RetryStatus { attempt: 1, .. }
    ));
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey,

                [resume]

                ## Assistant

                there!
            "}
        )
    });
}
3465
/// If the stream errors after a tool call was issued, the tool still runs to
/// completion and its result is included in the retried request, so the model
/// never sees a dangling tool use without a matching result.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    // Emit a tool call, then fail the stream with a retryable error.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the retry delay and inspect the retried request:
    // it must contain the tool use AND its result.
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retried turn and confirm the final agent message.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3545
/// After `MAX_RETRY_ATTEMPTS` consecutive retryable failures, the next
/// failure surfaces as an error on the event stream instead of another retry.
#[gpui::test]
async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Fail one more time than the retry budget allows.
    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
        fake_model.send_last_completion_stream_error(
            LanguageModelCompletionError::ServerOverloaded {
                provider: LanguageModelProviderName::new("Anthropic"),
                retry_after: Some(Duration::from_secs(3)),
            },
        );
        fake_model.end_last_completion_stream();
        cx.executor().advance_clock(Duration::from_secs(3));
        cx.run_until_parked();
    }

    let mut errors = Vec::new();
    let mut retry_events = Vec::new();
    while let Some(event) = events.next().await {
        match event {
            Ok(ThreadEvent::Retry(retry_status)) => {
                retry_events.push(retry_status);
            }
            Ok(ThreadEvent::Stop(..)) => break,
            Err(error) => errors.push(error),
            _ => {}
        }
    }

    // Exactly MAX_RETRY_ATTEMPTS retries, numbered 1..=MAX.
    assert_eq!(
        retry_events.len(),
        crate::thread::MAX_RETRY_ATTEMPTS as usize
    );
    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
        assert_eq!(retry_events[i].attempt, i + 1);
    }
    // The final failure is reported as a single stream error.
    assert_eq!(errors.len(), 1);
    let error = errors[0]
        .downcast_ref::<LanguageModelCompletionError>()
        .unwrap();
    assert!(matches!(
        error,
        LanguageModelCompletionError::ServerOverloaded { .. }
    ));
}
3599
/// Regression test: a streaming tool whose input never reaches
/// `is_input_complete = true` (because the LLM stream errors first) must
/// terminate with an error result rather than deadlocking the turn.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3706
3707/// Filters out the stop events for asserting against in tests
3708fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3709 result_events
3710 .into_iter()
3711 .filter_map(|event| match event.unwrap() {
3712 ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3713 _ => None,
3714 })
3715 .collect()
3716}
3717
/// Handles returned by [`setup`] that tests use to drive a [`Thread`].
struct ThreadTest {
    // The model the thread was constructed with; `as_fake()` is used by
    // tests running against `TestModel::Fake`.
    model: Arc<dyn LanguageModel>,
    thread: Entity<Thread>,
    project_context: Entity<ProjectContext>,
    context_server_store: Entity<ContextServerStore>,
    // Fake filesystem backing both the project and the watched settings file.
    fs: Arc<FakeFs>,
}
3725
/// Which language model a test thread should be backed by.
enum TestModel {
    // Real provider model, resolved through the registry by id in `setup`.
    Sonnet4,
    // In-process `FakeLanguageModel` whose streams tests drive manually.
    Fake,
}
3730
impl TestModel {
    /// Returns the registry id used to look up a real model in `setup`.
    ///
    /// # Panics
    /// Panics for [`TestModel::Fake`], which is constructed directly and is
    /// never looked up by id.
    fn id(&self) -> LanguageModelId {
        match self {
            TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
            TestModel::Fake => unreachable!(),
        }
    }
}
3739
/// Builds the full test fixture for thread tests: a fake filesystem seeded
/// with a settings file enabling every test tool, an initialized settings
/// system (plus real model providers for [`TestModel::Sonnet4`]), a test
/// project, and a [`Thread`] wired to the requested model.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    cx.executor().allow_parking();

    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    // Write a settings file whose test profile enables all tools used in
    // these tests; `watch_settings` below keeps the global store in sync.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        match model {
            TestModel::Fake => {}
            TestModel::Sonnet4 => {
                // Real models need an HTTP client and the production
                // client/provider stack.
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(user_store.clone(), client.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Fake models are constructed directly; real models are resolved from
    // the registry and their provider authenticated first.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
3844
/// Runs once before tests (via `ctor`), enabling `env_logger` output only
/// when the caller opted in by setting `RUST_LOG`.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
3852
3853fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
3854 let fs = fs.clone();
3855 cx.spawn({
3856 async move |cx| {
3857 let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
3858 cx.background_executor(),
3859 fs,
3860 paths::settings_file().clone(),
3861 );
3862 let _watcher_task = watcher_task;
3863
3864 while let Some(new_settings_content) = new_settings_content_rx.next().await {
3865 cx.update(|cx| {
3866 SettingsStore::update_global(cx, |settings, cx| {
3867 settings.set_user_settings(&new_settings_content, cx)
3868 })
3869 })
3870 .ok();
3871 }
3872 }
3873 })
3874 .detach();
3875}
3876
3877fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
3878 completion
3879 .tools
3880 .iter()
3881 .map(|tool| tool.name.clone())
3882 .collect()
3883}
3884
/// Registers and starts a fake MCP context server named `name` exposing
/// `tools`, and returns a channel of incoming tool-call requests.
///
/// Each received item is the `CallToolParams` plus a oneshot sender the test
/// must fire with the desired `CallToolResponse`.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in project settings so the store knows about it.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    // Fake transport answering the MCP handshake and tool listing, and
    // forwarding tool calls to the returned channel.
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                // Hand the call to the test and block until it responds.
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    cx.run_until_parked();
    mcp_tool_calls_rx
}
3964
/// Verifies `tokens_before_message`: the first user message has no tokens
/// before it, and each subsequent message reports the `input_tokens` of the
/// request that preceded it.
#[gpui::test]
async fn test_tokens_before_message(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First message
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["First message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Before any response, tokens_before_message should return None for first message
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should have no tokens before it"
        );
    });

    // Complete first message with usage
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First message still has no tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it after response"
        );
    });

    // Second message
    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Second message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Second message should have first message's input tokens before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should have 100 tokens before it (from first request)"
        );
    });

    // Complete second message
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250, // Total for this request (includes previous context)
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Third message
    let message_3_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_3_id.clone(), ["Third message"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Third message should have second message's input tokens (250) before it
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_3_id),
            Some(250),
            "Third message should have 250 tokens before it (from second request)"
        );
        // Second message should still have 100
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            Some(100),
            "Second message should still have 100 tokens before it"
        );
        // First message still has none
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message should still have no tokens before it"
        );
    });
}
4071
/// Verifies that truncating the thread at a message removes that message's
/// token bookkeeping: looking it up afterwards returns `None`.
#[gpui::test]
async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up three messages with responses
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250,
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Verify initial state
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
    });

    // Truncate at message 2 (removes message 2 and everything after)
    thread
        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
        .unwrap();
    cx.run_until_parked();

    // After truncation, message_2_id no longer exists, so lookup should return None
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            None,
            "After truncation, message 2 no longer exists"
        );
        // Message 1 still exists but has no tokens before it
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message still has no tokens before it"
        );
    });
}
4142
/// Exercises the terminal tool's permission-rule matrix: deny patterns,
/// allow patterns, confirm patterns, and the interaction between the global
/// default mode and a tool-specific default. Each scenario runs in its own
/// scope with freshly overridden `AgentSettings`.
#[gpui::test]
async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({})).await;
    let project = Project::test(fs, ["/root".as_ref()], cx).await;

    // Test 1: Deny rule blocks command
    {
        // Terminal never exits; irrelevant here because the deny rule should
        // reject the command before a terminal is ever created.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Confirm),
                    always_allow: vec![],
                    always_deny: vec![
                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
                    ],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        // "rm -rf /" matches the always_deny pattern above.
        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "rm -rf /".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by deny rule"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("blocked"),
            "error should mention the command was blocked"
        );
    }

    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
    {
        // Terminal exits immediately with status 0 so the tool run can finish.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![
                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
                    ],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        // "echo hello" matches the always_allow pattern above.
        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // The first update with terminal content proves the command started
        // executing without an authorization round-trip.
        let update = rx.expect_update_fields().await;
        assert!(
            update.content.iter().any(|blocks| {
                blocks
                    .iter()
                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
            }),
            "expected terminal content (allow rule should skip confirmation and override default deny)"
        );

        let result = task.await;
        assert!(
            result.is_ok(),
            "expected command to succeed without confirmation"
        );
    }

    // Test 3: global default: allow does NOT override always_confirm patterns
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Allow),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![
                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
                    ],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        // "sudo rm file" matches the always_confirm pattern above.
        let _task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "sudo rm file".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // With global default: allow, confirm patterns are still respected
        // The expect_authorization() call will panic if no authorization is requested,
        // which validates that the confirm pattern still triggers confirmation
        let _auth = rx.expect_authorization().await;

        drop(_task);
    }

    // Test 4: tool-specific default: deny is respected even with global default: allow
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        // "echo hello" matches no pattern, so the tool-specific default applies.
        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // tool-specific default: deny is respected even with global default: allow
        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by tool-specific deny default"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("disabled"),
            "error should mention the tool is disabled, got: {err_msg}"
        );
    }
}
4360
4361#[gpui::test]
4362async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
4363 init_test(cx);
4364 cx.update(|cx| {
4365 LanguageModelRegistry::test(cx);
4366 });
4367 cx.update(|cx| {
4368 cx.update_flags(true, vec!["subagents".to_string()]);
4369 });
4370
4371 let fs = FakeFs::new(cx.executor());
4372 fs.insert_tree(
4373 "/",
4374 json!({
4375 "a": {
4376 "b.md": "Lorem"
4377 }
4378 }),
4379 )
4380 .await;
4381 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4382 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4383 let agent = cx.update(|cx| {
4384 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4385 });
4386 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4387
4388 let acp_thread = cx
4389 .update(|cx| {
4390 connection
4391 .clone()
4392 .new_session(project.clone(), Path::new(""), cx)
4393 })
4394 .await
4395 .unwrap();
4396 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4397 let thread = agent.read_with(cx, |agent, _| {
4398 agent.sessions.get(&session_id).unwrap().thread.clone()
4399 });
4400 let model = Arc::new(FakeLanguageModel::default());
4401
4402 // Ensure empty threads are not saved, even if they get mutated.
4403 thread.update(cx, |thread, cx| {
4404 thread.set_model(model.clone(), cx);
4405 });
4406 cx.run_until_parked();
4407
4408 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4409 cx.run_until_parked();
4410 model.send_last_completion_stream_text_chunk("spawning subagent");
4411 let subagent_tool_input = SpawnAgentToolInput {
4412 label: "label".to_string(),
4413 message: "subagent task prompt".to_string(),
4414 session_id: None,
4415 };
4416 let subagent_tool_use = LanguageModelToolUse {
4417 id: "subagent_1".into(),
4418 name: SpawnAgentTool::NAME.into(),
4419 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4420 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4421 is_input_complete: true,
4422 thought_signature: None,
4423 };
4424 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4425 subagent_tool_use,
4426 ));
4427 model.end_last_completion_stream();
4428
4429 cx.run_until_parked();
4430
4431 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4432 thread
4433 .running_subagent_ids(cx)
4434 .get(0)
4435 .expect("subagent thread should be running")
4436 .clone()
4437 });
4438
4439 let subagent_thread = agent.read_with(cx, |agent, _cx| {
4440 agent
4441 .sessions
4442 .get(&subagent_session_id)
4443 .expect("subagent session should exist")
4444 .acp_thread
4445 .clone()
4446 });
4447
4448 model.send_last_completion_stream_text_chunk("subagent task response");
4449 model.end_last_completion_stream();
4450
4451 cx.run_until_parked();
4452
4453 assert_eq!(
4454 subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4455 indoc! {"
4456 ## User
4457
4458 subagent task prompt
4459
4460 ## Assistant
4461
4462 subagent task response
4463
4464 "}
4465 );
4466
4467 model.send_last_completion_stream_text_chunk("Response");
4468 model.end_last_completion_stream();
4469
4470 send.await.unwrap();
4471
4472 assert_eq!(
4473 acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4474 indoc! {r#"
4475 ## User
4476
4477 Prompt
4478
4479 ## Assistant
4480
4481 spawning subagent
4482
4483 **Tool Call: label**
4484 Status: Completed
4485
4486 subagent task response
4487
4488 ## Assistant
4489
4490 Response
4491
4492 "#},
4493 );
4494}
4495
4496#[gpui::test]
4497async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
4498 init_test(cx);
4499 cx.update(|cx| {
4500 LanguageModelRegistry::test(cx);
4501 });
4502 cx.update(|cx| {
4503 cx.update_flags(true, vec!["subagents".to_string()]);
4504 });
4505
4506 let fs = FakeFs::new(cx.executor());
4507 fs.insert_tree(
4508 "/",
4509 json!({
4510 "a": {
4511 "b.md": "Lorem"
4512 }
4513 }),
4514 )
4515 .await;
4516 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4517 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4518 let agent = cx.update(|cx| {
4519 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4520 });
4521 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4522
4523 let acp_thread = cx
4524 .update(|cx| {
4525 connection
4526 .clone()
4527 .new_session(project.clone(), Path::new(""), cx)
4528 })
4529 .await
4530 .unwrap();
4531 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4532 let thread = agent.read_with(cx, |agent, _| {
4533 agent.sessions.get(&session_id).unwrap().thread.clone()
4534 });
4535 let model = Arc::new(FakeLanguageModel::default());
4536
4537 // Ensure empty threads are not saved, even if they get mutated.
4538 thread.update(cx, |thread, cx| {
4539 thread.set_model(model.clone(), cx);
4540 });
4541 cx.run_until_parked();
4542
4543 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4544 cx.run_until_parked();
4545 model.send_last_completion_stream_text_chunk("spawning subagent");
4546 let subagent_tool_input = SpawnAgentToolInput {
4547 label: "label".to_string(),
4548 message: "subagent task prompt".to_string(),
4549 session_id: None,
4550 };
4551 let subagent_tool_use = LanguageModelToolUse {
4552 id: "subagent_1".into(),
4553 name: SpawnAgentTool::NAME.into(),
4554 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4555 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4556 is_input_complete: true,
4557 thought_signature: None,
4558 };
4559 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4560 subagent_tool_use,
4561 ));
4562 model.end_last_completion_stream();
4563
4564 cx.run_until_parked();
4565
4566 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4567 thread
4568 .running_subagent_ids(cx)
4569 .get(0)
4570 .expect("subagent thread should be running")
4571 .clone()
4572 });
4573
4574 let subagent_thread = agent.read_with(cx, |agent, _cx| {
4575 agent
4576 .sessions
4577 .get(&subagent_session_id)
4578 .expect("subagent session should exist")
4579 .acp_thread
4580 .clone()
4581 });
4582
4583 model.send_last_completion_stream_text_chunk("subagent task response 1");
4584 model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
4585 text: "thinking more about the subagent task".into(),
4586 signature: None,
4587 });
4588 model.send_last_completion_stream_text_chunk("subagent task response 2");
4589 model.end_last_completion_stream();
4590
4591 cx.run_until_parked();
4592
4593 assert_eq!(
4594 subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4595 indoc! {"
4596 ## User
4597
4598 subagent task prompt
4599
4600 ## Assistant
4601
4602 subagent task response 1
4603
4604 <thinking>
4605 thinking more about the subagent task
4606 </thinking>
4607
4608 subagent task response 2
4609
4610 "}
4611 );
4612
4613 model.send_last_completion_stream_text_chunk("Response");
4614 model.end_last_completion_stream();
4615
4616 send.await.unwrap();
4617
4618 assert_eq!(
4619 acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4620 indoc! {r#"
4621 ## User
4622
4623 Prompt
4624
4625 ## Assistant
4626
4627 spawning subagent
4628
4629 **Tool Call: label**
4630 Status: Completed
4631
4632 subagent task response 1
4633
4634 subagent task response 2
4635
4636 ## Assistant
4637
4638 Response
4639
4640 "#},
4641 );
4642}
4643
4644#[gpui::test]
4645async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
4646 init_test(cx);
4647 cx.update(|cx| {
4648 LanguageModelRegistry::test(cx);
4649 });
4650 cx.update(|cx| {
4651 cx.update_flags(true, vec!["subagents".to_string()]);
4652 });
4653
4654 let fs = FakeFs::new(cx.executor());
4655 fs.insert_tree(
4656 "/",
4657 json!({
4658 "a": {
4659 "b.md": "Lorem"
4660 }
4661 }),
4662 )
4663 .await;
4664 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4665 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4666 let agent = cx.update(|cx| {
4667 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4668 });
4669 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4670
4671 let acp_thread = cx
4672 .update(|cx| {
4673 connection
4674 .clone()
4675 .new_session(project.clone(), Path::new(""), cx)
4676 })
4677 .await
4678 .unwrap();
4679 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4680 let thread = agent.read_with(cx, |agent, _| {
4681 agent.sessions.get(&session_id).unwrap().thread.clone()
4682 });
4683 let model = Arc::new(FakeLanguageModel::default());
4684
4685 // Ensure empty threads are not saved, even if they get mutated.
4686 thread.update(cx, |thread, cx| {
4687 thread.set_model(model.clone(), cx);
4688 });
4689 cx.run_until_parked();
4690
4691 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4692 cx.run_until_parked();
4693 model.send_last_completion_stream_text_chunk("spawning subagent");
4694 let subagent_tool_input = SpawnAgentToolInput {
4695 label: "label".to_string(),
4696 message: "subagent task prompt".to_string(),
4697 session_id: None,
4698 };
4699 let subagent_tool_use = LanguageModelToolUse {
4700 id: "subagent_1".into(),
4701 name: SpawnAgentTool::NAME.into(),
4702 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4703 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4704 is_input_complete: true,
4705 thought_signature: None,
4706 };
4707 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4708 subagent_tool_use,
4709 ));
4710 model.end_last_completion_stream();
4711
4712 cx.run_until_parked();
4713
4714 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4715 thread
4716 .running_subagent_ids(cx)
4717 .get(0)
4718 .expect("subagent thread should be running")
4719 .clone()
4720 });
4721 let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4722 agent
4723 .sessions
4724 .get(&subagent_session_id)
4725 .expect("subagent session should exist")
4726 .acp_thread
4727 .clone()
4728 });
4729
4730 // model.send_last_completion_stream_text_chunk("subagent task response");
4731 // model.end_last_completion_stream();
4732
4733 // cx.run_until_parked();
4734
4735 acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4736
4737 cx.run_until_parked();
4738
4739 send.await.unwrap();
4740
4741 acp_thread.read_with(cx, |thread, cx| {
4742 assert_eq!(thread.status(), ThreadStatus::Idle);
4743 assert_eq!(
4744 thread.to_markdown(cx),
4745 indoc! {"
4746 ## User
4747
4748 Prompt
4749
4750 ## Assistant
4751
4752 spawning subagent
4753
4754 **Tool Call: label**
4755 Status: Canceled
4756
4757 "}
4758 );
4759 });
4760 subagent_acp_thread.read_with(cx, |thread, cx| {
4761 assert_eq!(thread.status(), ThreadStatus::Idle);
4762 assert_eq!(
4763 thread.to_markdown(cx),
4764 indoc! {"
4765 ## User
4766
4767 subagent task prompt
4768
4769 "}
4770 );
4771 });
4772}
4773
4774#[gpui::test]
4775async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
4776 init_test(cx);
4777 cx.update(|cx| {
4778 LanguageModelRegistry::test(cx);
4779 });
4780 cx.update(|cx| {
4781 cx.update_flags(true, vec!["subagents".to_string()]);
4782 });
4783
4784 let fs = FakeFs::new(cx.executor());
4785 fs.insert_tree(
4786 "/",
4787 json!({
4788 "a": {
4789 "b.md": "Lorem"
4790 }
4791 }),
4792 )
4793 .await;
4794 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4795 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4796 let agent = cx.update(|cx| {
4797 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4798 });
4799 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4800
4801 let acp_thread = cx
4802 .update(|cx| {
4803 connection
4804 .clone()
4805 .new_session(project.clone(), Path::new(""), cx)
4806 })
4807 .await
4808 .unwrap();
4809 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4810 let thread = agent.read_with(cx, |agent, _| {
4811 agent.sessions.get(&session_id).unwrap().thread.clone()
4812 });
4813 let model = Arc::new(FakeLanguageModel::default());
4814
4815 thread.update(cx, |thread, cx| {
4816 thread.set_model(model.clone(), cx);
4817 });
4818 cx.run_until_parked();
4819
4820 // === First turn: create subagent ===
4821 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
4822 cx.run_until_parked();
4823 model.send_last_completion_stream_text_chunk("spawning subagent");
4824 let subagent_tool_input = SpawnAgentToolInput {
4825 label: "initial task".to_string(),
4826 message: "do the first task".to_string(),
4827 session_id: None,
4828 };
4829 let subagent_tool_use = LanguageModelToolUse {
4830 id: "subagent_1".into(),
4831 name: SpawnAgentTool::NAME.into(),
4832 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4833 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4834 is_input_complete: true,
4835 thought_signature: None,
4836 };
4837 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4838 subagent_tool_use,
4839 ));
4840 model.end_last_completion_stream();
4841
4842 cx.run_until_parked();
4843
4844 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4845 thread
4846 .running_subagent_ids(cx)
4847 .get(0)
4848 .expect("subagent thread should be running")
4849 .clone()
4850 });
4851
4852 let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4853 agent
4854 .sessions
4855 .get(&subagent_session_id)
4856 .expect("subagent session should exist")
4857 .acp_thread
4858 .clone()
4859 });
4860
4861 // Subagent responds
4862 model.send_last_completion_stream_text_chunk("first task response");
4863 model.end_last_completion_stream();
4864
4865 cx.run_until_parked();
4866
4867 // Parent model responds to complete first turn
4868 model.send_last_completion_stream_text_chunk("First response");
4869 model.end_last_completion_stream();
4870
4871 send.await.unwrap();
4872
4873 // Verify subagent is no longer running
4874 thread.read_with(cx, |thread, cx| {
4875 assert!(
4876 thread.running_subagent_ids(cx).is_empty(),
4877 "subagent should not be running after completion"
4878 );
4879 });
4880
4881 // === Second turn: resume subagent with session_id ===
4882 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
4883 cx.run_until_parked();
4884 model.send_last_completion_stream_text_chunk("resuming subagent");
4885 let resume_tool_input = SpawnAgentToolInput {
4886 label: "follow-up task".to_string(),
4887 message: "do the follow-up task".to_string(),
4888 session_id: Some(subagent_session_id.clone()),
4889 };
4890 let resume_tool_use = LanguageModelToolUse {
4891 id: "subagent_2".into(),
4892 name: SpawnAgentTool::NAME.into(),
4893 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
4894 input: serde_json::to_value(&resume_tool_input).unwrap(),
4895 is_input_complete: true,
4896 thought_signature: None,
4897 };
4898 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
4899 model.end_last_completion_stream();
4900
4901 cx.run_until_parked();
4902
4903 // Subagent should be running again with the same session
4904 thread.read_with(cx, |thread, cx| {
4905 let running = thread.running_subagent_ids(cx);
4906 assert_eq!(running.len(), 1, "subagent should be running");
4907 assert_eq!(running[0], subagent_session_id, "should be same session");
4908 });
4909
4910 // Subagent responds to follow-up
4911 model.send_last_completion_stream_text_chunk("follow-up task response");
4912 model.end_last_completion_stream();
4913
4914 cx.run_until_parked();
4915
4916 // Parent model responds to complete second turn
4917 model.send_last_completion_stream_text_chunk("Second response");
4918 model.end_last_completion_stream();
4919
4920 send2.await.unwrap();
4921
4922 // Verify subagent is no longer running
4923 thread.read_with(cx, |thread, cx| {
4924 assert!(
4925 thread.running_subagent_ids(cx).is_empty(),
4926 "subagent should not be running after resume completion"
4927 );
4928 });
4929
4930 // Verify the subagent's acp thread has both conversation turns
4931 assert_eq!(
4932 subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4933 indoc! {"
4934 ## User
4935
4936 do the first task
4937
4938 ## Assistant
4939
4940 first task response
4941
4942 ## User
4943
4944 do the follow-up task
4945
4946 ## Assistant
4947
4948 follow-up task response
4949
4950 "}
4951 );
4952}
4953
4954#[gpui::test]
4955async fn test_subagent_tool_is_present_when_feature_flag_enabled(cx: &mut TestAppContext) {
4956 init_test(cx);
4957
4958 cx.update(|cx| {
4959 cx.update_flags(true, vec!["subagents".to_string()]);
4960 });
4961
4962 let fs = FakeFs::new(cx.executor());
4963 fs.insert_tree(path!("/test"), json!({})).await;
4964 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4965 let project_context = cx.new(|_cx| ProjectContext::default());
4966 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
4967 let context_server_registry =
4968 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
4969 let model = Arc::new(FakeLanguageModel::default());
4970
4971 let environment = Rc::new(cx.update(|cx| {
4972 FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
4973 }));
4974
4975 let thread = cx.new(|cx| {
4976 let mut thread = Thread::new(
4977 project.clone(),
4978 project_context,
4979 context_server_registry,
4980 Templates::new(),
4981 Some(model),
4982 cx,
4983 );
4984 thread.add_default_tools(environment, cx);
4985 thread
4986 });
4987
4988 thread.read_with(cx, |thread, _| {
4989 assert!(
4990 thread.has_registered_tool(SpawnAgentTool::NAME),
4991 "subagent tool should be present when feature flag is enabled"
4992 );
4993 });
4994}
4995
4996#[gpui::test]
4997async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
4998 init_test(cx);
4999
5000 cx.update(|cx| {
5001 cx.update_flags(true, vec!["subagents".to_string()]);
5002 });
5003
5004 let fs = FakeFs::new(cx.executor());
5005 fs.insert_tree(path!("/test"), json!({})).await;
5006 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5007 let project_context = cx.new(|_cx| ProjectContext::default());
5008 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5009 let context_server_registry =
5010 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5011 let model = Arc::new(FakeLanguageModel::default());
5012
5013 let parent_thread = cx.new(|cx| {
5014 Thread::new(
5015 project.clone(),
5016 project_context,
5017 context_server_registry,
5018 Templates::new(),
5019 Some(model.clone()),
5020 cx,
5021 )
5022 });
5023
5024 let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5025 subagent_thread.read_with(cx, |subagent_thread, cx| {
5026 assert!(subagent_thread.is_subagent());
5027 assert_eq!(subagent_thread.depth(), 1);
5028 assert_eq!(
5029 subagent_thread.model().map(|model| model.id()),
5030 Some(model.id())
5031 );
5032 assert_eq!(
5033 subagent_thread.parent_thread_id(),
5034 Some(parent_thread.read(cx).id().clone())
5035 );
5036 });
5037}
5038
5039#[gpui::test]
5040async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5041 init_test(cx);
5042
5043 cx.update(|cx| {
5044 cx.update_flags(true, vec!["subagents".to_string()]);
5045 });
5046
5047 let fs = FakeFs::new(cx.executor());
5048 fs.insert_tree(path!("/test"), json!({})).await;
5049 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5050 let project_context = cx.new(|_cx| ProjectContext::default());
5051 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5052 let context_server_registry =
5053 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5054 let model = Arc::new(FakeLanguageModel::default());
5055 let environment = Rc::new(cx.update(|cx| {
5056 FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5057 }));
5058
5059 let deep_parent_thread = cx.new(|cx| {
5060 let mut thread = Thread::new(
5061 project.clone(),
5062 project_context,
5063 context_server_registry,
5064 Templates::new(),
5065 Some(model.clone()),
5066 cx,
5067 );
5068 thread.set_subagent_context(SubagentContext {
5069 parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5070 depth: MAX_SUBAGENT_DEPTH - 1,
5071 });
5072 thread
5073 });
5074 let deep_subagent_thread = cx.new(|cx| {
5075 let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5076 thread.add_default_tools(environment, cx);
5077 thread
5078 });
5079
5080 deep_subagent_thread.read_with(cx, |thread, _| {
5081 assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5082 assert!(
5083 !thread.has_registered_tool(SpawnAgentTool::NAME),
5084 "subagent tool should not be present at max depth"
5085 );
5086 });
5087}
5088
5089#[gpui::test]
5090async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5091 init_test(cx);
5092
5093 cx.update(|cx| {
5094 cx.update_flags(true, vec!["subagents".to_string()]);
5095 });
5096
5097 let fs = FakeFs::new(cx.executor());
5098 fs.insert_tree(path!("/test"), json!({})).await;
5099 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5100 let project_context = cx.new(|_cx| ProjectContext::default());
5101 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5102 let context_server_registry =
5103 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5104 let model = Arc::new(FakeLanguageModel::default());
5105
5106 let parent = cx.new(|cx| {
5107 Thread::new(
5108 project.clone(),
5109 project_context.clone(),
5110 context_server_registry.clone(),
5111 Templates::new(),
5112 Some(model.clone()),
5113 cx,
5114 )
5115 });
5116
5117 let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5118
5119 parent.update(cx, |thread, _cx| {
5120 thread.register_running_subagent(subagent.downgrade());
5121 });
5122
5123 subagent
5124 .update(cx, |thread, cx| {
5125 thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5126 })
5127 .unwrap();
5128 cx.run_until_parked();
5129
5130 subagent.read_with(cx, |thread, _| {
5131 assert!(!thread.is_turn_complete(), "subagent should be running");
5132 });
5133
5134 parent.update(cx, |thread, cx| {
5135 thread.cancel(cx).detach();
5136 });
5137
5138 subagent.read_with(cx, |thread, _| {
5139 assert!(
5140 thread.is_turn_complete(),
5141 "subagent should be cancelled when parent cancels"
5142 );
5143 });
5144}
5145
5146#[gpui::test]
5147async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
5148 init_test(cx);
5149 cx.update(|cx| {
5150 LanguageModelRegistry::test(cx);
5151 });
5152 cx.update(|cx| {
5153 cx.update_flags(true, vec!["subagents".to_string()]);
5154 });
5155
5156 let fs = FakeFs::new(cx.executor());
5157 fs.insert_tree(
5158 "/",
5159 json!({
5160 "a": {
5161 "b.md": "Lorem"
5162 }
5163 }),
5164 )
5165 .await;
5166 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5167 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5168 let agent = cx.update(|cx| {
5169 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5170 });
5171 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5172
5173 let acp_thread = cx
5174 .update(|cx| {
5175 connection
5176 .clone()
5177 .new_session(project.clone(), Path::new(""), cx)
5178 })
5179 .await
5180 .unwrap();
5181 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5182 let thread = agent.read_with(cx, |agent, _| {
5183 agent.sessions.get(&session_id).unwrap().thread.clone()
5184 });
5185 let model = Arc::new(FakeLanguageModel::default());
5186
5187 thread.update(cx, |thread, cx| {
5188 thread.set_model(model.clone(), cx);
5189 });
5190 cx.run_until_parked();
5191
5192 // Start the parent turn
5193 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
5194 cx.run_until_parked();
5195 model.send_last_completion_stream_text_chunk("spawning subagent");
5196 let subagent_tool_input = SpawnAgentToolInput {
5197 label: "label".to_string(),
5198 message: "subagent task prompt".to_string(),
5199 session_id: None,
5200 };
5201 let subagent_tool_use = LanguageModelToolUse {
5202 id: "subagent_1".into(),
5203 name: SpawnAgentTool::NAME.into(),
5204 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5205 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5206 is_input_complete: true,
5207 thought_signature: None,
5208 };
5209 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5210 subagent_tool_use,
5211 ));
5212 model.end_last_completion_stream();
5213
5214 cx.run_until_parked();
5215
5216 // Verify subagent is running
5217 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5218 thread
5219 .running_subagent_ids(cx)
5220 .get(0)
5221 .expect("subagent thread should be running")
5222 .clone()
5223 });
5224
5225 // Send a usage update that crosses the warning threshold (80% of 1,000,000)
5226 model.send_last_completion_stream_text_chunk("partial work");
5227 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5228 TokenUsage {
5229 input_tokens: 850_000,
5230 output_tokens: 0,
5231 cache_creation_input_tokens: 0,
5232 cache_read_input_tokens: 0,
5233 },
5234 ));
5235
5236 cx.run_until_parked();
5237
5238 // The subagent should no longer be running
5239 thread.read_with(cx, |thread, cx| {
5240 assert!(
5241 thread.running_subagent_ids(cx).is_empty(),
5242 "subagent should be stopped after context window warning"
5243 );
5244 });
5245
5246 // The parent model should get a new completion request to respond to the tool error
5247 model.send_last_completion_stream_text_chunk("Response after warning");
5248 model.end_last_completion_stream();
5249
5250 send.await.unwrap();
5251
5252 // Verify the parent thread shows the warning error in the tool call
5253 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5254 assert!(
5255 markdown.contains("nearing the end of its context window"),
5256 "tool output should contain context window warning message, got:\n{markdown}"
5257 );
5258 assert!(
5259 markdown.contains("Status: Failed"),
5260 "tool call should have Failed status, got:\n{markdown}"
5261 );
5262
5263 // Verify the subagent session still exists (can be resumed)
5264 agent.read_with(cx, |agent, _cx| {
5265 assert!(
5266 agent.sessions.contains_key(&subagent_session_id),
5267 "subagent session should still exist for potential resume"
5268 );
5269 });
5270}
5271
5272#[gpui::test]
5273async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
5274 init_test(cx);
5275 cx.update(|cx| {
5276 LanguageModelRegistry::test(cx);
5277 });
5278 cx.update(|cx| {
5279 cx.update_flags(true, vec!["subagents".to_string()]);
5280 });
5281
5282 let fs = FakeFs::new(cx.executor());
5283 fs.insert_tree(
5284 "/",
5285 json!({
5286 "a": {
5287 "b.md": "Lorem"
5288 }
5289 }),
5290 )
5291 .await;
5292 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
5293 let thread_store = cx.new(|cx| ThreadStore::new(cx));
5294 let agent = cx.update(|cx| {
5295 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
5296 });
5297 let connection = Rc::new(NativeAgentConnection(agent.clone()));
5298
5299 let acp_thread = cx
5300 .update(|cx| {
5301 connection
5302 .clone()
5303 .new_session(project.clone(), Path::new(""), cx)
5304 })
5305 .await
5306 .unwrap();
5307 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5308 let thread = agent.read_with(cx, |agent, _| {
5309 agent.sessions.get(&session_id).unwrap().thread.clone()
5310 });
5311 let model = Arc::new(FakeLanguageModel::default());
5312
5313 thread.update(cx, |thread, cx| {
5314 thread.set_model(model.clone(), cx);
5315 });
5316 cx.run_until_parked();
5317
5318 // === First turn: create subagent, trigger context window warning ===
5319 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5320 cx.run_until_parked();
5321 model.send_last_completion_stream_text_chunk("spawning subagent");
5322 let subagent_tool_input = SpawnAgentToolInput {
5323 label: "initial task".to_string(),
5324 message: "do the first task".to_string(),
5325 session_id: None,
5326 };
5327 let subagent_tool_use = LanguageModelToolUse {
5328 id: "subagent_1".into(),
5329 name: SpawnAgentTool::NAME.into(),
5330 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5331 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5332 is_input_complete: true,
5333 thought_signature: None,
5334 };
5335 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5336 subagent_tool_use,
5337 ));
5338 model.end_last_completion_stream();
5339
5340 cx.run_until_parked();
5341
5342 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5343 thread
5344 .running_subagent_ids(cx)
5345 .get(0)
5346 .expect("subagent thread should be running")
5347 .clone()
5348 });
5349
5350 // Subagent sends a usage update that crosses the warning threshold.
5351 // This triggers Normal→Warning, stopping the subagent.
5352 model.send_last_completion_stream_text_chunk("partial work");
5353 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5354 TokenUsage {
5355 input_tokens: 850_000,
5356 output_tokens: 0,
5357 cache_creation_input_tokens: 0,
5358 cache_read_input_tokens: 0,
5359 },
5360 ));
5361
5362 cx.run_until_parked();
5363
5364 // Verify the first turn was stopped with a context window warning
5365 thread.read_with(cx, |thread, cx| {
5366 assert!(
5367 thread.running_subagent_ids(cx).is_empty(),
5368 "subagent should be stopped after context window warning"
5369 );
5370 });
5371
5372 // Parent model responds to complete first turn
5373 model.send_last_completion_stream_text_chunk("First response");
5374 model.end_last_completion_stream();
5375
5376 send.await.unwrap();
5377
5378 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5379 assert!(
5380 markdown.contains("nearing the end of its context window"),
5381 "first turn should have context window warning, got:\n{markdown}"
5382 );
5383
5384 // === Second turn: resume the same subagent (now at Warning level) ===
5385 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5386 cx.run_until_parked();
5387 model.send_last_completion_stream_text_chunk("resuming subagent");
5388 let resume_tool_input = SpawnAgentToolInput {
5389 label: "follow-up task".to_string(),
5390 message: "do the follow-up task".to_string(),
5391 session_id: Some(subagent_session_id.clone()),
5392 };
5393 let resume_tool_use = LanguageModelToolUse {
5394 id: "subagent_2".into(),
5395 name: SpawnAgentTool::NAME.into(),
5396 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5397 input: serde_json::to_value(&resume_tool_input).unwrap(),
5398 is_input_complete: true,
5399 thought_signature: None,
5400 };
5401 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5402 model.end_last_completion_stream();
5403
5404 cx.run_until_parked();
5405
5406 // Subagent responds with tokens still at warning level (no worse).
5407 // Since ratio_before_prompt was already Warning, this should NOT
5408 // trigger the context window warning again.
5409 model.send_last_completion_stream_text_chunk("follow-up task response");
5410 model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
5411 TokenUsage {
5412 input_tokens: 870_000,
5413 output_tokens: 0,
5414 cache_creation_input_tokens: 0,
5415 cache_read_input_tokens: 0,
5416 },
5417 ));
5418 model.end_last_completion_stream();
5419
5420 cx.run_until_parked();
5421
5422 // Parent model responds to complete second turn
5423 model.send_last_completion_stream_text_chunk("Second response");
5424 model.end_last_completion_stream();
5425
5426 send2.await.unwrap();
5427
5428 // The resumed subagent should have completed normally since the ratio
5429 // didn't transition (it was Warning before and stayed at Warning)
5430 let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
5431 assert!(
5432 markdown.contains("follow-up task response"),
5433 "resumed subagent should complete normally when already at warning, got:\n{markdown}"
5434 );
5435 // The second tool call should NOT have a context window warning
5436 let second_tool_pos = markdown
5437 .find("follow-up task")
5438 .expect("should find follow-up tool call");
5439 let after_second_tool = &markdown[second_tool_pos..];
5440 assert!(
5441 !after_second_tool.contains("nearing the end of its context window"),
5442 "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
5443 );
5444}
5445
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    // Verifies that a non-retryable model error inside a subagent stops the
    // subagent and surfaces as a failed tool call on the parent thread.
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is gated behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), Path::new(""), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    // Resolve the agent-side thread backing the ACP session we just created.
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    // The same fake model serves the parent thread and the subagent; the
    // "last completion stream" alternates between them as turns progress.
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // Parent model calls the spawn-agent tool with no session id, creating a
    // brand-new subagent.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5552
5553#[gpui::test]
5554async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5555 init_test(cx);
5556
5557 let fs = FakeFs::new(cx.executor());
5558 fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5559 .await;
5560 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5561
5562 cx.update(|cx| {
5563 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5564 settings.tool_permissions.tools.insert(
5565 EditFileTool::NAME.into(),
5566 agent_settings::ToolRules {
5567 default: Some(settings::ToolPermissionMode::Allow),
5568 always_allow: vec![],
5569 always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5570 always_confirm: vec![],
5571 invalid_patterns: vec![],
5572 },
5573 );
5574 agent_settings::AgentSettings::override_global(settings, cx);
5575 });
5576
5577 let context_server_registry =
5578 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5579 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5580 let templates = crate::Templates::new();
5581 let thread = cx.new(|cx| {
5582 crate::Thread::new(
5583 project.clone(),
5584 cx.new(|_cx| prompt_store::ProjectContext::default()),
5585 context_server_registry,
5586 templates.clone(),
5587 None,
5588 cx,
5589 )
5590 });
5591
5592 #[allow(clippy::arc_with_non_send_sync)]
5593 let tool = Arc::new(crate::EditFileTool::new(
5594 project.clone(),
5595 thread.downgrade(),
5596 language_registry,
5597 templates,
5598 ));
5599 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5600
5601 let task = cx.update(|cx| {
5602 tool.run(
5603 ToolInput::resolved(crate::EditFileToolInput {
5604 display_description: "Edit sensitive file".to_string(),
5605 path: "root/sensitive_config.txt".into(),
5606 mode: crate::EditFileMode::Edit,
5607 }),
5608 event_stream,
5609 cx,
5610 )
5611 });
5612
5613 let result = task.await;
5614 assert!(result.is_err(), "expected edit to be blocked");
5615 assert!(
5616 result.unwrap_err().to_string().contains("blocked"),
5617 "error should mention the edit was blocked"
5618 );
5619}
5620
5621#[gpui::test]
5622async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5623 init_test(cx);
5624
5625 let fs = FakeFs::new(cx.executor());
5626 fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5627 .await;
5628 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5629
5630 cx.update(|cx| {
5631 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5632 settings.tool_permissions.tools.insert(
5633 DeletePathTool::NAME.into(),
5634 agent_settings::ToolRules {
5635 default: Some(settings::ToolPermissionMode::Allow),
5636 always_allow: vec![],
5637 always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5638 always_confirm: vec![],
5639 invalid_patterns: vec![],
5640 },
5641 );
5642 agent_settings::AgentSettings::override_global(settings, cx);
5643 });
5644
5645 let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5646
5647 #[allow(clippy::arc_with_non_send_sync)]
5648 let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5649 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5650
5651 let task = cx.update(|cx| {
5652 tool.run(
5653 ToolInput::resolved(crate::DeletePathToolInput {
5654 path: "root/important_data.txt".to_string(),
5655 }),
5656 event_stream,
5657 cx,
5658 )
5659 });
5660
5661 let result = task.await;
5662 assert!(result.is_err(), "expected deletion to be blocked");
5663 assert!(
5664 result.unwrap_err().contains("blocked"),
5665 "error should mention the deletion was blocked"
5666 );
5667}
5668
5669#[gpui::test]
5670async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5671 init_test(cx);
5672
5673 let fs = FakeFs::new(cx.executor());
5674 fs.insert_tree(
5675 "/root",
5676 json!({
5677 "safe.txt": "content",
5678 "protected": {}
5679 }),
5680 )
5681 .await;
5682 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5683
5684 cx.update(|cx| {
5685 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5686 settings.tool_permissions.tools.insert(
5687 MovePathTool::NAME.into(),
5688 agent_settings::ToolRules {
5689 default: Some(settings::ToolPermissionMode::Allow),
5690 always_allow: vec![],
5691 always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5692 always_confirm: vec![],
5693 invalid_patterns: vec![],
5694 },
5695 );
5696 agent_settings::AgentSettings::override_global(settings, cx);
5697 });
5698
5699 #[allow(clippy::arc_with_non_send_sync)]
5700 let tool = Arc::new(crate::MovePathTool::new(project));
5701 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5702
5703 let task = cx.update(|cx| {
5704 tool.run(
5705 ToolInput::resolved(crate::MovePathToolInput {
5706 source_path: "root/safe.txt".to_string(),
5707 destination_path: "root/protected/safe.txt".to_string(),
5708 }),
5709 event_stream,
5710 cx,
5711 )
5712 });
5713
5714 let result = task.await;
5715 assert!(
5716 result.is_err(),
5717 "expected move to be blocked due to destination path"
5718 );
5719 assert!(
5720 result.unwrap_err().contains("blocked"),
5721 "error should mention the move was blocked"
5722 );
5723}
5724
5725#[gpui::test]
5726async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5727 init_test(cx);
5728
5729 let fs = FakeFs::new(cx.executor());
5730 fs.insert_tree(
5731 "/root",
5732 json!({
5733 "secret.txt": "secret content",
5734 "public": {}
5735 }),
5736 )
5737 .await;
5738 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5739
5740 cx.update(|cx| {
5741 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5742 settings.tool_permissions.tools.insert(
5743 MovePathTool::NAME.into(),
5744 agent_settings::ToolRules {
5745 default: Some(settings::ToolPermissionMode::Allow),
5746 always_allow: vec![],
5747 always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5748 always_confirm: vec![],
5749 invalid_patterns: vec![],
5750 },
5751 );
5752 agent_settings::AgentSettings::override_global(settings, cx);
5753 });
5754
5755 #[allow(clippy::arc_with_non_send_sync)]
5756 let tool = Arc::new(crate::MovePathTool::new(project));
5757 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5758
5759 let task = cx.update(|cx| {
5760 tool.run(
5761 ToolInput::resolved(crate::MovePathToolInput {
5762 source_path: "root/secret.txt".to_string(),
5763 destination_path: "root/public/not_secret.txt".to_string(),
5764 }),
5765 event_stream,
5766 cx,
5767 )
5768 });
5769
5770 let result = task.await;
5771 assert!(
5772 result.is_err(),
5773 "expected move to be blocked due to source path"
5774 );
5775 assert!(
5776 result.unwrap_err().contains("blocked"),
5777 "error should mention the move was blocked"
5778 );
5779}
5780
5781#[gpui::test]
5782async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5783 init_test(cx);
5784
5785 let fs = FakeFs::new(cx.executor());
5786 fs.insert_tree(
5787 "/root",
5788 json!({
5789 "confidential.txt": "confidential data",
5790 "dest": {}
5791 }),
5792 )
5793 .await;
5794 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5795
5796 cx.update(|cx| {
5797 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5798 settings.tool_permissions.tools.insert(
5799 CopyPathTool::NAME.into(),
5800 agent_settings::ToolRules {
5801 default: Some(settings::ToolPermissionMode::Allow),
5802 always_allow: vec![],
5803 always_deny: vec![
5804 agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5805 ],
5806 always_confirm: vec![],
5807 invalid_patterns: vec![],
5808 },
5809 );
5810 agent_settings::AgentSettings::override_global(settings, cx);
5811 });
5812
5813 #[allow(clippy::arc_with_non_send_sync)]
5814 let tool = Arc::new(crate::CopyPathTool::new(project));
5815 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5816
5817 let task = cx.update(|cx| {
5818 tool.run(
5819 ToolInput::resolved(crate::CopyPathToolInput {
5820 source_path: "root/confidential.txt".to_string(),
5821 destination_path: "root/dest/copy.txt".to_string(),
5822 }),
5823 event_stream,
5824 cx,
5825 )
5826 });
5827
5828 let result = task.await;
5829 assert!(result.is_err(), "expected copy to be blocked");
5830 assert!(
5831 result.unwrap_err().contains("blocked"),
5832 "error should mention the copy was blocked"
5833 );
5834}
5835
5836#[gpui::test]
5837async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
5838 init_test(cx);
5839
5840 let fs = FakeFs::new(cx.executor());
5841 fs.insert_tree(
5842 "/root",
5843 json!({
5844 "normal.txt": "normal content",
5845 "readonly": {
5846 "config.txt": "readonly content"
5847 }
5848 }),
5849 )
5850 .await;
5851 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5852
5853 cx.update(|cx| {
5854 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5855 settings.tool_permissions.tools.insert(
5856 SaveFileTool::NAME.into(),
5857 agent_settings::ToolRules {
5858 default: Some(settings::ToolPermissionMode::Allow),
5859 always_allow: vec![],
5860 always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
5861 always_confirm: vec![],
5862 invalid_patterns: vec![],
5863 },
5864 );
5865 agent_settings::AgentSettings::override_global(settings, cx);
5866 });
5867
5868 #[allow(clippy::arc_with_non_send_sync)]
5869 let tool = Arc::new(crate::SaveFileTool::new(project));
5870 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5871
5872 let task = cx.update(|cx| {
5873 tool.run(
5874 ToolInput::resolved(crate::SaveFileToolInput {
5875 paths: vec![
5876 std::path::PathBuf::from("root/normal.txt"),
5877 std::path::PathBuf::from("root/readonly/config.txt"),
5878 ],
5879 }),
5880 event_stream,
5881 cx,
5882 )
5883 });
5884
5885 let result = task.await;
5886 assert!(
5887 result.is_err(),
5888 "expected save to be blocked due to denied path"
5889 );
5890 assert!(
5891 result.unwrap_err().contains("blocked"),
5892 "error should mention the save was blocked"
5893 );
5894}
5895
5896#[gpui::test]
5897async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
5898 init_test(cx);
5899
5900 let fs = FakeFs::new(cx.executor());
5901 fs.insert_tree("/root", json!({"config.secret": "secret config"}))
5902 .await;
5903 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5904
5905 cx.update(|cx| {
5906 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5907 settings.tool_permissions.tools.insert(
5908 SaveFileTool::NAME.into(),
5909 agent_settings::ToolRules {
5910 default: Some(settings::ToolPermissionMode::Allow),
5911 always_allow: vec![],
5912 always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
5913 always_confirm: vec![],
5914 invalid_patterns: vec![],
5915 },
5916 );
5917 agent_settings::AgentSettings::override_global(settings, cx);
5918 });
5919
5920 #[allow(clippy::arc_with_non_send_sync)]
5921 let tool = Arc::new(crate::SaveFileTool::new(project));
5922 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5923
5924 let task = cx.update(|cx| {
5925 tool.run(
5926 ToolInput::resolved(crate::SaveFileToolInput {
5927 paths: vec![std::path::PathBuf::from("root/config.secret")],
5928 }),
5929 event_stream,
5930 cx,
5931 )
5932 });
5933
5934 let result = task.await;
5935 assert!(result.is_err(), "expected save to be blocked");
5936 assert!(
5937 result.unwrap_err().contains("blocked"),
5938 "error should mention the save was blocked"
5939 );
5940}
5941
5942#[gpui::test]
5943async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
5944 init_test(cx);
5945
5946 cx.update(|cx| {
5947 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5948 settings.tool_permissions.tools.insert(
5949 WebSearchTool::NAME.into(),
5950 agent_settings::ToolRules {
5951 default: Some(settings::ToolPermissionMode::Allow),
5952 always_allow: vec![],
5953 always_deny: vec![
5954 agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
5955 ],
5956 always_confirm: vec![],
5957 invalid_patterns: vec![],
5958 },
5959 );
5960 agent_settings::AgentSettings::override_global(settings, cx);
5961 });
5962
5963 #[allow(clippy::arc_with_non_send_sync)]
5964 let tool = Arc::new(crate::WebSearchTool);
5965 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5966
5967 let input: crate::WebSearchToolInput =
5968 serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
5969
5970 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
5971
5972 let result = task.await;
5973 assert!(result.is_err(), "expected search to be blocked");
5974 match result.unwrap_err() {
5975 crate::WebSearchToolOutput::Error { error } => {
5976 assert!(
5977 error.contains("blocked"),
5978 "error should mention the search was blocked"
5979 );
5980 }
5981 other => panic!("expected Error variant, got: {other:?}"),
5982 }
5983}
5984
5985#[gpui::test]
5986async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
5987 init_test(cx);
5988
5989 let fs = FakeFs::new(cx.executor());
5990 fs.insert_tree("/root", json!({"README.md": "# Hello"}))
5991 .await;
5992 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5993
5994 cx.update(|cx| {
5995 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5996 settings.tool_permissions.tools.insert(
5997 EditFileTool::NAME.into(),
5998 agent_settings::ToolRules {
5999 default: Some(settings::ToolPermissionMode::Confirm),
6000 always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6001 always_deny: vec![],
6002 always_confirm: vec![],
6003 invalid_patterns: vec![],
6004 },
6005 );
6006 agent_settings::AgentSettings::override_global(settings, cx);
6007 });
6008
6009 let context_server_registry =
6010 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6011 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6012 let templates = crate::Templates::new();
6013 let thread = cx.new(|cx| {
6014 crate::Thread::new(
6015 project.clone(),
6016 cx.new(|_cx| prompt_store::ProjectContext::default()),
6017 context_server_registry,
6018 templates.clone(),
6019 None,
6020 cx,
6021 )
6022 });
6023
6024 #[allow(clippy::arc_with_non_send_sync)]
6025 let tool = Arc::new(crate::EditFileTool::new(
6026 project,
6027 thread.downgrade(),
6028 language_registry,
6029 templates,
6030 ));
6031 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6032
6033 let _task = cx.update(|cx| {
6034 tool.run(
6035 ToolInput::resolved(crate::EditFileToolInput {
6036 display_description: "Edit README".to_string(),
6037 path: "root/README.md".into(),
6038 mode: crate::EditFileMode::Edit,
6039 }),
6040 event_stream,
6041 cx,
6042 )
6043 });
6044
6045 cx.run_until_parked();
6046
6047 let event = rx.try_next();
6048 assert!(
6049 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6050 "expected no authorization request for allowed .md file"
6051 );
6052}
6053
6054#[gpui::test]
6055async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6056 init_test(cx);
6057
6058 let fs = FakeFs::new(cx.executor());
6059 fs.insert_tree(
6060 "/root",
6061 json!({
6062 ".zed": {
6063 "settings.json": "{}"
6064 },
6065 "README.md": "# Hello"
6066 }),
6067 )
6068 .await;
6069 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6070
6071 cx.update(|cx| {
6072 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6073 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6074 agent_settings::AgentSettings::override_global(settings, cx);
6075 });
6076
6077 let context_server_registry =
6078 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6079 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6080 let templates = crate::Templates::new();
6081 let thread = cx.new(|cx| {
6082 crate::Thread::new(
6083 project.clone(),
6084 cx.new(|_cx| prompt_store::ProjectContext::default()),
6085 context_server_registry,
6086 templates.clone(),
6087 None,
6088 cx,
6089 )
6090 });
6091
6092 #[allow(clippy::arc_with_non_send_sync)]
6093 let tool = Arc::new(crate::EditFileTool::new(
6094 project,
6095 thread.downgrade(),
6096 language_registry,
6097 templates,
6098 ));
6099
6100 // Editing a file inside .zed/ should still prompt even with global default: allow,
6101 // because local settings paths are sensitive and require confirmation regardless.
6102 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6103 let _task = cx.update(|cx| {
6104 tool.run(
6105 ToolInput::resolved(crate::EditFileToolInput {
6106 display_description: "Edit local settings".to_string(),
6107 path: "root/.zed/settings.json".into(),
6108 mode: crate::EditFileMode::Edit,
6109 }),
6110 event_stream,
6111 cx,
6112 )
6113 });
6114
6115 let _update = rx.expect_update_fields().await;
6116 let _auth = rx.expect_authorization().await;
6117}
6118
6119#[gpui::test]
6120async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6121 init_test(cx);
6122
6123 cx.update(|cx| {
6124 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6125 settings.tool_permissions.tools.insert(
6126 FetchTool::NAME.into(),
6127 agent_settings::ToolRules {
6128 default: Some(settings::ToolPermissionMode::Allow),
6129 always_allow: vec![],
6130 always_deny: vec![
6131 agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6132 ],
6133 always_confirm: vec![],
6134 invalid_patterns: vec![],
6135 },
6136 );
6137 agent_settings::AgentSettings::override_global(settings, cx);
6138 });
6139
6140 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6141
6142 #[allow(clippy::arc_with_non_send_sync)]
6143 let tool = Arc::new(crate::FetchTool::new(http_client));
6144 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6145
6146 let input: crate::FetchToolInput =
6147 serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6148
6149 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6150
6151 let result = task.await;
6152 assert!(result.is_err(), "expected fetch to be blocked");
6153 assert!(
6154 result.unwrap_err().contains("blocked"),
6155 "error should mention the fetch was blocked"
6156 );
6157}
6158
6159#[gpui::test]
6160async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6161 init_test(cx);
6162
6163 cx.update(|cx| {
6164 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6165 settings.tool_permissions.tools.insert(
6166 FetchTool::NAME.into(),
6167 agent_settings::ToolRules {
6168 default: Some(settings::ToolPermissionMode::Confirm),
6169 always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6170 always_deny: vec![],
6171 always_confirm: vec![],
6172 invalid_patterns: vec![],
6173 },
6174 );
6175 agent_settings::AgentSettings::override_global(settings, cx);
6176 });
6177
6178 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6179
6180 #[allow(clippy::arc_with_non_send_sync)]
6181 let tool = Arc::new(crate::FetchTool::new(http_client));
6182 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6183
6184 let input: crate::FetchToolInput =
6185 serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6186
6187 let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6188
6189 cx.run_until_parked();
6190
6191 let event = rx.try_next();
6192 assert!(
6193 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6194 "expected no authorization request for allowed docs.rs URL"
6195 );
6196}
6197
#[gpui::test]
async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
    // Verifies that when a message is queued mid-turn, the turn ends at the
    // next tool-call boundary (EndTurn) instead of continuing, and that the
    // queued-message flag survives so the queued message can be sent next.
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Add a tool so we can simulate tool calls
    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
    });

    // Start a turn by sending a message
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate the model making a tool call
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text": "hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));

    // Signal that a message is queued before ending the stream
    thread.update(cx, |thread, _cx| {
        thread.set_has_queued_message(true);
    });

    // Now end the stream - tool will run, and the boundary check should see the queue
    fake_model.end_last_completion_stream();

    // Collect all events until the turn stops
    let all_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we received the tool call event
    let tool_call_ids: Vec<_> = all_events
        .iter()
        .filter_map(|e| match e {
            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
            _ => None,
        })
        .collect();
    assert_eq!(
        tool_call_ids,
        vec!["tool_1"],
        "Should have received a tool call event for our echo tool"
    );

    // The turn should have stopped with EndTurn
    let stop_reasons = stop_events(all_events);
    assert_eq!(
        stop_reasons,
        vec![acp::StopReason::EndTurn],
        "Turn should have ended after tool completion due to queued message"
    );

    // Verify the queued message flag is still set
    // (ending the turn must not consume the flag — the caller drains the queue)
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.has_queued_message(),
            "Should still have queued message flag set"
        );
    });

    // Thread should be idle now
    thread.update(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be running after turn ends"
        );
    });
}
6282
6283#[gpui::test]
6284async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
6285 init_test(cx);
6286 always_allow_tools(cx);
6287
6288 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6289 let fake_model = model.as_fake();
6290
6291 thread.update(cx, |thread, _cx| {
6292 thread.add_tool(StreamingFailingEchoTool {
6293 receive_chunks_until_failure: 1,
6294 });
6295 });
6296
6297 let _events = thread
6298 .update(cx, |thread, cx| {
6299 thread.send(
6300 UserMessageId::new(),
6301 ["Use the streaming_failing_echo tool"],
6302 cx,
6303 )
6304 })
6305 .unwrap();
6306 cx.run_until_parked();
6307
6308 let tool_use = LanguageModelToolUse {
6309 id: "call_1".into(),
6310 name: StreamingFailingEchoTool::NAME.into(),
6311 raw_input: "hello".into(),
6312 input: json!({}),
6313 is_input_complete: false,
6314 thought_signature: None,
6315 };
6316
6317 fake_model
6318 .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
6319
6320 cx.run_until_parked();
6321
6322 let completions = fake_model.pending_completions();
6323 let last_completion = completions.last().unwrap();
6324
6325 assert_eq!(
6326 last_completion.messages[1..],
6327 vec![
6328 LanguageModelRequestMessage {
6329 role: Role::User,
6330 content: vec!["Use the streaming_failing_echo tool".into()],
6331 cache: false,
6332 reasoning_details: None,
6333 },
6334 LanguageModelRequestMessage {
6335 role: Role::Assistant,
6336 content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
6337 cache: false,
6338 reasoning_details: None,
6339 },
6340 LanguageModelRequestMessage {
6341 role: Role::User,
6342 content: vec![language_model::MessageContent::ToolResult(
6343 LanguageModelToolResult {
6344 tool_use_id: tool_use.id.clone(),
6345 tool_name: tool_use.name,
6346 is_error: true,
6347 content: "failed".into(),
6348 output: Some("failed".into()),
6349 }
6350 )],
6351 cache: true,
6352 reasoning_details: None,
6353 },
6354 ]
6355 );
6356}
6357
#[gpui::test]
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Gate that keeps the first (successful) tool call running until we
    // explicitly release it further down.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        // First tool: succeeds, but blocks until the oneshot above fires.
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        // Second tool: fails after receiving its first streamed input chunk.
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Stream the first tool call in two parts: an incomplete chunk...
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    // ...followed by the complete version with the same id, which is what
    // should end up recorded in the assistant message.
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // Second tool call: still incomplete when its tool fails on the first
    // streamed chunk.
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    // Let the failing tool fail while the first tool is still gated; the
    // thread must not issue the follow-up request yet.
    cx.run_until_parked();

    // Release the gated echo tool so the turn can finish.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // The follow-up request must contain BOTH tool results — the streaming
    // failure waited for the in-flight tool to complete. The failing tool's
    // error result appears first, then the gated tool's successful result.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}