1use super::*;
2use acp_thread::{
3 AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
4 UserMessageId,
5};
6use agent_client_protocol::{self as acp};
7use agent_settings::AgentProfileId;
8use anyhow::Result;
9use client::{Client, UserStore};
10use cloud_llm_client::CompletionIntent;
11use collections::IndexMap;
12use context_server::{ContextServer, ContextServerCommand, ContextServerId};
13use feature_flags::FeatureFlagAppExt as _;
14use fs::{FakeFs, Fs};
15use futures::{
16 FutureExt as _, StreamExt,
17 channel::{
18 mpsc::{self, UnboundedReceiver},
19 oneshot,
20 },
21 future::{Fuse, Shared},
22};
23use gpui::{
24 App, AppContext, AsyncApp, Entity, Task, TestAppContext, UpdateGlobal,
25 http_client::FakeHttpClient,
26};
27use indoc::indoc;
28use language_model::{
29 LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
30 LanguageModelProviderName, LanguageModelRegistry, LanguageModelRequest,
31 LanguageModelRequestMessage, LanguageModelToolResult, LanguageModelToolSchemaFormat,
32 LanguageModelToolUse, MessageContent, Role, StopReason, TokenUsage,
33 fake_provider::FakeLanguageModel,
34};
35use pretty_assertions::assert_eq;
36use project::{
37 Project, context_server_store::ContextServerStore, project_settings::ProjectSettings,
38};
39use prompt_store::ProjectContext;
40use reqwest_client::ReqwestClient;
41use schemars::JsonSchema;
42use serde::{Deserialize, Serialize};
43use serde_json::json;
44use settings::{Settings, SettingsStore};
45use std::{
46 path::Path,
47 pin::Pin,
48 rc::Rc,
49 sync::{
50 Arc,
51 atomic::{AtomicBool, AtomicUsize, Ordering},
52 },
53 time::Duration,
54};
55use util::path;
56
57mod edit_file_thread_test;
58mod test_tools;
59use test_tools::*;
60
61pub(crate) fn init_test(cx: &mut TestAppContext) {
62 cx.update(|cx| {
63 let settings_store = SettingsStore::test(cx);
64 cx.set_global(settings_store);
65 });
66}
67
/// Test double implementing `crate::TerminalHandle` without spawning a real
/// process; exit timing is driven manually through a oneshot channel.
pub(crate) struct FakeTerminalHandle {
    /// Set to true once `kill` is called.
    killed: Arc<AtomicBool>,
    /// Backing flag for `was_stopped_by_user`; toggled via `set_stopped_by_user`.
    stopped_by_user: Arc<AtomicBool>,
    /// Consumed by `signal_exit` so the exit future resolves exactly once.
    exit_sender: std::cell::RefCell<Option<futures::channel::oneshot::Sender<()>>>,
    /// Shared future that resolves when the fake terminal "exits".
    wait_for_exit: Shared<Task<acp::TerminalExitStatus>>,
    /// Canned output returned from `current_output`.
    output: acp::TerminalOutputResponse,
    /// Fixed terminal id returned from `id`.
    id: acp::TerminalId,
}
76
77impl FakeTerminalHandle {
78 pub(crate) fn new_never_exits(cx: &mut App) -> Self {
79 let killed = Arc::new(AtomicBool::new(false));
80 let stopped_by_user = Arc::new(AtomicBool::new(false));
81
82 let (exit_sender, exit_receiver) = futures::channel::oneshot::channel();
83
84 let wait_for_exit = cx
85 .spawn(async move |_cx| {
86 // Wait for the exit signal (sent when kill() is called)
87 let _ = exit_receiver.await;
88 acp::TerminalExitStatus::new()
89 })
90 .shared();
91
92 Self {
93 killed,
94 stopped_by_user,
95 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
96 wait_for_exit,
97 output: acp::TerminalOutputResponse::new("partial output".to_string(), false),
98 id: acp::TerminalId::new("fake_terminal".to_string()),
99 }
100 }
101
102 pub(crate) fn new_with_immediate_exit(cx: &mut App, exit_code: u32) -> Self {
103 let killed = Arc::new(AtomicBool::new(false));
104 let stopped_by_user = Arc::new(AtomicBool::new(false));
105 let (exit_sender, _exit_receiver) = futures::channel::oneshot::channel();
106
107 let wait_for_exit = cx
108 .spawn(async move |_cx| acp::TerminalExitStatus::new().exit_code(exit_code))
109 .shared();
110
111 Self {
112 killed,
113 stopped_by_user,
114 exit_sender: std::cell::RefCell::new(Some(exit_sender)),
115 wait_for_exit,
116 output: acp::TerminalOutputResponse::new("command output".to_string(), false),
117 id: acp::TerminalId::new("fake_terminal".to_string()),
118 }
119 }
120
121 pub(crate) fn was_killed(&self) -> bool {
122 self.killed.load(Ordering::SeqCst)
123 }
124
125 pub(crate) fn set_stopped_by_user(&self, stopped: bool) {
126 self.stopped_by_user.store(stopped, Ordering::SeqCst);
127 }
128
129 pub(crate) fn signal_exit(&self) {
130 if let Some(sender) = self.exit_sender.borrow_mut().take() {
131 let _ = sender.send(());
132 }
133 }
134}
135
136impl crate::TerminalHandle for FakeTerminalHandle {
137 fn id(&self, _cx: &AsyncApp) -> Result<acp::TerminalId> {
138 Ok(self.id.clone())
139 }
140
141 fn current_output(&self, _cx: &AsyncApp) -> Result<acp::TerminalOutputResponse> {
142 Ok(self.output.clone())
143 }
144
145 fn wait_for_exit(&self, _cx: &AsyncApp) -> Result<Shared<Task<acp::TerminalExitStatus>>> {
146 Ok(self.wait_for_exit.clone())
147 }
148
149 fn kill(&self, _cx: &AsyncApp) -> Result<()> {
150 self.killed.store(true, Ordering::SeqCst);
151 self.signal_exit();
152 Ok(())
153 }
154
155 fn was_stopped_by_user(&self, _cx: &AsyncApp) -> Result<bool> {
156 Ok(self.stopped_by_user.load(Ordering::SeqCst))
157 }
158}
159
/// Test double for `SubagentHandle` whose `send` always resolves to the
/// output of a preset shared task.
struct FakeSubagentHandle {
    /// Session id reported by `id()`.
    session_id: acp::SessionId,
    /// Shared task whose output is returned from every `send` call.
    send_task: Shared<Task<String>>,
}
164
165impl SubagentHandle for FakeSubagentHandle {
166 fn id(&self) -> acp::SessionId {
167 self.session_id.clone()
168 }
169
170 fn num_entries(&self, _cx: &App) -> usize {
171 unimplemented!()
172 }
173
174 fn send(&self, _message: String, cx: &AsyncApp) -> Task<Result<String>> {
175 let task = self.send_task.clone();
176 cx.background_spawn(async move { Ok(task.await) })
177 }
178}
179
/// Test `ThreadEnvironment` backed by optional pre-built fake handles.
#[derive(Default)]
pub(crate) struct FakeThreadEnvironment {
    /// Handle returned from every `create_terminal` call, if set.
    terminal_handle: Option<Rc<FakeTerminalHandle>>,
    /// Handle returned from every `create_subagent` call, if set.
    subagent_handle: Option<Rc<FakeSubagentHandle>>,
    /// Counts `create_terminal` invocations (see `terminal_creation_count`).
    terminal_creations: Arc<AtomicUsize>,
}
186
187impl FakeThreadEnvironment {
188 pub(crate) fn with_terminal(self, terminal_handle: FakeTerminalHandle) -> Self {
189 Self {
190 terminal_handle: Some(terminal_handle.into()),
191 ..self
192 }
193 }
194
195 pub(crate) fn terminal_creation_count(&self) -> usize {
196 self.terminal_creations.load(Ordering::SeqCst)
197 }
198}
199
200impl crate::ThreadEnvironment for FakeThreadEnvironment {
201 fn create_terminal(
202 &self,
203 _command: String,
204 _cwd: Option<std::path::PathBuf>,
205 _output_byte_limit: Option<u64>,
206 _cx: &mut AsyncApp,
207 ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
208 self.terminal_creations.fetch_add(1, Ordering::SeqCst);
209 let handle = self
210 .terminal_handle
211 .clone()
212 .expect("Terminal handle not available on FakeThreadEnvironment");
213 Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
214 }
215
216 fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
217 Ok(self
218 .subagent_handle
219 .clone()
220 .expect("Subagent handle not available on FakeThreadEnvironment")
221 as Rc<dyn SubagentHandle>)
222 }
223}
224
/// Environment that creates multiple independent terminal handles for testing concurrent terminals.
struct MultiTerminalEnvironment {
    /// Every handle created so far, in creation order.
    handles: std::cell::RefCell<Vec<Rc<FakeTerminalHandle>>>,
}
229
230impl MultiTerminalEnvironment {
231 fn new() -> Self {
232 Self {
233 handles: std::cell::RefCell::new(Vec::new()),
234 }
235 }
236
237 fn handles(&self) -> Vec<Rc<FakeTerminalHandle>> {
238 self.handles.borrow().clone()
239 }
240}
241
242impl crate::ThreadEnvironment for MultiTerminalEnvironment {
243 fn create_terminal(
244 &self,
245 _command: String,
246 _cwd: Option<std::path::PathBuf>,
247 _output_byte_limit: Option<u64>,
248 cx: &mut AsyncApp,
249 ) -> Task<Result<Rc<dyn crate::TerminalHandle>>> {
250 let handle = Rc::new(cx.update(|cx| FakeTerminalHandle::new_never_exits(cx)));
251 self.handles.borrow_mut().push(handle.clone());
252 Task::ready(Ok(handle as Rc<dyn crate::TerminalHandle>))
253 }
254
255 fn create_subagent(&self, _label: String, _cx: &mut App) -> Result<Rc<dyn SubagentHandle>> {
256 unimplemented!()
257 }
258}
259
260fn always_allow_tools(cx: &mut TestAppContext) {
261 cx.update(|cx| {
262 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
263 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
264 agent_settings::AgentSettings::override_global(settings, cx);
265 });
266}
267
#[gpui::test]
async fn test_echo(cx: &mut TestAppContext) {
    // Smoke test: a user message goes out, the fake model streams a reply,
    // and the thread records it as the last assistant message.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Testing: Reply with 'Hello'"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Drive the fake completion stream: one text chunk, then a clean end-of-turn.
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            "Hello\n"
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
300
#[gpui::test]
async fn test_terminal_tool_timeout_kills_handle(cx: &mut TestAppContext) {
    // When a terminal tool call specifies `timeout_ms`, the underlying handle
    // must be killed once the timeout elapses, and the partial output captured
    // so far must still appear in the tool result.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    // A terminal that never exits on its own; only kill() can end it.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // 5ms timeout against a command that would otherwise run forever.
    let task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: Some(5),
            }),
            event_stream,
            cx,
        )
    });

    // The first tool-call update should embed the terminal in its content.
    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    let mut task_future: Pin<Box<Fuse<Task<Result<String, String>>>>> = Box::pin(task.fuse());

    // NOTE(review): this poll loop mixes a wall-clock deadline with the test
    // executor's virtual time; presumably run_until_parked plus the 1ms timer
    // advances the fake clock enough to fire the 5ms timeout — confirm if flaky.
    let deadline = std::time::Instant::now() + Duration::from_millis(500);
    loop {
        if let Some(result) = task_future.as_mut().now_or_never() {
            let result = result.expect("terminal tool task should complete");

            assert!(
                handle.was_killed(),
                "expected terminal handle to be killed on timeout"
            );
            assert!(
                result.contains("partial output"),
                "expected result to include terminal output, got: {result}"
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            panic!("timed out waiting for terminal tool task to complete");
        }

        cx.run_until_parked();
        cx.background_executor.timer(Duration::from_millis(1)).await;
    }
}
366
#[gpui::test]
#[ignore]
async fn test_terminal_tool_without_timeout_does_not_kill_handle(cx: &mut TestAppContext) {
    // Counterpart to the timeout test: with `timeout_ms: None` the handle must
    // stay alive. NOTE(review): currently #[ignore]d; the reason is not
    // recorded here — confirm before relying on this coverage.
    init_test(cx);
    always_allow_tools(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;

    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    #[allow(clippy::arc_with_non_send_sync)]
    let tool = Arc::new(crate::TerminalTool::new(project, environment));
    let (event_stream, mut rx) = crate::ToolCallEventStream::test();

    // Keep the task alive but never await its completion: the command runs forever.
    let _task = cx.update(|cx| {
        tool.run(
            ToolInput::resolved(crate::TerminalToolInput {
                command: "sleep 1000".to_string(),
                cd: ".".to_string(),
                timeout_ms: None,
            }),
            event_stream,
            cx,
        )
    });

    let update = rx.expect_update_fields().await;
    assert!(
        update.content.iter().any(|blocks| {
            blocks
                .iter()
                .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
        }),
        "expected tool call update to include terminal content"
    );

    // Give any (incorrect) kill path a chance to run before asserting.
    cx.background_executor
        .timer(Duration::from_millis(25))
        .await;

    assert!(
        !handle.was_killed(),
        "did not expect terminal handle to be killed without a timeout"
    );
}
416
#[gpui::test]
async fn test_thinking(cx: &mut TestAppContext) {
    // Verifies that a streamed Thinking event is rendered as a <think> block
    // preceding the final answer in the assistant message's markdown.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                [indoc! {"
                    Testing:

                    Generate a thinking step where you just think the word 'Think',
                    and have your final answer be 'Hello'
                "}],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();
    // Stream a thinking chunk, then the visible answer, then end the turn.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
        text: "Think".to_string(),
        signature: None,
    });
    fake_model.send_last_completion_stream_text_chunk("Hello");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    let events = events.collect().await;
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message().unwrap().role(),
            Role::Assistant
        );
        assert_eq!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .to_markdown(),
            indoc! {"
                <think>Think</think>
                Hello
            "}
        )
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
465
#[gpui::test]
async fn test_system_prompt(cx: &mut TestAppContext) {
    // The system prompt should reflect the project context (shell name) and,
    // because a tool is registered, include the diagnostics-fixing section.
    let ThreadTest {
        model,
        thread,
        project_context,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Inject a recognizable shell name we can grep for in the prompt.
    project_context.update(cx, |project_context, _cx| {
        project_context.shell = "test-shell".into()
    });
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    // The first message of the request must be the system prompt.
    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        system_prompt.contains("test-shell"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
510
#[gpui::test]
async fn test_system_prompt_without_tools(cx: &mut TestAppContext) {
    // With no tools registered, the tool-use and diagnostics sections must be
    // absent from the system prompt.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(
        pending_completions.len(),
        1,
        "unexpected pending completions: {:?}",
        pending_completions
    );

    let pending_completion = pending_completions.pop().unwrap();
    assert_eq!(pending_completion.messages[0].role, Role::System);

    let system_message = &pending_completion.messages[0];
    let system_prompt = system_message.content[0].to_str().unwrap();
    assert!(
        !system_prompt.contains("## Tool Use"),
        "unexpected system message: {:?}",
        system_message
    );
    assert!(
        !system_prompt.contains("## Fixing Diagnostics"),
        "unexpected system message: {:?}",
        system_message
    );
}
546
#[gpui::test]
async fn test_prompt_caching(cx: &mut TestAppContext) {
    // Verifies the cache-marker policy on outgoing requests: only the final
    // message of each request carries `cache: true`; everything earlier in the
    // conversation is sent with `cache: false`.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Send initial user message and verify it's cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // messages[0] is the system prompt; compare everything after it.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![LanguageModelRequestMessage {
            role: Role::User,
            content: vec!["Message 1".into()],
            cache: true,
            reasoning_details: None,
        }]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 1".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Send another user message and verify only the latest is cached
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::Text(
        "Response to Message 2".into(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Simulate a tool call and verify that the latest tool result is cached
    thread.update(cx, |thread, _| thread.add_tool(EchoTool));
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The follow-up request (carrying the tool result) is the one whose final
    // message — the tool result — must be cache-marked.
    let completion = fake_model.pending_completions().pop().unwrap();
    let tool_result = LanguageModelToolResult {
        tool_use_id: "tool_1".into(),
        tool_name: EchoTool::NAME.into(),
        is_error: false,
        content: "test".into(),
        output: Some("test".into()),
    };
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 1".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec!["Response to Message 2".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![MessageContent::ToolUse(tool_use)],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(tool_result)],
                cache: true,
                reasoning_details: None,
            }
        ]
    );
}
692
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_basic_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test against a real model (only runs with the "e2e" feature):
    // exercises tool calls that finish both before and after streaming stops.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Now test the echo tool with 'Hello'. Does it work? Say 'Yes' or 'No'."],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);

    // Test a tool calls that's likely to complete *after* streaming stops.
    let events = thread
        .update(cx, |thread, cx| {
            thread.remove_tool(&EchoTool::NAME);
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Now call the delay tool with 200ms.",
                    "When the timer goes off, then you echo the output of the tool.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
    // The delay tool's output ("Ding") should have been echoed back by the
    // model in its final assistant message.
    thread.update(cx, |thread, _cx| {
        assert!(
            thread
                .last_received_or_pending_message()
                .unwrap()
                .as_agent_message()
                .unwrap()
                .content
                .iter()
                .any(|content| {
                    if let AgentMessageContent::Text(text) = content {
                        text.contains("Ding")
                    } else {
                        false
                    }
                }),
            "{}",
            thread.to_markdown()
        );
    });
}
752
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_streaming_tool_calls(cx: &mut TestAppContext) {
    // End-to-end test (e2e feature only): verifies tool-use input streams
    // incrementally, i.e. the thread exposes a partially-complete tool use
    // before the full input has arrived.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test a tool call that's likely to complete *before* streaming stops.
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(WordListTool);
            thread.send(UserMessageId::new(), ["Test the word_list tool."], cx)
        })
        .unwrap();

    let mut saw_partial_tool_use = false;
    while let Some(event) = events.next().await {
        if let Ok(ThreadEvent::ToolCall(tool_call)) = event {
            thread.update(cx, |thread, _cx| {
                // Look for a tool use in the thread's last message
                let message = thread.last_received_or_pending_message().unwrap();
                let agent_message = message.as_agent_message().unwrap();
                let last_content = agent_message.content.last().unwrap();
                if let AgentMessageContent::ToolUse(last_tool_use) = last_content {
                    assert_eq!(last_tool_use.name.as_ref(), "word_list");
                    if tool_call.status == acp::ToolCallStatus::Pending {
                        // While pending, the input may still be mid-stream:
                        // missing the last field ("g") counts as partial.
                        if !last_tool_use.is_input_complete
                            && last_tool_use.input.get("g").is_none()
                        {
                            saw_partial_tool_use = true;
                        }
                    } else {
                        last_tool_use
                            .input
                            .get("a")
                            .expect("'a' has streamed because input is now complete");
                        last_tool_use
                            .input
                            .get("g")
                            .expect("'g' has streamed because input is now complete");
                    }
                } else {
                    panic!("last content should be a tool use");
                }
            });
        }
    }

    assert!(
        saw_partial_tool_use,
        "should see at least one partially streamed tool use in the history"
    );
}
804
#[gpui::test]
async fn test_tool_authorization(cx: &mut TestAppContext) {
    // Exercises the permission flow for a tool requiring authorization:
    // allow-once, deny, always-allow, and the absence of further prompts after
    // always-allow has been granted.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Two simultaneous tool uses -> two authorization requests.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_2".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    let tool_call_auth_1 = next_tool_call_authorization(&mut events).await;
    let tool_call_auth_2 = next_tool_call_authorization(&mut events).await;

    // Approve the first - send "allow" option_id (UI transforms "once" to "allow")
    tool_call_auth_1
        .response
        .send(acp::PermissionOptionId::new("allow").into())
        .unwrap();
    cx.run_until_parked();

    // Reject the second - send "deny" option_id directly since Deny is now a button
    tool_call_auth_2
        .response
        .send(acp::PermissionOptionId::new("deny").into())
        .unwrap();
    cx.run_until_parked();

    // The follow-up request must carry one successful and one errored tool result.
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_1.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }),
            language_model::MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: tool_call_auth_2.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: true,
                content: "Permission to run tool denied by user".into(),
                output: Some("Permission to run tool denied by user".into())
            })
        ]
    );

    // Simulate yet another tool call.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_3".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Respond by always allowing tools - send transformed option_id
    // (UI transforms "always:tool_requiring_permission" to "always_allow:tool_requiring_permission")
    let tool_call_auth_3 = next_tool_call_authorization(&mut events).await;
    tool_call_auth_3
        .response
        .send(acp::PermissionOptionId::new("always_allow:tool_requiring_permission").into())
        .unwrap();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: tool_call_auth_3.tool_call.tool_call_id.0.to_string().into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );

    // Simulate a final tool call, ensuring we don't trigger authorization.
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_4".into(),
            name: ToolRequiringPermission::NAME.into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    let completion = fake_model.pending_completions().pop().unwrap();
    let message = completion.messages.last().unwrap();
    assert_eq!(
        message.content,
        vec![language_model::MessageContent::ToolResult(
            LanguageModelToolResult {
                tool_use_id: "tool_id_4".into(),
                tool_name: ToolRequiringPermission::NAME.into(),
                is_error: false,
                content: "Allowed".into(),
                output: Some("Allowed".into())
            }
        )]
    );
}
941
#[gpui::test]
async fn test_tool_hallucination(cx: &mut TestAppContext) {
    // If the model invokes a tool that was never registered, the tool call
    // should surface as Pending and then transition to Failed.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["abc"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_id_1".into(),
            name: "nonexistent_tool".into(),
            raw_input: "{}".into(),
            input: json!({}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(tool_call.title, "nonexistent_tool");
    assert_eq!(tool_call.status, acp::ToolCallStatus::Pending);
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
}
971
972async fn expect_tool_call(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::ToolCall {
973 let event = events
974 .next()
975 .await
976 .expect("no tool call authorization event received")
977 .unwrap();
978 match event {
979 ThreadEvent::ToolCall(tool_call) => tool_call,
980 event => {
981 panic!("Unexpected event {event:?}");
982 }
983 }
984}
985
986async fn expect_tool_call_update_fields(
987 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
988) -> acp::ToolCallUpdate {
989 let event = events
990 .next()
991 .await
992 .expect("no tool call authorization event received")
993 .unwrap();
994 match event {
995 ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update)) => update,
996 event => {
997 panic!("Unexpected event {event:?}");
998 }
999 }
1000}
1001
1002async fn expect_plan(events: &mut UnboundedReceiver<Result<ThreadEvent>>) -> acp::Plan {
1003 let event = events
1004 .next()
1005 .await
1006 .expect("no plan event received")
1007 .unwrap();
1008 match event {
1009 ThreadEvent::Plan(plan) => plan,
1010 event => {
1011 panic!("Unexpected event {event:?}");
1012 }
1013 }
1014}
1015
1016async fn next_tool_call_authorization(
1017 events: &mut UnboundedReceiver<Result<ThreadEvent>>,
1018) -> ToolCallAuthorization {
1019 loop {
1020 let event = events
1021 .next()
1022 .await
1023 .expect("no tool call authorization event received")
1024 .unwrap();
1025 if let ThreadEvent::ToolCallAuthorization(tool_call_authorization) = event {
1026 let permission_kinds = tool_call_authorization
1027 .options
1028 .first_option_of_kind(acp::PermissionOptionKind::AllowAlways)
1029 .map(|option| option.kind);
1030 let allow_once = tool_call_authorization
1031 .options
1032 .first_option_of_kind(acp::PermissionOptionKind::AllowOnce)
1033 .map(|option| option.kind);
1034
1035 assert_eq!(
1036 permission_kinds,
1037 Some(acp::PermissionOptionKind::AllowAlways)
1038 );
1039 assert_eq!(allow_once, Some(acp::PermissionOptionKind::AllowOnce));
1040 return tool_call_authorization;
1041 }
1042 }
1043}
1044
#[test]
fn test_permission_options_terminal_with_pattern() {
    // A multi-word cargo invocation should offer a `cargo build`-scoped option.
    let permission_options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    for expected in [
        "Always for terminal",
        "Always for `cargo build` commands",
        "Only this time",
    ] {
        assert!(labels.contains(&expected));
    }
}
1066
#[test]
fn test_permission_options_terminal_command_with_flag_second_token() {
    // A flag as the second token should not widen the per-command pattern
    // beyond the bare command name.
    let permission_options =
        ToolPermissionContext::new(TerminalTool::NAME, vec!["ls -la".to_string()])
            .build_permission_options();

    let PermissionOptions::Dropdown(choices) = permission_options else {
        panic!("Expected dropdown permission options");
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    for expected in [
        "Always for terminal",
        "Always for `ls` commands",
        "Only this time",
    ] {
        assert!(labels.contains(&expected));
    }
}
1086
#[test]
fn test_permission_options_terminal_single_word_command() {
    // A single-word command still gets a per-command pattern choice.
    let options = ToolPermissionContext::new(TerminalTool::NAME, vec!["whoami".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 3);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Always for `whoami` commands"));
    assert!(labels.contains(&"Only this time"));
}
1106
#[test]
fn test_permission_options_edit_file_with_path_pattern() {
    // Editing a file under a directory should offer a directory-scoped
    // "always" choice alongside the tool-wide one.
    let options = ToolPermissionContext::new(EditFileTool::NAME, vec!["src/main.rs".to_string()])
        .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for edit file"));
    assert!(labels.contains(&"Always for `src/`"));
}
1124
#[test]
fn test_permission_options_fetch_with_domain_pattern() {
    // Fetching a URL should offer a domain-scoped "always" choice.
    let options =
        ToolPermissionContext::new(FetchTool::NAME, vec!["https://docs.rs/gpui".to_string()])
            .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for fetch"));
    assert!(labels.contains(&"Always for `docs.rs`"));
}
1142
#[test]
fn test_permission_options_without_pattern() {
    // An unrecognized script invocation gets no per-command pattern choice:
    // only the tool-wide and one-time options remain.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["./deploy.sh --production".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    assert_eq!(choices.len(), 2);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));
    assert!(!labels.iter().any(|label| label.contains("commands")));
}
1164
#[test]
fn test_permission_options_symlink_target_are_flat_once_only() {
    // Symlink-target confirmations are one-shot: a flat allow/deny pair with
    // no "always" variants.
    let options = ToolPermissionContext::symlink_target(
        EditFileTool::NAME,
        vec!["/outside/file.txt".into()],
    )
    .build_permission_options();

    let flat = match options {
        PermissionOptions::Flat(options) => options,
        _ => panic!("Expected flat permission options for symlink target authorization"),
    };

    assert_eq!(flat.len(), 2);
    let has_option = |id: &str, kind: acp::PermissionOptionKind| {
        flat.iter()
            .any(|option| option.option_id.0.as_ref() == id && option.kind == kind)
    };
    assert!(has_option("allow", acp::PermissionOptionKind::AllowOnce));
    assert!(has_option("deny", acp::PermissionOptionKind::RejectOnce));
}
1185
#[test]
fn test_permission_option_ids_for_terminal() {
    // Both "always" choices share the tool-level option IDs; only the
    // pattern-scoped choice carries sub-patterns, and the final choice is the
    // one-time allow/deny pair.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo build --release".to_string()],
    )
    .build_permission_options();

    let choices = match options {
        PermissionOptions::Dropdown(choices) => choices,
        _ => panic!("Expected dropdown permission options"),
    };

    // Expect 3 choices: always-tool, always-pattern, once
    assert_eq!(choices.len(), 3);

    let tool_wide = &choices[0];
    assert_eq!(tool_wide.allow.option_id.0.as_ref(), "always_allow:terminal");
    assert_eq!(tool_wide.deny.option_id.0.as_ref(), "always_deny:terminal");
    assert!(tool_wide.sub_patterns.is_empty());

    let pattern_scoped = &choices[1];
    assert_eq!(
        pattern_scoped.allow.option_id.0.as_ref(),
        "always_allow:terminal"
    );
    assert_eq!(
        pattern_scoped.deny.option_id.0.as_ref(),
        "always_deny:terminal"
    );
    assert_eq!(pattern_scoped.sub_patterns, vec!["^cargo\\s+build(\\s|$)"]);

    let one_time = &choices[2];
    assert_eq!(one_time.allow.option_id.0.as_ref(), "allow");
    assert_eq!(one_time.deny.option_id.0.as_ref(), "deny");
    assert!(one_time.sub_patterns.is_empty());
}
1221
#[test]
fn test_permission_options_terminal_pipeline_produces_dropdown_with_patterns() {
    // A piped command can't be covered by a single command pattern, so the
    // builder switches to `DropdownWithPatterns`, carrying one pattern per
    // pipeline stage.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["cargo test 2>&1 | tail".to_string()],
    )
    .build_permission_options();

    let (choices, patterns, tool_name) = match options {
        PermissionOptions::DropdownWithPatterns {
            choices,
            patterns,
            tool_name,
        } => (choices, patterns, tool_name),
        _ => panic!("Expected DropdownWithPatterns permission options for pipeline command"),
    };

    assert_eq!(tool_name, TerminalTool::NAME);

    // Should have "Always for terminal" and "Only this time" choices
    assert_eq!(choices.len(), 2);
    let labels = choices
        .iter()
        .map(|choice| choice.allow.name.as_ref())
        .collect::<Vec<&str>>();
    assert!(labels.contains(&"Always for terminal"));
    assert!(labels.contains(&"Only this time"));

    // Should have per-command patterns for "cargo test" and "tail"
    assert_eq!(patterns.len(), 2);
    let display_names = patterns
        .iter()
        .map(|pattern| pattern.display_name.as_str())
        .collect::<Vec<&str>>();
    assert!(display_names.contains(&"cargo test"));
    assert!(display_names.contains(&"tail"));

    // Verify patterns are valid regex patterns
    let regexes = patterns
        .iter()
        .map(|pattern| pattern.pattern.as_str())
        .collect::<Vec<&str>>();
    assert!(regexes.contains(&"^cargo\\s+test(\\s|$)"));
    assert!(regexes.contains(&"^tail\\b"));
}
1261
#[test]
fn test_permission_options_terminal_pipeline_with_chaining() {
    // `&&`-chained and piped commands each contribute their own pattern, and
    // subcommand awareness keeps `npm install` and `npm test` distinct.
    let options = ToolPermissionContext::new(
        TerminalTool::NAME,
        vec!["npm install && npm test | tail".to_string()],
    )
    .build_permission_options();

    let patterns = match options {
        PermissionOptions::DropdownWithPatterns { patterns, .. } => patterns,
        _ => panic!("Expected DropdownWithPatterns for chained pipeline command"),
    };

    assert_eq!(patterns.len(), 3);
    let display_names = patterns
        .iter()
        .map(|pattern| pattern.display_name.as_str())
        .collect::<Vec<&str>>();
    assert!(display_names.contains(&"npm install"));
    assert!(display_names.contains(&"npm test"));
    assert!(display_names.contains(&"tail"));
}
1281
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_concurrent_tool_calls(cx: &mut TestAppContext) {
    // E2E (real Sonnet 4 model): ask for two DelayTool calls in one turn and
    // verify the turn ends normally and the final message mentions the tool
    // output ("Ding" comes from DelayTool's result).
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    // Test concurrent tool calls with different delay times
    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(DelayTool);
            thread.send(
                UserMessageId::new(),
                [
                    "Call the delay tool twice in the same message.",
                    "Once with 100ms. Once with 300ms.",
                    "When both timers are complete, describe the outputs.",
                ],
                cx,
            )
        })
        .unwrap()
        .collect()
        .await;

    // The turn should complete normally despite the overlapping tool calls.
    let stop_reasons = stop_events(events);
    assert_eq!(stop_reasons, vec![acp::StopReason::EndTurn]);

    thread.update(cx, |thread, _cx| {
        // Concatenate all text content of the final agent message.
        let last_message = thread.last_received_or_pending_message().unwrap();
        let agent_message = last_message.as_agent_message().unwrap();
        let text = agent_message
            .content
            .iter()
            .filter_map(|content| {
                if let AgentMessageContent::Text(text) = content {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect::<String>();

        // "Ding" is the expected DelayTool output echoed back by the model.
        assert!(text.contains("Ding"));
    });
}
1326
#[gpui::test]
async fn test_profiles(cx: &mut TestAppContext) {
    // Verify that switching the agent profile changes which tools are offered
    // to the model on the next completion request.
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Register three native tools; the profiles below enable different subsets.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(DelayTool);
        thread.add_tool(EchoTool);
        thread.add_tool(InfiniteTool);
    });

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test-1": {
                        "name": "Test Profile 1",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "test-2": {
                        "name": "Test Profile 2",
                        "tools": {
                            InfiniteTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Test that test-1 profile (default) has echo and delay tools
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-1".into()), cx);
            thread.send(UserMessageId::new(), ["test"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Inspect the completion request the fake model received: it should list
    // exactly the tools enabled by "test-1".
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![DelayTool::NAME, EchoTool::NAME]);
    fake_model.end_last_completion_stream();

    // Switch to test-2 profile, and verify that it has only the infinite tool.
    thread
        .update(cx, |thread, cx| {
            thread.set_profile(AgentProfileId("test-2".into()), cx);
            thread.send(UserMessageId::new(), ["test2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    let mut pending_completions = fake_model.pending_completions();
    assert_eq!(pending_completions.len(), 1);
    let completion = pending_completions.pop().unwrap();
    let tool_names: Vec<String> = completion
        .tools
        .iter()
        .map(|tool| tool.name.clone())
        .collect();
    assert_eq!(tool_names, vec![InfiniteTool::NAME]);
}
1406
#[gpui::test]
async fn test_mcp_tools(cx: &mut TestAppContext) {
    // Verify that MCP (context-server) tools are exposed to the model, that
    // their calls are routed to the server, and that a name collision with a
    // native tool is resolved by prefixing the MCP tool with its server name.
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Override profiles and wait for settings to be loaded.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "tool_permissions": { "default": "allow" },
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Fake MCP server exposing a tool named "echo" — same name as the native
    // EchoTool that gets registered later in this test.
    let mut mcp_tool_calls = setup_context_server(
        "test_server",
        vec![context_server::types::Tool {
            name: "echo".into(),
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    let events = thread.update(cx, |thread, cx| {
        thread.send(UserMessageId::new(), ["Hey"], cx).unwrap()
    });
    cx.run_until_parked();

    // Simulate the model calling the MCP tool.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: json!({"text": "test"}).to_string(),
            input: json!({"text": "test"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The MCP server should receive the call with the original arguments.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "test"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: "test".into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // NOTE(review): this re-asserts the `completion` captured before the tool
    // ran; presumably the follow-up completion request was meant to be
    // inspected here (`fake_model.pending_completions().pop()`) — confirm.
    assert_eq!(tool_names_for_completion(&completion), vec!["echo"]);
    fake_model.send_last_completion_stream_text_chunk("Done!");
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Send again after adding the echo tool, ensuring the name collision is resolved.
    let events = thread.update(cx, |thread, cx| {
        thread.add_tool(EchoTool);
        thread.send(UserMessageId::new(), ["Go"], cx).unwrap()
    });
    cx.run_until_parked();
    // Native tool keeps "echo"; the MCP tool is renamed "test_server_echo".
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["echo", "test_server_echo"]
    );
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_2".into(),
            name: "test_server_echo".into(),
            raw_input: json!({"text": "mcp"}).to_string(),
            input: json!({"text": "mcp"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_3".into(),
            name: "echo".into(),
            raw_input: json!({"text": "native"}).to_string(),
            input: json!({"text": "native"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Only the MCP-side call reaches the server; it arrives under the
    // server's original (unprefixed) tool name.
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "echo");
    assert_eq!(tool_call_params.arguments, Some(json!({"text": "mcp"})));
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text { text: "mcp".into() }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // Ensure the tool results were inserted with the correct names.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages.last().unwrap().content,
        vec![
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_3".into(),
                tool_name: "echo".into(),
                is_error: false,
                content: "native".into(),
                output: Some("native".into()),
            },),
            MessageContent::ToolResult(LanguageModelToolResult {
                tool_use_id: "tool_2".into(),
                tool_name: "test_server_echo".into(),
                is_error: false,
                content: "mcp".into(),
                output: Some("mcp".into()),
            },),
        ]
    );
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;
}
1572
#[gpui::test]
async fn test_mcp_tool_result_displayed_when_server_disconnected(cx: &mut TestAppContext) {
    // Regression test: a saved MCP tool result must still be emitted during
    // thread replay even when the originating MCP server is no longer
    // connected (e.g. after an app restart).
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Setup settings to allow MCP tools
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "always_allow_tool_actions": true,
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {}
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx)
    });

    // Setup a context server with a tool
    let mut mcp_tool_calls = setup_context_server(
        "github_server",
        vec![context_server::types::Tool {
            name: "issue_read".into(),
            description: Some("Read a GitHub issue".into()),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "issue_url": { "type": "string" }
                }
            }),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    // Send a message and have the model call the MCP tool
    let events = thread.update(cx, |thread, cx| {
        thread
            .send(UserMessageId::new(), ["Read issue #47404"], cx)
            .unwrap()
    });
    cx.run_until_parked();

    // Verify the MCP tool is available to the model
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec!["issue_read"],
        "MCP tool should be available"
    );

    // Simulate the model calling the MCP tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "issue_read".into(),
            raw_input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"})
                .to_string(),
            input: json!({"issue_url": "https://github.com/zed-industries/zed/issues/47404"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The MCP server receives the tool call and responds with content
    let expected_tool_output = "Issue #47404: Tool call results are cleared upon app close";
    let (tool_call_params, tool_call_response) = mcp_tool_calls.next().await.unwrap();
    assert_eq!(tool_call_params.name, "issue_read");
    tool_call_response
        .send(context_server::types::CallToolResponse {
            content: vec![context_server::types::ToolResponseContent::Text {
                text: expected_tool_output.into(),
            }],
            is_error: None,
            meta: None,
            structured_content: None,
        })
        .unwrap();
    cx.run_until_parked();

    // After tool completes, the model continues with a new completion request
    // that includes the tool results. We need to respond to this.
    let _completion = fake_model.pending_completions().pop().unwrap();
    fake_model.send_last_completion_stream_text_chunk("I found the issue!");
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();
    events.collect::<Vec<_>>().await;

    // Verify the tool result is stored in the thread by checking the markdown output.
    // The tool result is in the first assistant message (not the last one, which is
    // the model's response after the tool completed).
    thread.update(cx, |thread, _cx| {
        let markdown = thread.to_markdown();
        assert!(
            markdown.contains("**Tool Result**: issue_read"),
            "Thread should contain tool result header"
        );
        assert!(
            markdown.contains(expected_tool_output),
            "Thread should contain tool output: {}",
            expected_tool_output
        );
    });

    // Simulate app restart: disconnect the MCP server.
    // After restart, the MCP server won't be connected yet when the thread is replayed.
    context_server_store.update(cx, |store, cx| {
        let _ = store.stop_server(&ContextServerId("github_server".into()), cx);
    });
    cx.run_until_parked();

    // Replay the thread (this is what happens when loading a saved thread)
    let mut replay_events = thread.update(cx, |thread, cx| thread.replay(cx));

    // Scan the replay stream for the tool call and for a field update that
    // carries the saved raw_output.
    let mut found_tool_call = None;
    let mut found_tool_call_update_with_output = None;

    while let Some(event) = replay_events.next().await {
        let event = event.unwrap();
        match &event {
            ThreadEvent::ToolCall(tc) if tc.tool_call_id.to_string() == "tool_1" => {
                found_tool_call = Some(tc.clone());
            }
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(update))
                if update.tool_call_id.to_string() == "tool_1" =>
            {
                if update.fields.raw_output.is_some() {
                    found_tool_call_update_with_output = Some(update.clone());
                }
            }
            _ => {}
        }
    }

    // The tool call should be found
    assert!(
        found_tool_call.is_some(),
        "Tool call should be emitted during replay"
    );

    assert!(
        found_tool_call_update_with_output.is_some(),
        "ToolCallUpdate with raw_output should be emitted even when MCP server is disconnected."
    );

    let update = found_tool_call_update_with_output.unwrap();
    assert_eq!(
        update.fields.raw_output,
        Some(expected_tool_output.into()),
        "raw_output should contain the saved tool result"
    );

    // Also verify the status is correct (completed, not failed)
    assert_eq!(
        update.fields.status,
        Some(acp::ToolCallStatus::Completed),
        "Tool call status should reflect the original completion status"
    );
}
1754
#[gpui::test]
async fn test_mcp_tool_truncation(cx: &mut TestAppContext) {
    // Verify how tool names are disambiguated and shortened when multiple MCP
    // servers collide with native tools or exceed MAX_TOOL_NAME_LENGTH.
    let ThreadTest {
        model,
        thread,
        context_server_store,
        fs,
        ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up a profile with all tools enabled
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "test": {
                        "name": "Test Profile",
                        "enable_all_context_servers": true,
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                        }
                    },
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("test".into()), cx);
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
        thread.add_tool(WordListTool);
        thread.add_tool(ToolRequiringPermission);
        thread.add_tool(InfiniteTool);
    });

    // Set up multiple context servers with some overlapping tool names
    let _server1_calls = setup_context_server(
        "xxx",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_1".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // "yyy" adds near-limit tool names that collide with "zzz" below, forcing
    // disambiguation on names that have no room for a full server prefix.
    let _server2_calls = setup_context_server(
        "yyy",
        vec![
            context_server::types::Tool {
                name: "echo".into(), // Also conflicts with native EchoTool
                description: None,
                input_schema: serde_json::to_value(EchoTool::input_schema(
                    LanguageModelToolSchemaFormat::JsonSchema,
                ))
                .unwrap(),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "unique_tool_2".into(),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );
    let _server3_calls = setup_context_server(
        "zzz",
        vec![
            context_server::types::Tool {
                name: "a".repeat(MAX_TOOL_NAME_LENGTH - 2),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "b".repeat(MAX_TOOL_NAME_LENGTH - 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
            context_server::types::Tool {
                name: "c".repeat(MAX_TOOL_NAME_LENGTH + 1),
                description: None,
                input_schema: json!({"type": "object", "properties": {}}),
                output_schema: None,
                annotations: None,
            },
        ],
        &context_server_store,
        cx,
    );

    // Server with spaces in name - tests snake_case conversion for API compatibility
    let _server4_calls = setup_context_server(
        "Azure DevOps",
        vec![context_server::types::Tool {
            name: "echo".into(), // Also conflicts - will be disambiguated as azure_dev_ops_echo
            description: None,
            input_schema: serde_json::to_value(EchoTool::input_schema(
                LanguageModelToolSchemaFormat::JsonSchema,
            ))
            .unwrap(),
            output_schema: None,
            annotations: None,
        }],
        &context_server_store,
        cx,
    );

    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Go"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // Expected outcomes: native tools keep their names; conflicting MCP
    // "echo" tools get a snake_cased server prefix; the colliding near-limit
    // names appear to be disambiguated with a single-letter server prefix and
    // truncated to fit MAX_TOOL_NAME_LENGTH ("y_aaa…"/"z_aaa…"), and the
    // over-limit "ccc…" is truncated — presumably by the tool-name
    // disambiguation logic; confirm against its implementation.
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        tool_names_for_completion(&completion),
        vec![
            "azure_dev_ops_echo",
            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
            "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
            "delay",
            "echo",
            "infinite",
            "tool_requiring_permission",
            "unique_tool_1",
            "unique_tool_2",
            "word_list",
            "xxx_echo",
            "y_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "yyy_echo",
            "z_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        ]
    );
}
1938
#[gpui::test]
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_cancellation(cx: &mut TestAppContext) {
    // E2E: cancel a turn while InfiniteTool is still running, assert the
    // stream ends with a Cancelled stop event, then verify the thread can
    // still process a fresh message afterwards.
    let ThreadTest { thread, .. } = setup(cx, TestModel::Sonnet4).await;

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(InfiniteTool);
            thread.add_tool(EchoTool);
            thread.send(
                UserMessageId::new(),
                ["Call the echo tool, then call the infinite tool, then explain their output"],
                cx,
            )
        })
        .unwrap();

    // Wait until both tools are called.
    let mut expected_tools = vec!["Echo", "Infinite Tool"];
    let mut echo_id = None;
    let mut echo_completed = false;
    while let Some(event) = events.next().await {
        match event.unwrap() {
            ThreadEvent::ToolCall(tool_call) => {
                // Tool calls must arrive in the order the prompt requested.
                assert_eq!(tool_call.title, expected_tools.remove(0));
                if tool_call.title == "Echo" {
                    echo_id = Some(tool_call.tool_call_id);
                }
            }
            // Watch for the Echo call transitioning to Completed.
            ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                acp::ToolCallUpdate {
                    tool_call_id,
                    fields:
                        acp::ToolCallUpdateFields {
                            status: Some(acp::ToolCallStatus::Completed),
                            ..
                        },
                    ..
                },
            )) if Some(&tool_call_id) == echo_id.as_ref() => {
                echo_completed = true;
            }
            _ => {}
        }

        if expected_tools.is_empty() && echo_completed {
            break;
        }
    }

    // Cancel the current send and ensure that the event stream is closed, even
    // if one of the tools is still running.
    thread.update(cx, |thread, cx| thread.cancel(cx)).await;
    let events = events.collect::<Vec<_>>().await;
    let last_event = events.last();
    assert!(
        matches!(
            last_event,
            Some(Ok(ThreadEvent::Stop(acp::StopReason::Cancelled)))
        ),
        "unexpected event {last_event:?}"
    );

    // Ensure we can still send a new message after cancellation.
    let events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Testing: reply with 'Hello' then stop."],
                cx,
            )
        })
        .unwrap()
        .collect::<Vec<_>>()
        .await;
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();
        assert_eq!(
            agent_message.content,
            vec![AgentMessageContent::Text("Hello".to_string())]
        );
    });
    assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
}
2024
#[gpui::test]
async fn test_terminal_tool_cancellation_captures_output(cx: &mut TestAppContext) {
    // When a running terminal tool is cancelled, its partial output must be
    // preserved in the tool result instead of a bare "cancelled" message, and
    // the underlying terminal must be killed.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // A fake terminal whose process never exits, so cancellation is the only
    // way the tool call can finish.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Cancel the thread while the terminal is running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events, driving the executor to let cancellation complete
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );

    // Verify the tool result contains the terminal output, not just "Tool canceled by user"
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use recorded on the agent message.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        // "partial output" comes from FakeTerminalHandle's output field
        assert!(
            result_text.contains("partial output"),
            "expected tool result to contain terminal output, got: {result_text}"
        );
        // Match the actual format from process_content in terminal_tool.rs
        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });

    // Verify we can send a new message after cancellation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2121
2122#[gpui::test]
2123async fn test_cancellation_aware_tool_responds_to_cancellation(cx: &mut TestAppContext) {
2124 // This test verifies that tools which properly handle cancellation via
2125 // `event_stream.cancelled_by_user()` (like edit_file_tool) respond promptly
2126 // to cancellation and report that they were cancelled.
2127 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2128 always_allow_tools(cx);
2129 let fake_model = model.as_fake();
2130
2131 let (tool, was_cancelled) = CancellationAwareTool::new();
2132
2133 let mut events = thread
2134 .update(cx, |thread, cx| {
2135 thread.add_tool(tool);
2136 thread.send(
2137 UserMessageId::new(),
2138 ["call the cancellation aware tool"],
2139 cx,
2140 )
2141 })
2142 .unwrap();
2143
2144 cx.run_until_parked();
2145
2146 // Simulate the model calling the cancellation-aware tool
2147 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
2148 LanguageModelToolUse {
2149 id: "cancellation_aware_1".into(),
2150 name: "cancellation_aware".into(),
2151 raw_input: r#"{}"#.into(),
2152 input: json!({}),
2153 is_input_complete: true,
2154 thought_signature: None,
2155 },
2156 ));
2157 fake_model.end_last_completion_stream();
2158
2159 cx.run_until_parked();
2160
2161 // Wait for the tool call to be reported
2162 let mut tool_started = false;
2163 let deadline = cx.executor().num_cpus() * 100;
2164 for _ in 0..deadline {
2165 cx.run_until_parked();
2166
2167 while let Some(Some(event)) = events.next().now_or_never() {
2168 if let Ok(ThreadEvent::ToolCall(tool_call)) = &event {
2169 if tool_call.title == "Cancellation Aware Tool" {
2170 tool_started = true;
2171 break;
2172 }
2173 }
2174 }
2175
2176 if tool_started {
2177 break;
2178 }
2179
2180 cx.background_executor
2181 .timer(Duration::from_millis(10))
2182 .await;
2183 }
2184 assert!(tool_started, "expected cancellation aware tool to start");
2185
2186 // Cancel the thread and wait for it to complete
2187 let cancel_task = thread.update(cx, |thread, cx| thread.cancel(cx));
2188
2189 // The cancel task should complete promptly because the tool handles cancellation
2190 let timeout = cx.background_executor.timer(Duration::from_secs(5));
2191 futures::select! {
2192 _ = cancel_task.fuse() => {}
2193 _ = timeout.fuse() => {
2194 panic!("cancel task timed out - tool did not respond to cancellation");
2195 }
2196 }
2197
2198 // Verify the tool detected cancellation via its flag
2199 assert!(
2200 was_cancelled.load(std::sync::atomic::Ordering::SeqCst),
2201 "tool should have detected cancellation via event_stream.cancelled_by_user()"
2202 );
2203
2204 // Collect remaining events
2205 let remaining_events = collect_events_until_stop(&mut events, cx).await;
2206
2207 // Verify we got a cancellation stop event
2208 assert_eq!(
2209 stop_events(remaining_events),
2210 vec![acp::StopReason::Cancelled],
2211 );
2212
2213 // Verify we can send a new message after cancellation
2214 verify_thread_recovery(&thread, &fake_model, cx).await;
2215}
2216
2217/// Helper to verify thread can recover after cancellation by sending a simple message.
2218async fn verify_thread_recovery(
2219 thread: &Entity<Thread>,
2220 fake_model: &FakeLanguageModel,
2221 cx: &mut TestAppContext,
2222) {
2223 let events = thread
2224 .update(cx, |thread, cx| {
2225 thread.send(
2226 UserMessageId::new(),
2227 ["Testing: reply with 'Hello' then stop."],
2228 cx,
2229 )
2230 })
2231 .unwrap();
2232 cx.run_until_parked();
2233 fake_model.send_last_completion_stream_text_chunk("Hello");
2234 fake_model
2235 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2236 fake_model.end_last_completion_stream();
2237
2238 let events = events.collect::<Vec<_>>().await;
2239 thread.update(cx, |thread, _cx| {
2240 let message = thread.last_received_or_pending_message().unwrap();
2241 let agent_message = message.as_agent_message().unwrap();
2242 assert_eq!(
2243 agent_message.content,
2244 vec![AgentMessageContent::Text("Hello".to_string())]
2245 );
2246 });
2247 assert_eq!(stop_events(events), vec![acp::StopReason::EndTurn]);
2248}
2249
2250/// Waits for a terminal tool to start by watching for a ToolCallUpdate with terminal content.
2251async fn wait_for_terminal_tool_started(
2252 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2253 cx: &mut TestAppContext,
2254) {
2255 let deadline = cx.executor().num_cpus() * 100; // Scale with available parallelism
2256 for _ in 0..deadline {
2257 cx.run_until_parked();
2258
2259 while let Some(Some(event)) = events.next().now_or_never() {
2260 if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
2261 update,
2262 ))) = &event
2263 {
2264 if update.fields.content.as_ref().is_some_and(|content| {
2265 content
2266 .iter()
2267 .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
2268 }) {
2269 return;
2270 }
2271 }
2272 }
2273
2274 cx.background_executor
2275 .timer(Duration::from_millis(10))
2276 .await;
2277 }
2278 panic!("terminal tool did not start within the expected time");
2279}
2280
2281/// Collects events until a Stop event is received, driving the executor to completion.
2282async fn collect_events_until_stop(
2283 events: &mut mpsc::UnboundedReceiver<Result<ThreadEvent>>,
2284 cx: &mut TestAppContext,
2285) -> Vec<Result<ThreadEvent>> {
2286 let mut collected = Vec::new();
2287 let deadline = cx.executor().num_cpus() * 200;
2288
2289 for _ in 0..deadline {
2290 cx.executor().advance_clock(Duration::from_millis(10));
2291 cx.run_until_parked();
2292
2293 while let Some(Some(event)) = events.next().now_or_never() {
2294 let is_stop = matches!(&event, Ok(ThreadEvent::Stop(_)));
2295 collected.push(event);
2296 if is_stop {
2297 return collected;
2298 }
2299 }
2300 }
2301 panic!(
2302 "did not receive Stop event within the expected time; collected {} events",
2303 collected.len()
2304 );
2305}
2306
#[gpui::test]
async fn test_truncate_while_terminal_tool_running(cx: &mut TestAppContext) {
    // Truncating the thread while a terminal tool is mid-execution must kill
    // the terminal, leave the thread empty, and keep it usable for a new turn.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Terminal handle that never exits on its own, so it is still "running"
    // at the moment we truncate.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    // Keep the message id so we can truncate back to this exact message below.
    let message_id = UserMessageId::new();
    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(message_id.clone(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Truncate the thread while the terminal is running
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();

    // Drive the executor to let cancellation complete
    let _ = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on truncate"
    );

    // Verify the thread is empty after truncation
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            "",
            "expected thread to be empty after truncating the only message"
        );
    });

    // Verify we can send a new message after truncation
    verify_thread_recovery(&thread, &fake_model, cx).await;
}
2373
#[gpui::test]
async fn test_cancel_multiple_concurrent_terminal_tools(cx: &mut TestAppContext) {
    // Tests that cancellation properly kills all running terminal tools when multiple are active.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Environment that creates a fresh handle per terminal, so each spawned
    // terminal can be inspected individually via `environment.handles()`.
    let environment = Rc::new(MultiTerminalEnvironment::new());

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment.clone(),
            ));
            thread.send(UserMessageId::new(), ["run multiple commands"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling two terminal tools
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_2".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 2000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 2000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for both terminal tools to start by counting terminal content updates.
    // The inner `break` exits only the drain loop; the outer check below exits
    // the polling loop once both terminals have been observed.
    let mut terminals_started = 0;
    let deadline = cx.executor().num_cpus() * 100;
    for _ in 0..deadline {
        cx.run_until_parked();

        while let Some(Some(event)) = events.next().now_or_never() {
            if let Ok(ThreadEvent::ToolCallUpdate(acp_thread::ToolCallUpdate::UpdateFields(
                update,
            ))) = &event
            {
                if update.fields.content.as_ref().is_some_and(|content| {
                    content
                        .iter()
                        .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
                }) {
                    terminals_started += 1;
                    if terminals_started >= 2 {
                        break;
                    }
                }
            }
        }
        if terminals_started >= 2 {
            break;
        }

        cx.background_executor
            .timer(Duration::from_millis(10))
            .await;
    }
    assert!(
        terminals_started >= 2,
        "expected 2 terminal tools to start, got {terminals_started}"
    );

    // Cancel the thread while both terminals are running
    thread.update(cx, |thread, cx| thread.cancel(cx)).detach();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify both terminal handles were killed
    let handles = environment.handles();
    assert_eq!(
        handles.len(),
        2,
        "expected 2 terminal handles to be created"
    );
    assert!(
        handles[0].was_killed(),
        "expected first terminal handle to be killed on cancellation"
    );
    assert!(
        handles[1].was_killed(),
        "expected second terminal handle to be killed on cancellation"
    );

    // Verify we got a cancellation stop event
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::Cancelled],
    );
}
2482
#[gpui::test]
async fn test_terminal_tool_stopped_via_terminal_card_button(cx: &mut TestAppContext) {
    // Tests that clicking the stop button on the terminal card (as opposed to the main
    // cancel button) properly reports user stopped via the was_stopped_by_user path.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Terminal that never exits on its own, so it is still running when the
    // simulated stop-button click arrives.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": "."}"#.into(),
            input: json!({"command": "sleep 1000", "cd": "."}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Simulate user clicking stop on the terminal card itself.
    // This sets the flag and signals exit (simulating what the real UI would do).
    handle.set_stopped_by_user(true);
    handle.killed.store(true, Ordering::SeqCst);
    handle.signal_exit();

    // Wait for the tool to complete
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify we got an EndTurn (not Cancelled, since we didn't cancel the thread)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use within the agent message content.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // The result is keyed by the tool use id.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("The user stopped this command"),
            "expected tool result to indicate user stopped, got: {result_text}"
        );
    });
}
2577
#[gpui::test]
async fn test_terminal_tool_timeout_expires(cx: &mut TestAppContext) {
    // Tests that when a timeout is configured and expires, the tool result indicates timeout.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    always_allow_tools(cx);
    let fake_model = model.as_fake();

    // Terminal that never exits on its own, guaranteeing the timeout fires first.
    let environment = Rc::new(cx.update(|cx| {
        FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
    }));
    let handle = environment.terminal_handle.clone().unwrap();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(crate::TerminalTool::new(
                thread.project().clone(),
                environment,
            ));
            thread.send(UserMessageId::new(), ["run a command with timeout"], cx)
        })
        .unwrap();

    cx.run_until_parked();

    // Simulate the model calling the terminal tool with a short timeout
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "terminal_tool_1".into(),
            name: TerminalTool::NAME.into(),
            raw_input: r#"{"command": "sleep 1000", "cd": ".", "timeout_ms": 100}"#.into(),
            input: json!({"command": "sleep 1000", "cd": ".", "timeout_ms": 100}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();

    // Wait for the terminal tool to start running
    wait_for_terminal_tool_started(&mut events, cx).await;

    // Advance clock past the timeout (200ms > the 100ms timeout_ms above)
    cx.executor().advance_clock(Duration::from_millis(200));
    cx.run_until_parked();

    // The thread continues after tool completion - simulate the model ending its turn
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    fake_model.end_last_completion_stream();

    // Collect remaining events
    let remaining_events = collect_events_until_stop(&mut events, cx).await;

    // Verify the terminal was killed due to timeout
    assert!(
        handle.was_killed(),
        "expected terminal handle to be killed on timeout"
    );

    // Verify we got an EndTurn (the tool completed, just with timeout)
    assert_eq!(
        stop_events(remaining_events),
        vec![acp::StopReason::EndTurn],
    );

    // Verify the tool result indicates timeout, not user stopped
    thread.update(cx, |thread, _cx| {
        let message = thread.last_received_or_pending_message().unwrap();
        let agent_message = message.as_agent_message().unwrap();

        // Locate the terminal tool use within the agent message content.
        let tool_use = agent_message
            .content
            .iter()
            .find_map(|content| match content {
                AgentMessageContent::ToolUse(tool_use) => Some(tool_use),
                _ => None,
            })
            .expect("expected tool use in agent message");

        // The result is keyed by the tool use id.
        let tool_result = agent_message
            .tool_results
            .get(&tool_use.id)
            .expect("expected tool result");

        let result_text = match &tool_result.content {
            language_model::LanguageModelToolResultContent::Text(text) => text.to_string(),
            _ => panic!("expected text content in tool result"),
        };

        assert!(
            result_text.contains("timed out"),
            "expected tool result to indicate timeout, got: {result_text}"
        );
        assert!(
            !result_text.contains("The user stopped"),
            "tool result should not mention user stopped when it timed out, got: {result_text}"
        );
    });
}
2676
2677#[gpui::test]
2678async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
2679 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2680 let fake_model = model.as_fake();
2681
2682 let events_1 = thread
2683 .update(cx, |thread, cx| {
2684 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2685 })
2686 .unwrap();
2687 cx.run_until_parked();
2688 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2689 cx.run_until_parked();
2690
2691 let events_2 = thread
2692 .update(cx, |thread, cx| {
2693 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2694 })
2695 .unwrap();
2696 cx.run_until_parked();
2697 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2698 fake_model
2699 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2700 fake_model.end_last_completion_stream();
2701
2702 let events_1 = events_1.collect::<Vec<_>>().await;
2703 assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);
2704 let events_2 = events_2.collect::<Vec<_>>().await;
2705 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2706}
2707
#[gpui::test]
async fn test_retry_cancelled_promptly_on_new_send(cx: &mut TestAppContext) {
    // Regression test: when a completion fails with a retryable error (e.g. upstream 500),
    // the retry loop waits on a timer. If the user switches models and sends a new message
    // during that delay, the old turn should exit immediately instead of retrying with the
    // stale model.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let model_a = model.as_fake();

    // Start a turn with model_a.
    let events_1 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // One pending completion request is in flight on model_a.
    assert_eq!(model_a.completion_count(), 1);

    // Model returns a retryable upstream 500. The turn enters the retry delay.
    model_a.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    model_a.end_last_completion_stream();
    cx.run_until_parked();

    // The old completion was consumed; model_a has no pending requests yet because the
    // retry timer hasn't fired.
    assert_eq!(model_a.completion_count(), 0);

    // Switch to model_b and send a new message. This cancels the old turn.
    let model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "fake", "model-b", "Model B", false,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_model(model_b.clone(), cx);
    });
    let events_2 = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Continue"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // model_b should have received its completion request.
    assert_eq!(model_b.as_fake().completion_count(), 1);

    // Advance the clock well past the retry delay (BASE_RETRY_DELAY = 5s).
    cx.executor().advance_clock(Duration::from_secs(10));
    cx.run_until_parked();

    // model_a must NOT have received another completion request — the cancelled turn
    // should have exited during the retry delay rather than retrying with the old model.
    assert_eq!(
        model_a.completion_count(),
        0,
        "old model should not receive a retry request after cancellation"
    );

    // Complete model_b's turn.
    model_b
        .as_fake()
        .send_last_completion_stream_text_chunk("Done!");
    model_b
        .as_fake()
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
    model_b.as_fake().end_last_completion_stream();

    // The cancelled turn reports Cancelled; the new turn completes normally.
    let events_1 = events_1.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_1), vec![acp::StopReason::Cancelled]);

    let events_2 = events_2.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
}
2785
2786#[gpui::test]
2787async fn test_subsequent_successful_sends_dont_cancel(cx: &mut TestAppContext) {
2788 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
2789 let fake_model = model.as_fake();
2790
2791 let events_1 = thread
2792 .update(cx, |thread, cx| {
2793 thread.send(UserMessageId::new(), ["Hello 1"], cx)
2794 })
2795 .unwrap();
2796 cx.run_until_parked();
2797 fake_model.send_last_completion_stream_text_chunk("Hey 1!");
2798 fake_model
2799 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2800 fake_model.end_last_completion_stream();
2801 let events_1 = events_1.collect::<Vec<_>>().await;
2802
2803 let events_2 = thread
2804 .update(cx, |thread, cx| {
2805 thread.send(UserMessageId::new(), ["Hello 2"], cx)
2806 })
2807 .unwrap();
2808 cx.run_until_parked();
2809 fake_model.send_last_completion_stream_text_chunk("Hey 2!");
2810 fake_model
2811 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::EndTurn));
2812 fake_model.end_last_completion_stream();
2813 let events_2 = events_2.collect::<Vec<_>>().await;
2814
2815 assert_eq!(stop_events(events_1), vec![acp::StopReason::EndTurn]);
2816 assert_eq!(stop_events(events_2), vec![acp::StopReason::EndTurn]);
2817}
2818
#[gpui::test]
async fn test_refusal(cx: &mut TestAppContext) {
    // A Refusal stop reason should roll the thread back past the refused turn,
    // removing the user message and any partial assistant output.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    // The user message is recorded before any model output arrives.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
    });

    // Stream a partial assistant reply so there is content to roll back.
    fake_model.send_last_completion_stream_text_chunk("Hey!");
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
    });

    // If the model refuses to continue, the thread should remove all the messages after the last user message.
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::Refusal));
    let events = events.collect::<Vec<_>>().await;
    assert_eq!(stop_events(events), vec![acp::StopReason::Refusal]);
    // After the refusal, the whole thread is empty again.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
    });
}
2867
#[gpui::test]
async fn test_truncate_first_message(cx: &mut TestAppContext) {
    // Truncating at the only message should empty the thread, reset token
    // usage, and leave the thread usable for a fresh turn.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Keep the id so we can truncate back to this message later.
    let message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_id.clone(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello
            "}
        );
        // No usage has been reported yet.
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Stream a reply plus a usage update so there is token state to reset.
    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello

                ## Assistant

                Hey!
            "}
        );
        // Usage reflects the update streamed above (input + output tokens).
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 32_000 + 16_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 32_000,
                output_tokens: 16_000,
            })
        );
    });

    // Truncating at the first (and only) message empties the thread entirely.
    thread
        .update(cx, |thread, cx| thread.truncate(message_id, cx))
        .unwrap();
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.to_markdown(), "");
        assert_eq!(thread.latest_token_usage(), None);
    });

    // Ensure we can still send a new message after truncation.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hi"], cx)
        })
        .unwrap();
    thread.update(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi
            "}
        );
    });
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Ahoy!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hi

                ## Assistant

                Ahoy!
            "}
        );

        // Usage after truncation reflects only the new turn's update.
        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });
}
2989
#[gpui::test]
async fn test_truncate_second_message(cx: &mut TestAppContext) {
    // Truncating at the second message should restore the thread (content and
    // token usage) to exactly its state after the first exchange.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // First exchange: message 1 + response + usage update.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Message 1 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 32_000,
            output_tokens: 16_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Snapshot assertion for the post-first-exchange state; reused after
    // truncation to verify the thread was restored to exactly this state.
    let assert_first_message_state = |cx: &mut TestAppContext| {
        thread.clone().read_with(cx, |thread, _| {
            assert_eq!(
                thread.to_markdown(),
                indoc! {"
                    ## User

                    Message 1

                    ## Assistant

                    Message 1 response
                "}
            );

            assert_eq!(
                thread.latest_token_usage(),
                Some(acp_thread::TokenUsage {
                    used_tokens: 32_000 + 16_000,
                    max_tokens: 1_000_000,
                    max_output_tokens: None,
                    input_tokens: 32_000,
                    output_tokens: 16_000,
                })
            );
        });
    };

    assert_first_message_state(cx);

    // Second exchange: keep the id so we can truncate back to this message.
    let second_message_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(second_message_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Message 2 response");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 40_000,
            output_tokens: 20_000,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Both exchanges are present and usage reflects the latest update.
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Message 1

                ## Assistant

                Message 1 response

                ## User

                Message 2

                ## Assistant

                Message 2 response
            "}
        );

        assert_eq!(
            thread.latest_token_usage(),
            Some(acp_thread::TokenUsage {
                used_tokens: 40_000 + 20_000,
                max_tokens: 1_000_000,
                max_output_tokens: None,
                input_tokens: 40_000,
                output_tokens: 20_000,
            })
        );
    });

    // Truncating at the second message restores the first-exchange state.
    thread
        .update(cx, |thread, cx| thread.truncate(second_message_id, cx))
        .unwrap();
    cx.run_until_parked();

    assert_first_message_state(cx);
}
3104
#[gpui::test]
async fn test_title_generation(cx: &mut TestAppContext) {
    // A separate summarization model generates the thread title after the
    // first turn; subsequent turns must not regenerate it.
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Distinct fake model used only for summarization/title generation.
    let summary_model = Arc::new(FakeLanguageModel::default());
    thread.update(cx, |thread, cx| {
        thread.set_summarization_model(Some(summary_model.clone()), cx)
    });

    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // Title remains the default until the summary model responds.
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "New Thread"));

    // Ensure the summary model has been invoked to generate a title.
    // Only the first line of its output ("Hello world") becomes the title.
    summary_model.send_last_completion_stream_text_chunk("Hello ");
    summary_model.send_last_completion_stream_text_chunk("world\nG");
    summary_model.send_last_completion_stream_text_chunk("oodnight Moon");
    summary_model.end_last_completion_stream();
    send.collect::<Vec<_>>().await;
    cx.run_until_parked();
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));

    // Send another message, ensuring no title is generated this time.
    let send = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello again"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Hey again!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    // The summary model received no new completion request for the second turn.
    assert_eq!(summary_model.pending_completions(), Vec::new());
    send.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _| assert_eq!(thread.title(), "Hello world"));
}
3150
/// Verifies that tool uses still awaiting results are omitted from the
/// assistant message when building a completion request, while completed
/// tool calls appear together with their results.
#[gpui::test]
async fn test_building_request_with_pending_tools(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let _events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(ToolRequiringPermission);
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Hey!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Expected to remain pending: this tool asks for permission before running.
    let permission_tool_use = LanguageModelToolUse {
        id: "tool_id_1".into(),
        name: ToolRequiringPermission::NAME.into(),
        raw_input: "{}".into(),
        input: json!({}),
        is_input_complete: true,
        thought_signature: None,
    };
    // Expected to complete immediately with its echoed input.
    let echo_tool_use = LanguageModelToolUse {
        id: "tool_id_2".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_text_chunk("Hi!");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        permission_tool_use,
    ));
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        echo_tool_use.clone(),
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Ensure pending tools are skipped when building a request.
    // messages[0] is skipped in the comparison below (presumably the leading
    // system/context message — confirm against build_completion_request).
    let request = thread
        .read_with(cx, |thread, cx| {
            thread.build_completion_request(CompletionIntent::EditFile, cx)
        })
        .unwrap();
    assert_eq!(
        request.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Hey!".into()],
                cache: true,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    MessageContent::Text("Hi!".into()),
                    MessageContent::ToolUse(echo_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![MessageContent::ToolResult(LanguageModelToolResult {
                    tool_use_id: echo_tool_use.id.clone(),
                    tool_name: echo_tool_use.name,
                    is_error: false,
                    content: "test".into(),
                    output: Some("test".into())
                })],
                cache: false,
                reasoning_details: None,
            },
        ],
    );
}
3230
/// End-to-end exercise of `NativeAgentConnection`: session creation, model
/// listing/selection, prompting, cancellation, and session closure.
#[gpui::test]
async fn test_agent_connection(cx: &mut TestAppContext) {
    cx.update(settings::init);
    let templates = Templates::new();

    // Initialize language model system with test provider
    cx.update(|cx| {
        gpui_tokio::init(cx);

        let http_client = FakeHttpClient::with_404_response();
        let clock = Arc::new(clock::FakeSystemClock::new());
        let client = Client::new(clock, http_client, cx);
        let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
        language_model::init(user_store.clone(), client.clone(), cx);
        language_models::init(user_store, client.clone(), cx);
        LanguageModelRegistry::test(cx);
    });
    cx.executor().forbid_parking();

    // Create a project for new_thread
    let fake_fs = cx.update(|cx| fs::FakeFs::new(cx.background_executor().clone()));
    fake_fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fake_fs.clone(), [Path::new("/test")], cx).await;
    let cwd = PathList::new(&[Path::new("/test")]);
    let thread_store = cx.new(|cx| ThreadStore::new(cx));

    // Create agent and connection
    let agent = cx
        .update(|cx| NativeAgent::new(thread_store, templates.clone(), None, fake_fs.clone(), cx));
    let connection = NativeAgentConnection(agent.clone());

    // Create a thread using new_thread
    let connection_rc = Rc::new(connection.clone());
    let acp_thread = cx
        .update(|cx| connection_rc.new_session(project, cwd, cx))
        .await
        .expect("new_thread should succeed");

    // Get the session_id from the AcpThread
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());

    // Test model_selector returns Some
    let selector_opt = connection.model_selector(&session_id);
    assert!(
        selector_opt.is_some(),
        "agent should always support ModelSelector"
    );
    let selector = selector_opt.unwrap();

    // Test list_models
    let listed_models = cx
        .update(|cx| selector.list_models(cx))
        .await
        .expect("list_models should succeed");
    let AgentModelList::Grouped(listed_models) = listed_models else {
        panic!("Unexpected model list type");
    };
    assert!(!listed_models.is_empty(), "should have at least one model");
    assert_eq!(
        listed_models[&AgentModelGroupName("Fake".into())][0]
            .id
            .0
            .as_ref(),
        "fake/fake"
    );

    // Test selected_model returns the default
    let model = cx
        .update(|cx| selector.selected_model(cx))
        .await
        .expect("selected_model should succeed");
    let model = cx
        .update(|cx| agent.read(cx).models().model_from_id(&model.id))
        .unwrap();
    let model = model.as_fake();
    assert_eq!(model.id().0, "fake", "should return default model");

    // Send a prompt and stream back a response chunk.
    let request = acp_thread.update(cx, |thread, cx| thread.send(vec!["abc".into()], cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("def");
    cx.run_until_parked();
    acp_thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User

                abc

                ## Assistant

                def

            "}
        )
    });

    // Test cancel
    cx.update(|cx| connection.cancel(&session_id, cx));
    request.await.expect("prompt should fail gracefully");

    // Explicitly close the session and drop the ACP thread.
    cx.update(|cx| Rc::new(connection.clone()).close_session(&session_id, cx))
        .await
        .unwrap();
    drop(acp_thread);
    // Prompting a closed session must fail with a "Session not found" error.
    let result = cx
        .update(|cx| {
            connection.prompt(
                Some(acp_thread::UserMessageId::new()),
                acp::PromptRequest::new(session_id.clone(), vec!["ghi".into()]),
                cx,
            )
        })
        .await;
    assert_eq!(
        result.as_ref().unwrap_err().to_string(),
        "Session not found",
        "unexpected result: {:?}",
        result
    );
}
3353
/// Verifies the sequence of ACP tool-call events emitted while a tool's
/// input streams in and the tool then runs to completion:
/// initial call → input update → InProgress → Completed with output.
#[gpui::test]
async fn test_tool_updates_to_completion(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(EchoTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Echo something"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Simulate streaming partial input.
    let input = json!({});
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: EchoTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: false,
            thought_signature: None,
        },
    ));

    // Input streaming completed
    let input = json!({ "text": "Hello!" });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "1".into(),
            name: "echo".into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // First event announces the tool call with the partial (empty) input.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("1", "Echo")
            .raw_input(json!({}))
            .meta(acp::Meta::from_iter([("tool_name".into(), "echo".into())]))
    );
    // Then the completed input is reflected in an update.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .title("Echo")
                .kind(acp::ToolKind::Other)
                .raw_input(json!({ "text": "Hello!"}))
        )
    );
    // The tool starts running...
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );
    // ...and finishes with the echoed output.
    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Hello!")
        )
    );
}
3432
/// Verifies that invoking `UpdatePlanTool` emits a tool call, a plan event
/// mirroring the tool's input, and a completed tool-call update.
#[gpui::test]
async fn test_update_plan_tool_updates_thread_events(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    thread.update(cx, |thread, _cx| thread.add_tool(UpdatePlanTool));
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Make a plan"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Note: the second entry omits "priority" on purpose; the plan below
    // expects it to default to Medium.
    let input = json!({
        "plan": [
            {
                "step": "Inspect the code",
                "status": "completed",
                "priority": "high"
            },
            {
                "step": "Implement the tool",
                "status": "in_progress"
            },
            {
                "step": "Run tests",
                "status": "pending",
                "priority": "low"
            }
        ]
    });
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "plan_1".into(),
            name: UpdatePlanTool::NAME.into(),
            raw_input: input.to_string(),
            input,
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // The tool call surfaces with Think kind and the raw plan input.
    let tool_call = expect_tool_call(&mut events).await;
    assert_eq!(
        tool_call,
        acp::ToolCall::new("plan_1", "Update plan")
            .kind(acp::ToolKind::Think)
            .raw_input(json!({
                "plan": [
                    {
                        "step": "Inspect the code",
                        "status": "completed",
                        "priority": "high"
                    },
                    {
                        "step": "Implement the tool",
                        "status": "in_progress"
                    },
                    {
                        "step": "Run tests",
                        "status": "pending",
                        "priority": "low"
                    }
                ]
            }))
            .meta(acp::Meta::from_iter([(
                "tool_name".into(),
                "update_plan".into()
            )]))
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::InProgress)
        )
    );

    // The parsed plan is emitted as its own thread event.
    let plan = expect_plan(&mut events).await;
    assert_eq!(
        plan,
        acp::Plan::new(vec![
            acp::PlanEntry::new(
                "Inspect the code",
                acp::PlanEntryPriority::High,
                acp::PlanEntryStatus::Completed,
            ),
            acp::PlanEntry::new(
                "Implement the tool",
                acp::PlanEntryPriority::Medium,
                acp::PlanEntryStatus::InProgress,
            ),
            acp::PlanEntry::new(
                "Run tests",
                acp::PlanEntryPriority::Low,
                acp::PlanEntryStatus::Pending,
            ),
        ])
    );

    let update = expect_tool_call_update_fields(&mut events).await;
    assert_eq!(
        update,
        acp::ToolCallUpdate::new(
            "plan_1",
            acp::ToolCallUpdateFields::new()
                .status(acp::ToolCallStatus::Completed)
                .raw_output("Plan updated")
        )
    );
}
3548
/// Verifies that a completion which finishes without errors emits no
/// `Retry` events.
#[gpui::test]
async fn test_send_no_retry_on_success(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("Hey!");
    fake_model.end_last_completion_stream();

    // Drain events until the turn stops, recording any retries seen.
    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 0);
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey!
            "}
        )
    });
}
3591
/// Verifies that a `ServerOverloaded` stream error triggers one retry after
/// the advertised delay, and that the resumed turn is recorded as a separate
/// assistant message (the "[resume]" marker in the markdown).
#[gpui::test]
async fn test_send_retry_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Stream part of a response, then fail with a retryable error.
    fake_model.send_last_completion_stream_text_chunk("Hey,");
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so the retry request is issued.
    cx.executor().advance_clock(Duration::from_secs(3));
    cx.run_until_parked();

    fake_model.send_last_completion_stream_text_chunk("there!");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let mut retry_events = Vec::new();
    while let Some(Ok(event)) = events.next().await {
        match event {
            ThreadEvent::Retry(retry_status) => {
                retry_events.push(retry_status);
            }
            ThreadEvent::Stop(..) => break,
            _ => {}
        }
    }

    assert_eq!(retry_events.len(), 1);
    assert!(matches!(
        retry_events[0],
        acp_thread::RetryStatus { attempt: 1, .. }
    ));
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.to_markdown(),
            indoc! {"
                ## User

                Hello!

                ## Assistant

                Hey,

                [resume]

                ## Assistant

                there!
            "}
        )
    });
}
3655
/// Verifies that when a stream errors after a tool use, the tool still runs
/// to completion and its result is included in the retry request.
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let events = thread
        .update(cx, |thread, cx| {
            thread.add_tool(EchoTool);
            thread.send(UserMessageId::new(), ["Call the echo tool!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    let tool_use_1 = LanguageModelToolUse {
        id: "tool_1".into(),
        name: EchoTool::NAME.into(),
        raw_input: json!({"text": "test"}).to_string(),
        input: json!({"text": "test"}),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        tool_use_1.clone(),
    ));
    // Fail the stream right after the tool use with a retryable error.
    fake_model.send_last_completion_stream_error(LanguageModelCompletionError::ServerOverloaded {
        provider: LanguageModelProviderName::new("Anthropic"),
        retry_after: Some(Duration::from_secs(3)),
    });
    fake_model.end_last_completion_stream();

    // After the delay, the retry request must carry the tool use AND its
    // (successful) result, proving the tool completed despite the error.
    cx.executor().advance_clock(Duration::from_secs(3));
    let completion = fake_model.pending_completions().pop().unwrap();
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Call the echo tool!".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use_1.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use_1.id.clone(),
                        tool_name: tool_use_1.name.clone(),
                        is_error: false,
                        content: "test".into(),
                        output: Some("test".into())
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();
    events.collect::<Vec<_>>().await;
    thread.read_with(cx, |thread, _cx| {
        assert_eq!(
            thread.last_received_or_pending_message(),
            Some(Message::Agent(AgentMessage {
                content: vec![AgentMessageContent::Text("Done".into())],
                tool_results: IndexMap::default(),
                reasoning_details: None,
            }))
        );
    })
}
3735
/// Verifies that after `MAX_RETRY_ATTEMPTS` consecutive retryable failures,
/// the turn surfaces the final error instead of retrying again.
#[gpui::test]
async fn test_send_max_retries_exceeded(cx: &mut TestAppContext) {
    let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    let mut events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Hello!"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Fail the initial attempt plus every retry (MAX_RETRY_ATTEMPTS of them).
    for _ in 0..crate::thread::MAX_RETRY_ATTEMPTS + 1 {
        fake_model.send_last_completion_stream_error(
            LanguageModelCompletionError::ServerOverloaded {
                provider: LanguageModelProviderName::new("Anthropic"),
                retry_after: Some(Duration::from_secs(3)),
            },
        );
        fake_model.end_last_completion_stream();
        cx.executor().advance_clock(Duration::from_secs(3));
        cx.run_until_parked();
    }

    let mut errors = Vec::new();
    let mut retry_events = Vec::new();
    while let Some(event) = events.next().await {
        match event {
            Ok(ThreadEvent::Retry(retry_status)) => {
                retry_events.push(retry_status);
            }
            Ok(ThreadEvent::Stop(..)) => break,
            Err(error) => errors.push(error),
            _ => {}
        }
    }

    // One Retry event per attempt, numbered 1..=MAX_RETRY_ATTEMPTS.
    assert_eq!(
        retry_events.len(),
        crate::thread::MAX_RETRY_ATTEMPTS as usize
    );
    for i in 0..crate::thread::MAX_RETRY_ATTEMPTS as usize {
        assert_eq!(retry_events[i].attempt, i + 1);
    }
    // The final failure is reported as a single error event.
    assert_eq!(errors.len(), 1);
    let error = errors[0]
        .downcast_ref::<LanguageModelCompletionError>()
        .unwrap();
    assert!(matches!(
        error,
        LanguageModelCompletionError::ServerOverloaded { .. }
    ));
}
3789
/// Regression test: a streaming tool whose input never completes (the LLM
/// stream errors mid-input) must terminate with an error result instead of
/// deadlocking the turn.
#[gpui::test]
async fn test_streaming_tool_completes_when_llm_stream_ends_without_final_input(
    cx: &mut TestAppContext,
) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingEchoTool::new());
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["Use the streaming_echo tool"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Send a partial tool use (is_input_complete = false), simulating the LLM
    // streaming input for a tool.
    let tool_use = LanguageModelToolUse {
        id: "tool_1".into(),
        name: "streaming_echo".into(),
        raw_input: r#"{"text": "partial"}"#.into(),
        input: json!({"text": "partial"}),
        is_input_complete: false,
        thought_signature: None,
    };
    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));
    cx.run_until_parked();

    // Send a stream error WITHOUT ever sending is_input_complete = true.
    // Before the fix, this would deadlock: the tool waits for more partials
    // (or cancellation), run_turn_internal waits for the tool, and the sender
    // keeping the channel open lives inside RunningTurn.
    fake_model.send_last_completion_stream_error(
        LanguageModelCompletionError::UpstreamProviderError {
            message: "Internal server error".to_string(),
            status: http_client::StatusCode::INTERNAL_SERVER_ERROR,
            retry_after: None,
        },
    );
    fake_model.end_last_completion_stream();

    // Advance past the retry delay so run_turn_internal retries.
    cx.executor().advance_clock(Duration::from_secs(5));
    cx.run_until_parked();

    // The retry request should contain the streaming tool's error result,
    // proving the tool terminated and its result was forwarded.
    let completion = fake_model
        .pending_completions()
        .pop()
        .expect("No running turn");
    assert_eq!(
        completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "Failed to receive tool input: tool input was not fully received"
                            .into(),
                        output: Some(
                            "Failed to receive tool input: tool input was not fully received"
                                .into()
                        ),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );

    // Finish the retry round so the turn completes cleanly.
    fake_model.send_last_completion_stream_text_chunk("Done");
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    thread.read_with(cx, |thread, _cx| {
        assert!(
            thread.is_turn_complete(),
            "Thread should not be stuck; the turn should have completed",
        );
    });
}
3896
3897/// Filters out the stop events for asserting against in tests
3898fn stop_events(result_events: Vec<Result<ThreadEvent>>) -> Vec<acp::StopReason> {
3899 result_events
3900 .into_iter()
3901 .filter_map(|event| match event.unwrap() {
3902 ThreadEvent::Stop(stop_reason) => Some(stop_reason),
3903 _ => None,
3904 })
3905 .collect()
3906}
3907
/// Bundle of entities produced by `setup` for driving a `Thread` in tests.
struct ThreadTest {
    // Model the thread completes against (fake or real, per `TestModel`).
    model: Arc<dyn LanguageModel>,
    thread: Entity<Thread>,
    project_context: Entity<ProjectContext>,
    context_server_store: Entity<ContextServerStore>,
    // Fake filesystem backing the project and the watched settings file.
    fs: Arc<FakeFs>,
}
3915
/// Which language model a test runs against: a real Sonnet model from the
/// registry, or the in-process fake.
enum TestModel {
    Sonnet4,
    Fake,
}
3920
3921impl TestModel {
3922 fn id(&self) -> LanguageModelId {
3923 match self {
3924 TestModel::Sonnet4 => LanguageModelId("claude-sonnet-4-latest".into()),
3925 TestModel::Fake => unreachable!(),
3926 }
3927 }
3928}
3929
/// Builds a `Thread` for tests: writes a settings file enabling every test
/// tool under a "test-profile", initializes settings (and, for real models,
/// the language-model stack), and resolves the requested model.
async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
    cx.executor().allow_parking();

    // Seed the fake filesystem with a settings file granting all test tools.
    let fs = FakeFs::new(cx.background_executor.clone());
    fs.create_dir(paths::settings_file().parent().unwrap())
        .await
        .unwrap();
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "default_profile": "test-profile",
                "profiles": {
                    "test-profile": {
                        "name": "Test Profile",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                            WordListTool::NAME: true,
                            ToolRequiringPermission::NAME: true,
                            InfiniteTool::NAME: true,
                            CancellationAwareTool::NAME: true,
                            StreamingEchoTool::NAME: true,
                            StreamingFailingEchoTool::NAME: true,
                            TerminalTool::NAME: true,
                            UpdatePlanTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;

    cx.update(|cx| {
        settings::init(cx);

        match model {
            TestModel::Fake => {}
            TestModel::Sonnet4 => {
                // Real model: needs a production client and the full
                // language-model provider stack.
                gpui_tokio::init(cx);
                let http_client = ReqwestClient::user_agent("agent tests").unwrap();
                cx.set_http_client(Arc::new(http_client));
                let client = Client::production(cx);
                let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
                language_model::init(user_store.clone(), client.clone(), cx);
                language_models::init(user_store, client.clone(), cx);
            }
        };

        // Mirror future settings-file writes into the global SettingsStore.
        watch_settings(fs.clone(), cx);
    });

    let templates = Templates::new();

    fs.insert_tree(path!("/test"), json!({})).await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Resolve the model: the fake is constructed directly; a real model is
    // looked up in the registry and its provider authenticated first.
    let model = cx
        .update(|cx| {
            if let TestModel::Fake = model {
                Task::ready(Arc::new(FakeLanguageModel::default()) as Arc<_>)
            } else {
                let model_id = model.id();
                let models = LanguageModelRegistry::read_global(cx);
                let model = models
                    .available_models(cx)
                    .find(|model| model.id() == model_id)
                    .unwrap();

                let provider = models.provider(&model.provider_id()).unwrap();
                let authenticated = provider.authenticate(cx);

                cx.spawn(async move |_cx| {
                    authenticated.await.unwrap();
                    model
                })
            }
        })
        .await;

    let project_context = cx.new(|_cx| ProjectContext::default());
    let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
    let context_server_registry =
        cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
    let thread = cx.new(|cx| {
        Thread::new(
            project,
            project_context.clone(),
            context_server_registry,
            templates,
            Some(model.clone()),
            cx,
        )
    });
    ThreadTest {
        model,
        thread,
        project_context,
        context_server_store,
        fs,
    }
}
4035
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    // Install env_logger only when the caller opted in via RUST_LOG,
    // keeping test output quiet by default.
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
4043
4044fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
4045 let fs = fs.clone();
4046 cx.spawn({
4047 async move |cx| {
4048 let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
4049 cx.background_executor(),
4050 fs,
4051 paths::settings_file().clone(),
4052 );
4053 let _watcher_task = watcher_task;
4054
4055 while let Some(new_settings_content) = new_settings_content_rx.next().await {
4056 cx.update(|cx| {
4057 SettingsStore::update_global(cx, |settings, cx| {
4058 settings.set_user_settings(&new_settings_content, cx)
4059 })
4060 })
4061 .ok();
4062 }
4063 }
4064 })
4065 .detach();
4066}
4067
4068fn tool_names_for_completion(completion: &LanguageModelRequest) -> Vec<String> {
4069 completion
4070 .tools
4071 .iter()
4072 .map(|tool| tool.name.clone())
4073 .collect()
4074}
4075
/// Registers and starts a fake stdio context server named `name` that
/// advertises `tools`, returning a receiver that yields each incoming tool
/// call together with a oneshot sender the test uses to supply the response.
fn setup_context_server(
    name: &'static str,
    tools: Vec<context_server::types::Tool>,
    context_server_store: &Entity<ContextServerStore>,
    cx: &mut TestAppContext,
) -> mpsc::UnboundedReceiver<(
    context_server::types::CallToolParams,
    oneshot::Sender<context_server::types::CallToolResponse>,
)> {
    // Declare the server in project settings so the store will manage it.
    cx.update(|cx| {
        let mut settings = ProjectSettings::get_global(cx).clone();
        settings.context_servers.insert(
            name.into(),
            project::project_settings::ContextServerSettings::Stdio {
                enabled: true,
                remote: false,
                command: ContextServerCommand {
                    path: "somebinary".into(),
                    args: Vec::new(),
                    env: None,
                    timeout: None,
                },
            },
        );
        ProjectSettings::override_global(settings, cx);
    });

    // Fake transport: answers Initialize/ListTools directly and forwards
    // CallTool requests to the test through the channel.
    let (mcp_tool_calls_tx, mcp_tool_calls_rx) = mpsc::unbounded();
    let fake_transport = context_server::test::create_fake_transport(name, cx.executor())
        .on_request::<context_server::types::requests::Initialize, _>(move |_params| async move {
            context_server::types::InitializeResponse {
                protocol_version: context_server::types::ProtocolVersion(
                    context_server::types::LATEST_PROTOCOL_VERSION.to_string(),
                ),
                server_info: context_server::types::Implementation {
                    name: name.into(),
                    version: "1.0.0".to_string(),
                },
                capabilities: context_server::types::ServerCapabilities {
                    tools: Some(context_server::types::ToolsCapabilities {
                        list_changed: Some(true),
                    }),
                    ..Default::default()
                },
                meta: None,
            }
        })
        .on_request::<context_server::types::requests::ListTools, _>(move |_params| {
            let tools = tools.clone();
            async move {
                context_server::types::ListToolsResponse {
                    tools,
                    next_cursor: None,
                    meta: None,
                }
            }
        })
        .on_request::<context_server::types::requests::CallTool, _>(move |params| {
            let mcp_tool_calls_tx = mcp_tool_calls_tx.clone();
            async move {
                // Hand the call to the test and block until it responds.
                let (response_tx, response_rx) = oneshot::channel();
                mcp_tool_calls_tx
                    .unbounded_send((params, response_tx))
                    .unwrap();
                response_rx.await.unwrap()
            }
        });
    context_server_store.update(cx, |store, cx| {
        store.start_server(
            Arc::new(ContextServer::new(
                ContextServerId(name.into()),
                Arc::new(fake_transport),
            )),
            cx,
        );
    });
    // Let the server finish starting before the test proceeds.
    cx.run_until_parked();
    mcp_tool_calls_rx
}
4155
4156#[gpui::test]
4157async fn test_tokens_before_message(cx: &mut TestAppContext) {
4158 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
4159 let fake_model = model.as_fake();
4160
4161 // First message
4162 let message_1_id = UserMessageId::new();
4163 thread
4164 .update(cx, |thread, cx| {
4165 thread.send(message_1_id.clone(), ["First message"], cx)
4166 })
4167 .unwrap();
4168 cx.run_until_parked();
4169
4170 // Before any response, tokens_before_message should return None for first message
4171 thread.read_with(cx, |thread, _| {
4172 assert_eq!(
4173 thread.tokens_before_message(&message_1_id),
4174 None,
4175 "First message should have no tokens before it"
4176 );
4177 });
4178
4179 // Complete first message with usage
4180 fake_model.send_last_completion_stream_text_chunk("Response 1");
4181 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4182 language_model::TokenUsage {
4183 input_tokens: 100,
4184 output_tokens: 50,
4185 cache_creation_input_tokens: 0,
4186 cache_read_input_tokens: 0,
4187 },
4188 ));
4189 fake_model.end_last_completion_stream();
4190 cx.run_until_parked();
4191
4192 // First message still has no tokens before it
4193 thread.read_with(cx, |thread, _| {
4194 assert_eq!(
4195 thread.tokens_before_message(&message_1_id),
4196 None,
4197 "First message should still have no tokens before it after response"
4198 );
4199 });
4200
4201 // Second message
4202 let message_2_id = UserMessageId::new();
4203 thread
4204 .update(cx, |thread, cx| {
4205 thread.send(message_2_id.clone(), ["Second message"], cx)
4206 })
4207 .unwrap();
4208 cx.run_until_parked();
4209
4210 // Second message should have first message's input tokens before it
4211 thread.read_with(cx, |thread, _| {
4212 assert_eq!(
4213 thread.tokens_before_message(&message_2_id),
4214 Some(100),
4215 "Second message should have 100 tokens before it (from first request)"
4216 );
4217 });
4218
4219 // Complete second message
4220 fake_model.send_last_completion_stream_text_chunk("Response 2");
4221 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
4222 language_model::TokenUsage {
4223 input_tokens: 250, // Total for this request (includes previous context)
4224 output_tokens: 75,
4225 cache_creation_input_tokens: 0,
4226 cache_read_input_tokens: 0,
4227 },
4228 ));
4229 fake_model.end_last_completion_stream();
4230 cx.run_until_parked();
4231
4232 // Third message
4233 let message_3_id = UserMessageId::new();
4234 thread
4235 .update(cx, |thread, cx| {
4236 thread.send(message_3_id.clone(), ["Third message"], cx)
4237 })
4238 .unwrap();
4239 cx.run_until_parked();
4240
4241 // Third message should have second message's input tokens (250) before it
4242 thread.read_with(cx, |thread, _| {
4243 assert_eq!(
4244 thread.tokens_before_message(&message_3_id),
4245 Some(250),
4246 "Third message should have 250 tokens before it (from second request)"
4247 );
4248 // Second message should still have 100
4249 assert_eq!(
4250 thread.tokens_before_message(&message_2_id),
4251 Some(100),
4252 "Second message should still have 100 tokens before it"
4253 );
4254 // First message still has none
4255 assert_eq!(
4256 thread.tokens_before_message(&message_1_id),
4257 None,
4258 "First message should still have no tokens before it"
4259 );
4260 });
4261}
4262
#[gpui::test]
async fn test_tokens_before_message_after_truncate(cx: &mut TestAppContext) {
    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Set up two messages, each completed with a usage report so the thread
    // records the input-token count for the corresponding request.
    let message_1_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_1_id.clone(), ["Message 1"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 1");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    let message_2_id = UserMessageId::new();
    thread
        .update(cx, |thread, cx| {
            thread.send(message_2_id.clone(), ["Message 2"], cx)
        })
        .unwrap();
    cx.run_until_parked();
    fake_model.send_last_completion_stream_text_chunk("Response 2");
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        language_model::TokenUsage {
            input_tokens: 250,
            output_tokens: 75,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    fake_model.end_last_completion_stream();
    cx.run_until_parked();

    // Verify initial state: message 2 is preceded by the first request's
    // input tokens.
    thread.read_with(cx, |thread, _| {
        assert_eq!(thread.tokens_before_message(&message_2_id), Some(100));
    });

    // Truncate at message 2 (removes message 2 and everything after)
    thread
        .update(cx, |thread, cx| thread.truncate(message_2_id.clone(), cx))
        .unwrap();
    cx.run_until_parked();

    // After truncation, message_2_id no longer exists, so lookup should return None
    thread.read_with(cx, |thread, _| {
        assert_eq!(
            thread.tokens_before_message(&message_2_id),
            None,
            "After truncation, message 2 no longer exists"
        );
        // Message 1 still exists but has no tokens before it
        assert_eq!(
            thread.tokens_before_message(&message_1_id),
            None,
            "First message still has no tokens before it"
        );
    });
}
4333
// Exercises the terminal tool's permission-rule evaluation: deny rules block
// commands outright, allow rules skip confirmation (even against a deny
// default), confirm rules still fire under a global allow default, and a
// tool-specific deny default wins over a global allow default. Each scenario
// gets its own scope with fresh settings and a fresh tool instance.
#[gpui::test]
async fn test_terminal_tool_permission_rules(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree("/root", json!({})).await;
    let project = Project::test(fs, ["/root".as_ref()], cx).await;

    // Test 1: Deny rule blocks command
    {
        // Terminal never exits: if the deny rule works, it is never spawned.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Confirm),
                    always_allow: vec![],
                    always_deny: vec![
                        agent_settings::CompiledRegex::new(r"rm\s+-rf", false).unwrap(),
                    ],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "rm -rf /".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by deny rule"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("blocked"),
            "error should mention the command was blocked"
        );
    }

    // Test 2: Allow rule skips confirmation (and overrides default: Deny)
    {
        // Terminal exits immediately with status 0 so the tool call can finish.
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![
                        agent_settings::CompiledRegex::new(r"^echo\s", false).unwrap(),
                    ],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // Seeing terminal content (rather than an authorization request)
        // proves the allow rule bypassed confirmation.
        let update = rx.expect_update_fields().await;
        assert!(
            update.content.iter().any(|blocks| {
                blocks
                    .iter()
                    .any(|c| matches!(c, acp::ToolCallContent::Terminal(_)))
            }),
            "expected terminal content (allow rule should skip confirmation and override default deny)"
        );

        let result = task.await;
        assert!(
            result.is_ok(),
            "expected command to succeed without confirmation"
        );
    }

    // Test 3: global default: allow does NOT override always_confirm patterns
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Allow),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![
                        agent_settings::CompiledRegex::new(r"sudo", false).unwrap(),
                    ],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, mut rx) = crate::ToolCallEventStream::test();

        let _task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "sudo rm file".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // With global default: allow, confirm patterns are still respected
        // The expect_authorization() call will panic if no authorization is requested,
        // which validates that the confirm pattern still triggers confirmation
        let _auth = rx.expect_authorization().await;

        // The task is never resolved here; dropping it abandons the pending
        // confirmation.
        drop(_task);
    }

    // Test 4: tool-specific default: deny is respected even with global default: allow
    {
        let environment = Rc::new(cx.update(|cx| {
            FakeThreadEnvironment::default()
                .with_terminal(FakeTerminalHandle::new_with_immediate_exit(cx, 0))
        }));

        cx.update(|cx| {
            let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
            settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
            settings.tool_permissions.tools.insert(
                TerminalTool::NAME.into(),
                agent_settings::ToolRules {
                    default: Some(settings::ToolPermissionMode::Deny),
                    always_allow: vec![],
                    always_deny: vec![],
                    always_confirm: vec![],
                    invalid_patterns: vec![],
                },
            );
            agent_settings::AgentSettings::override_global(settings, cx);
        });

        #[allow(clippy::arc_with_non_send_sync)]
        let tool = Arc::new(crate::TerminalTool::new(project.clone(), environment));
        let (event_stream, _rx) = crate::ToolCallEventStream::test();

        let task = cx.update(|cx| {
            tool.run(
                ToolInput::resolved(crate::TerminalToolInput {
                    command: "echo hello".to_string(),
                    cd: ".".to_string(),
                    timeout_ms: None,
                }),
                event_stream,
                cx,
            )
        });

        // tool-specific default: deny is respected even with global default: allow
        let result = task.await;
        assert!(
            result.is_err(),
            "expected command to be blocked by tool-specific deny default"
        );
        let err_msg = result.unwrap_err().to_lowercase();
        assert!(
            err_msg.contains("disabled"),
            "error should mention the tool is disabled, got: {err_msg}"
        );
    }
}
4551
4552#[gpui::test]
4553async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) {
4554 init_test(cx);
4555 cx.update(|cx| {
4556 LanguageModelRegistry::test(cx);
4557 });
4558 cx.update(|cx| {
4559 cx.update_flags(true, vec!["subagents".to_string()]);
4560 });
4561
4562 let fs = FakeFs::new(cx.executor());
4563 fs.insert_tree(
4564 "/",
4565 json!({
4566 "a": {
4567 "b.md": "Lorem"
4568 }
4569 }),
4570 )
4571 .await;
4572 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4573 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4574 let agent = cx.update(|cx| {
4575 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4576 });
4577 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4578
4579 let acp_thread = cx
4580 .update(|cx| {
4581 connection
4582 .clone()
4583 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4584 })
4585 .await
4586 .unwrap();
4587 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4588 let thread = agent.read_with(cx, |agent, _| {
4589 agent.sessions.get(&session_id).unwrap().thread.clone()
4590 });
4591 let model = Arc::new(FakeLanguageModel::default());
4592
4593 // Ensure empty threads are not saved, even if they get mutated.
4594 thread.update(cx, |thread, cx| {
4595 thread.set_model(model.clone(), cx);
4596 });
4597 cx.run_until_parked();
4598
4599 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4600 cx.run_until_parked();
4601 model.send_last_completion_stream_text_chunk("spawning subagent");
4602 let subagent_tool_input = SpawnAgentToolInput {
4603 label: "label".to_string(),
4604 message: "subagent task prompt".to_string(),
4605 session_id: None,
4606 };
4607 let subagent_tool_use = LanguageModelToolUse {
4608 id: "subagent_1".into(),
4609 name: SpawnAgentTool::NAME.into(),
4610 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4611 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4612 is_input_complete: true,
4613 thought_signature: None,
4614 };
4615 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4616 subagent_tool_use,
4617 ));
4618 model.end_last_completion_stream();
4619
4620 cx.run_until_parked();
4621
4622 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4623 thread
4624 .running_subagent_ids(cx)
4625 .get(0)
4626 .expect("subagent thread should be running")
4627 .clone()
4628 });
4629
4630 let subagent_thread = agent.read_with(cx, |agent, _cx| {
4631 agent
4632 .sessions
4633 .get(&subagent_session_id)
4634 .expect("subagent session should exist")
4635 .acp_thread
4636 .clone()
4637 });
4638
4639 model.send_last_completion_stream_text_chunk("subagent task response");
4640 model.end_last_completion_stream();
4641
4642 cx.run_until_parked();
4643
4644 assert_eq!(
4645 subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4646 indoc! {"
4647 ## User
4648
4649 subagent task prompt
4650
4651 ## Assistant
4652
4653 subagent task response
4654
4655 "}
4656 );
4657
4658 model.send_last_completion_stream_text_chunk("Response");
4659 model.end_last_completion_stream();
4660
4661 send.await.unwrap();
4662
4663 assert_eq!(
4664 acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4665 indoc! {r#"
4666 ## User
4667
4668 Prompt
4669
4670 ## Assistant
4671
4672 spawning subagent
4673
4674 **Tool Call: label**
4675 Status: Completed
4676
4677 subagent task response
4678
4679 ## Assistant
4680
4681 Response
4682
4683 "#},
4684 );
4685}
4686
4687#[gpui::test]
4688async fn test_subagent_tool_output_does_not_include_thinking(cx: &mut TestAppContext) {
4689 init_test(cx);
4690 cx.update(|cx| {
4691 LanguageModelRegistry::test(cx);
4692 });
4693 cx.update(|cx| {
4694 cx.update_flags(true, vec!["subagents".to_string()]);
4695 });
4696
4697 let fs = FakeFs::new(cx.executor());
4698 fs.insert_tree(
4699 "/",
4700 json!({
4701 "a": {
4702 "b.md": "Lorem"
4703 }
4704 }),
4705 )
4706 .await;
4707 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4708 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4709 let agent = cx.update(|cx| {
4710 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4711 });
4712 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4713
4714 let acp_thread = cx
4715 .update(|cx| {
4716 connection
4717 .clone()
4718 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4719 })
4720 .await
4721 .unwrap();
4722 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4723 let thread = agent.read_with(cx, |agent, _| {
4724 agent.sessions.get(&session_id).unwrap().thread.clone()
4725 });
4726 let model = Arc::new(FakeLanguageModel::default());
4727
4728 // Ensure empty threads are not saved, even if they get mutated.
4729 thread.update(cx, |thread, cx| {
4730 thread.set_model(model.clone(), cx);
4731 });
4732 cx.run_until_parked();
4733
4734 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4735 cx.run_until_parked();
4736 model.send_last_completion_stream_text_chunk("spawning subagent");
4737 let subagent_tool_input = SpawnAgentToolInput {
4738 label: "label".to_string(),
4739 message: "subagent task prompt".to_string(),
4740 session_id: None,
4741 };
4742 let subagent_tool_use = LanguageModelToolUse {
4743 id: "subagent_1".into(),
4744 name: SpawnAgentTool::NAME.into(),
4745 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4746 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4747 is_input_complete: true,
4748 thought_signature: None,
4749 };
4750 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4751 subagent_tool_use,
4752 ));
4753 model.end_last_completion_stream();
4754
4755 cx.run_until_parked();
4756
4757 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4758 thread
4759 .running_subagent_ids(cx)
4760 .get(0)
4761 .expect("subagent thread should be running")
4762 .clone()
4763 });
4764
4765 let subagent_thread = agent.read_with(cx, |agent, _cx| {
4766 agent
4767 .sessions
4768 .get(&subagent_session_id)
4769 .expect("subagent session should exist")
4770 .acp_thread
4771 .clone()
4772 });
4773
4774 model.send_last_completion_stream_text_chunk("subagent task response 1");
4775 model.send_last_completion_stream_event(LanguageModelCompletionEvent::Thinking {
4776 text: "thinking more about the subagent task".into(),
4777 signature: None,
4778 });
4779 model.send_last_completion_stream_text_chunk("subagent task response 2");
4780 model.end_last_completion_stream();
4781
4782 cx.run_until_parked();
4783
4784 assert_eq!(
4785 subagent_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4786 indoc! {"
4787 ## User
4788
4789 subagent task prompt
4790
4791 ## Assistant
4792
4793 subagent task response 1
4794
4795 <thinking>
4796 thinking more about the subagent task
4797 </thinking>
4798
4799 subagent task response 2
4800
4801 "}
4802 );
4803
4804 model.send_last_completion_stream_text_chunk("Response");
4805 model.end_last_completion_stream();
4806
4807 send.await.unwrap();
4808
4809 assert_eq!(
4810 acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
4811 indoc! {r#"
4812 ## User
4813
4814 Prompt
4815
4816 ## Assistant
4817
4818 spawning subagent
4819
4820 **Tool Call: label**
4821 Status: Completed
4822
4823 subagent task response 1
4824
4825 subagent task response 2
4826
4827 ## Assistant
4828
4829 Response
4830
4831 "#},
4832 );
4833}
4834
4835#[gpui::test]
4836async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAppContext) {
4837 init_test(cx);
4838 cx.update(|cx| {
4839 LanguageModelRegistry::test(cx);
4840 });
4841 cx.update(|cx| {
4842 cx.update_flags(true, vec!["subagents".to_string()]);
4843 });
4844
4845 let fs = FakeFs::new(cx.executor());
4846 fs.insert_tree(
4847 "/",
4848 json!({
4849 "a": {
4850 "b.md": "Lorem"
4851 }
4852 }),
4853 )
4854 .await;
4855 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4856 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4857 let agent = cx.update(|cx| {
4858 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4859 });
4860 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4861
4862 let acp_thread = cx
4863 .update(|cx| {
4864 connection
4865 .clone()
4866 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4867 })
4868 .await
4869 .unwrap();
4870 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
4871 let thread = agent.read_with(cx, |agent, _| {
4872 agent.sessions.get(&session_id).unwrap().thread.clone()
4873 });
4874 let model = Arc::new(FakeLanguageModel::default());
4875
4876 // Ensure empty threads are not saved, even if they get mutated.
4877 thread.update(cx, |thread, cx| {
4878 thread.set_model(model.clone(), cx);
4879 });
4880 cx.run_until_parked();
4881
4882 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
4883 cx.run_until_parked();
4884 model.send_last_completion_stream_text_chunk("spawning subagent");
4885 let subagent_tool_input = SpawnAgentToolInput {
4886 label: "label".to_string(),
4887 message: "subagent task prompt".to_string(),
4888 session_id: None,
4889 };
4890 let subagent_tool_use = LanguageModelToolUse {
4891 id: "subagent_1".into(),
4892 name: SpawnAgentTool::NAME.into(),
4893 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
4894 input: serde_json::to_value(&subagent_tool_input).unwrap(),
4895 is_input_complete: true,
4896 thought_signature: None,
4897 };
4898 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
4899 subagent_tool_use,
4900 ));
4901 model.end_last_completion_stream();
4902
4903 cx.run_until_parked();
4904
4905 let subagent_session_id = thread.read_with(cx, |thread, cx| {
4906 thread
4907 .running_subagent_ids(cx)
4908 .get(0)
4909 .expect("subagent thread should be running")
4910 .clone()
4911 });
4912 let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
4913 agent
4914 .sessions
4915 .get(&subagent_session_id)
4916 .expect("subagent session should exist")
4917 .acp_thread
4918 .clone()
4919 });
4920
4921 // model.send_last_completion_stream_text_chunk("subagent task response");
4922 // model.end_last_completion_stream();
4923
4924 // cx.run_until_parked();
4925
4926 acp_thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4927
4928 cx.run_until_parked();
4929
4930 send.await.unwrap();
4931
4932 acp_thread.read_with(cx, |thread, cx| {
4933 assert_eq!(thread.status(), ThreadStatus::Idle);
4934 assert_eq!(
4935 thread.to_markdown(cx),
4936 indoc! {"
4937 ## User
4938
4939 Prompt
4940
4941 ## Assistant
4942
4943 spawning subagent
4944
4945 **Tool Call: label**
4946 Status: Canceled
4947
4948 "}
4949 );
4950 });
4951 subagent_acp_thread.read_with(cx, |thread, cx| {
4952 assert_eq!(thread.status(), ThreadStatus::Idle);
4953 assert_eq!(
4954 thread.to_markdown(cx),
4955 indoc! {"
4956 ## User
4957
4958 subagent task prompt
4959
4960 "}
4961 );
4962 });
4963}
4964
4965#[gpui::test]
4966async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) {
4967 init_test(cx);
4968 cx.update(|cx| {
4969 LanguageModelRegistry::test(cx);
4970 });
4971 cx.update(|cx| {
4972 cx.update_flags(true, vec!["subagents".to_string()]);
4973 });
4974
4975 let fs = FakeFs::new(cx.executor());
4976 fs.insert_tree(
4977 "/",
4978 json!({
4979 "a": {
4980 "b.md": "Lorem"
4981 }
4982 }),
4983 )
4984 .await;
4985 let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
4986 let thread_store = cx.new(|cx| ThreadStore::new(cx));
4987 let agent = cx.update(|cx| {
4988 NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
4989 });
4990 let connection = Rc::new(NativeAgentConnection(agent.clone()));
4991
4992 let acp_thread = cx
4993 .update(|cx| {
4994 connection
4995 .clone()
4996 .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
4997 })
4998 .await
4999 .unwrap();
5000 let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
5001 let thread = agent.read_with(cx, |agent, _| {
5002 agent.sessions.get(&session_id).unwrap().thread.clone()
5003 });
5004 let model = Arc::new(FakeLanguageModel::default());
5005
5006 thread.update(cx, |thread, cx| {
5007 thread.set_model(model.clone(), cx);
5008 });
5009 cx.run_until_parked();
5010
5011 // === First turn: create subagent ===
5012 let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
5013 cx.run_until_parked();
5014 model.send_last_completion_stream_text_chunk("spawning subagent");
5015 let subagent_tool_input = SpawnAgentToolInput {
5016 label: "initial task".to_string(),
5017 message: "do the first task".to_string(),
5018 session_id: None,
5019 };
5020 let subagent_tool_use = LanguageModelToolUse {
5021 id: "subagent_1".into(),
5022 name: SpawnAgentTool::NAME.into(),
5023 raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
5024 input: serde_json::to_value(&subagent_tool_input).unwrap(),
5025 is_input_complete: true,
5026 thought_signature: None,
5027 };
5028 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
5029 subagent_tool_use,
5030 ));
5031 model.end_last_completion_stream();
5032
5033 cx.run_until_parked();
5034
5035 let subagent_session_id = thread.read_with(cx, |thread, cx| {
5036 thread
5037 .running_subagent_ids(cx)
5038 .get(0)
5039 .expect("subagent thread should be running")
5040 .clone()
5041 });
5042
5043 let subagent_acp_thread = agent.read_with(cx, |agent, _cx| {
5044 agent
5045 .sessions
5046 .get(&subagent_session_id)
5047 .expect("subagent session should exist")
5048 .acp_thread
5049 .clone()
5050 });
5051
5052 // Subagent responds
5053 model.send_last_completion_stream_text_chunk("first task response");
5054 model.end_last_completion_stream();
5055
5056 cx.run_until_parked();
5057
5058 // Parent model responds to complete first turn
5059 model.send_last_completion_stream_text_chunk("First response");
5060 model.end_last_completion_stream();
5061
5062 send.await.unwrap();
5063
5064 // Verify subagent is no longer running
5065 thread.read_with(cx, |thread, cx| {
5066 assert!(
5067 thread.running_subagent_ids(cx).is_empty(),
5068 "subagent should not be running after completion"
5069 );
5070 });
5071
5072 // === Second turn: resume subagent with session_id ===
5073 let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
5074 cx.run_until_parked();
5075 model.send_last_completion_stream_text_chunk("resuming subagent");
5076 let resume_tool_input = SpawnAgentToolInput {
5077 label: "follow-up task".to_string(),
5078 message: "do the follow-up task".to_string(),
5079 session_id: Some(subagent_session_id.clone()),
5080 };
5081 let resume_tool_use = LanguageModelToolUse {
5082 id: "subagent_2".into(),
5083 name: SpawnAgentTool::NAME.into(),
5084 raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
5085 input: serde_json::to_value(&resume_tool_input).unwrap(),
5086 is_input_complete: true,
5087 thought_signature: None,
5088 };
5089 model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
5090 model.end_last_completion_stream();
5091
5092 cx.run_until_parked();
5093
5094 // Subagent should be running again with the same session
5095 thread.read_with(cx, |thread, cx| {
5096 let running = thread.running_subagent_ids(cx);
5097 assert_eq!(running.len(), 1, "subagent should be running");
5098 assert_eq!(running[0], subagent_session_id, "should be same session");
5099 });
5100
5101 // Subagent responds to follow-up
5102 model.send_last_completion_stream_text_chunk("follow-up task response");
5103 model.end_last_completion_stream();
5104
5105 cx.run_until_parked();
5106
5107 // Parent model responds to complete second turn
5108 model.send_last_completion_stream_text_chunk("Second response");
5109 model.end_last_completion_stream();
5110
5111 send2.await.unwrap();
5112
5113 // Verify subagent is no longer running
5114 thread.read_with(cx, |thread, cx| {
5115 assert!(
5116 thread.running_subagent_ids(cx).is_empty(),
5117 "subagent should not be running after resume completion"
5118 );
5119 });
5120
5121 // Verify the subagent's acp thread has both conversation turns
5122 assert_eq!(
5123 subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)),
5124 indoc! {"
5125 ## User
5126
5127 do the first task
5128
5129 ## Assistant
5130
5131 first task response
5132
5133 ## User
5134
5135 do the follow-up task
5136
5137 ## Assistant
5138
5139 follow-up task response
5140
5141 "}
5142 );
5143}
5144
5145#[gpui::test]
5146async fn test_subagent_thread_inherits_parent_thread_properties(cx: &mut TestAppContext) {
5147 init_test(cx);
5148
5149 cx.update(|cx| {
5150 cx.update_flags(true, vec!["subagents".to_string()]);
5151 });
5152
5153 let fs = FakeFs::new(cx.executor());
5154 fs.insert_tree(path!("/test"), json!({})).await;
5155 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5156 let project_context = cx.new(|_cx| ProjectContext::default());
5157 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5158 let context_server_registry =
5159 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5160 let model = Arc::new(FakeLanguageModel::default());
5161
5162 let parent_thread = cx.new(|cx| {
5163 Thread::new(
5164 project.clone(),
5165 project_context,
5166 context_server_registry,
5167 Templates::new(),
5168 Some(model.clone()),
5169 cx,
5170 )
5171 });
5172
5173 let subagent_thread = cx.new(|cx| Thread::new_subagent(&parent_thread, cx));
5174 subagent_thread.read_with(cx, |subagent_thread, cx| {
5175 assert!(subagent_thread.is_subagent());
5176 assert_eq!(subagent_thread.depth(), 1);
5177 assert_eq!(
5178 subagent_thread.model().map(|model| model.id()),
5179 Some(model.id())
5180 );
5181 assert_eq!(
5182 subagent_thread.parent_thread_id(),
5183 Some(parent_thread.read(cx).id().clone())
5184 );
5185 });
5186}
5187
5188#[gpui::test]
5189async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppContext) {
5190 init_test(cx);
5191
5192 cx.update(|cx| {
5193 cx.update_flags(true, vec!["subagents".to_string()]);
5194 });
5195
5196 let fs = FakeFs::new(cx.executor());
5197 fs.insert_tree(path!("/test"), json!({})).await;
5198 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5199 let project_context = cx.new(|_cx| ProjectContext::default());
5200 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5201 let context_server_registry =
5202 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5203 let model = Arc::new(FakeLanguageModel::default());
5204 let environment = Rc::new(cx.update(|cx| {
5205 FakeThreadEnvironment::default().with_terminal(FakeTerminalHandle::new_never_exits(cx))
5206 }));
5207
5208 let deep_parent_thread = cx.new(|cx| {
5209 let mut thread = Thread::new(
5210 project.clone(),
5211 project_context,
5212 context_server_registry,
5213 Templates::new(),
5214 Some(model.clone()),
5215 cx,
5216 );
5217 thread.set_subagent_context(SubagentContext {
5218 parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
5219 depth: MAX_SUBAGENT_DEPTH - 1,
5220 });
5221 thread
5222 });
5223 let deep_subagent_thread = cx.new(|cx| {
5224 let mut thread = Thread::new_subagent(&deep_parent_thread, cx);
5225 thread.add_default_tools(environment, cx);
5226 thread
5227 });
5228
5229 deep_subagent_thread.read_with(cx, |thread, _| {
5230 assert_eq!(thread.depth(), MAX_SUBAGENT_DEPTH);
5231 assert!(
5232 !thread.has_registered_tool(SpawnAgentTool::NAME),
5233 "subagent tool should not be present at max depth"
5234 );
5235 });
5236}
5237
5238#[gpui::test]
5239async fn test_parent_cancel_stops_subagent(cx: &mut TestAppContext) {
5240 init_test(cx);
5241
5242 cx.update(|cx| {
5243 cx.update_flags(true, vec!["subagents".to_string()]);
5244 });
5245
5246 let fs = FakeFs::new(cx.executor());
5247 fs.insert_tree(path!("/test"), json!({})).await;
5248 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5249 let project_context = cx.new(|_cx| ProjectContext::default());
5250 let context_server_store = project.read_with(cx, |project, _| project.context_server_store());
5251 let context_server_registry =
5252 cx.new(|cx| ContextServerRegistry::new(context_server_store.clone(), cx));
5253 let model = Arc::new(FakeLanguageModel::default());
5254
5255 let parent = cx.new(|cx| {
5256 Thread::new(
5257 project.clone(),
5258 project_context.clone(),
5259 context_server_registry.clone(),
5260 Templates::new(),
5261 Some(model.clone()),
5262 cx,
5263 )
5264 });
5265
5266 let subagent = cx.new(|cx| Thread::new_subagent(&parent, cx));
5267
5268 parent.update(cx, |thread, _cx| {
5269 thread.register_running_subagent(subagent.downgrade());
5270 });
5271
5272 subagent
5273 .update(cx, |thread, cx| {
5274 thread.send(UserMessageId::new(), ["Do work".to_string()], cx)
5275 })
5276 .unwrap();
5277 cx.run_until_parked();
5278
5279 subagent.read_with(cx, |thread, _| {
5280 assert!(!thread.is_turn_complete(), "subagent should be running");
5281 });
5282
5283 parent.update(cx, |thread, cx| {
5284 thread.cancel(cx).detach();
5285 });
5286
5287 subagent.read_with(cx, |thread, _| {
5288 assert!(
5289 thread.is_turn_complete(),
5290 "subagent should be cancelled when parent cancels"
5291 );
5292 });
5293}
5294
/// When a running subagent's token usage crosses the context-window warning
/// threshold (80% of the fake model's 1,000,000-token window), the subagent
/// should be stopped, the parent's tool call should fail with a warning
/// message, and the subagent session should remain resumable.
#[gpui::test]
async fn test_subagent_context_window_warning(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Create an ACP session and grab the backing thread so we can drive it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Have the parent's completion invoke the spawn-agent tool, which creates
    // a fresh subagent (session_id: None).
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    // Send a usage update that crosses the warning threshold (80% of 1,000,000)
    // NOTE: the shared fake model now serves the *subagent's* completion, so
    // this usage update is attributed to the subagent's turn.
    model.send_last_completion_stream_text_chunk("partial work");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        TokenUsage {
            input_tokens: 850_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should be stopped after context window warning"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after warning");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the warning error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("nearing the end of its context window"),
        "tool output should contain context window warning message, got:\n{markdown}"
    );
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status, got:\n{markdown}"
    );

    // Verify the subagent session still exists (can be resumed)
    agent.read_with(cx, |agent, _cx| {
        assert!(
            agent.sessions.contains_key(&subagent_session_id),
            "subagent session should still exist for potential resume"
        );
    });
}
5420
/// The context-window warning should only fire on a *transition* into the
/// warning range. If a subagent is resumed while already at warning level and
/// its usage stays there (no further transition), the resumed turn must
/// complete normally without re-raising the warning.
#[gpui::test]
async fn test_subagent_no_context_window_warning_when_already_at_warning(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Create an ACP session and grab the backing thread so we can drive it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // === First turn: create subagent, trigger context window warning ===
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("spawning subagent");
    // session_id: None ⇒ spawn a brand-new subagent.
    let subagent_tool_input = SpawnAgentToolInput {
        label: "initial task".to_string(),
        message: "do the first task".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Capture the subagent's session id so the second turn can resume it.
    let subagent_session_id = thread.read_with(cx, |thread, cx| {
        thread
            .running_subagent_ids(cx)
            .get(0)
            .expect("subagent thread should be running")
            .clone()
    });

    // Subagent sends a usage update that crosses the warning threshold.
    // This triggers Normal→Warning, stopping the subagent.
    model.send_last_completion_stream_text_chunk("partial work");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        TokenUsage {
            input_tokens: 850_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));

    cx.run_until_parked();

    // Verify the first turn was stopped with a context window warning
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should be stopped after context window warning"
        );
    });

    // Parent model responds to complete first turn
    model.send_last_completion_stream_text_chunk("First response");
    model.end_last_completion_stream();

    send.await.unwrap();

    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("nearing the end of its context window"),
        "first turn should have context window warning, got:\n{markdown}"
    );

    // === Second turn: resume the same subagent (now at Warning level) ===
    let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx));
    cx.run_until_parked();
    model.send_last_completion_stream_text_chunk("resuming subagent");
    // session_id: Some(..) ⇒ resume the existing subagent session.
    let resume_tool_input = SpawnAgentToolInput {
        label: "follow-up task".to_string(),
        message: "do the follow-up task".to_string(),
        session_id: Some(subagent_session_id.clone()),
    };
    let resume_tool_use = LanguageModelToolUse {
        id: "subagent_2".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&resume_tool_input).unwrap(),
        input: serde_json::to_value(&resume_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Subagent responds with tokens still at warning level (no worse).
    // Since ratio_before_prompt was already Warning, this should NOT
    // trigger the context window warning again.
    model.send_last_completion_stream_text_chunk("follow-up task response");
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::UsageUpdate(
        TokenUsage {
            input_tokens: 870_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
        },
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Parent model responds to complete second turn
    model.send_last_completion_stream_text_chunk("Second response");
    model.end_last_completion_stream();

    send2.await.unwrap();

    // The resumed subagent should have completed normally since the ratio
    // didn't transition (it was Warning before and stayed at Warning)
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("follow-up task response"),
        "resumed subagent should complete normally when already at warning, got:\n{markdown}"
    );
    // The second tool call should NOT have a context window warning
    // (scan only the markdown from the second tool call onward).
    let second_tool_pos = markdown
        .find("follow-up task")
        .expect("should find follow-up tool call");
    let after_second_tool = &markdown[second_tool_pos..];
    assert!(
        !after_second_tool.contains("nearing the end of its context window"),
        "should NOT contain context window warning for resumed subagent at same level, got:\n{after_second_tool}"
    );
}
5594
/// A non-retryable model error inside a subagent should stop the subagent and
/// surface on the parent as a failed tool call, while letting the parent turn
/// continue and complete.
#[gpui::test]
async fn test_subagent_error_propagation(cx: &mut TestAppContext) {
    init_test(cx);
    cx.update(|cx| {
        LanguageModelRegistry::test(cx);
    });
    // Subagent support is behind a feature flag.
    cx.update(|cx| {
        cx.update_flags(true, vec!["subagents".to_string()]);
    });

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(
        "/",
        json!({
            "a": {
                "b.md": "Lorem"
            }
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await;
    let thread_store = cx.new(|cx| ThreadStore::new(cx));
    let agent = cx.update(|cx| {
        NativeAgent::new(thread_store.clone(), Templates::new(), None, fs.clone(), cx)
    });
    let connection = Rc::new(NativeAgentConnection(agent.clone()));

    // Create an ACP session and grab the backing thread so we can drive it.
    let acp_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::new(&[Path::new("")]), cx)
        })
        .await
        .unwrap();
    let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone());
    let thread = agent.read_with(cx, |agent, _| {
        agent.sessions.get(&session_id).unwrap().thread.clone()
    });
    let model = Arc::new(FakeLanguageModel::default());

    thread.update(cx, |thread, cx| {
        thread.set_model(model.clone(), cx);
    });
    cx.run_until_parked();

    // Start the parent turn
    let send = acp_thread.update(cx, |thread, cx| thread.send_raw("Prompt", cx));
    cx.run_until_parked();
    // Have the parent's completion invoke the spawn-agent tool.
    model.send_last_completion_stream_text_chunk("spawning subagent");
    let subagent_tool_input = SpawnAgentToolInput {
        label: "label".to_string(),
        message: "subagent task prompt".to_string(),
        session_id: None,
    };
    let subagent_tool_use = LanguageModelToolUse {
        id: "subagent_1".into(),
        name: SpawnAgentTool::NAME.into(),
        raw_input: serde_json::to_string(&subagent_tool_input).unwrap(),
        input: serde_json::to_value(&subagent_tool_input).unwrap(),
        is_input_complete: true,
        thought_signature: None,
    };
    model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        subagent_tool_use,
    ));
    model.end_last_completion_stream();

    cx.run_until_parked();

    // Verify subagent is running
    thread.read_with(cx, |thread, cx| {
        assert!(
            !thread.running_subagent_ids(cx).is_empty(),
            "subagent should be running"
        );
    });

    // The subagent's model returns a non-retryable error
    model.send_last_completion_stream_error(LanguageModelCompletionError::PromptTooLarge {
        tokens: None,
    });

    cx.run_until_parked();

    // The subagent should no longer be running
    thread.read_with(cx, |thread, cx| {
        assert!(
            thread.running_subagent_ids(cx).is_empty(),
            "subagent should not be running after error"
        );
    });

    // The parent model should get a new completion request to respond to the tool error
    model.send_last_completion_stream_text_chunk("Response after error");
    model.end_last_completion_stream();

    send.await.unwrap();

    // Verify the parent thread shows the error in the tool call
    let markdown = acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
    assert!(
        markdown.contains("Status: Failed"),
        "tool call should have Failed status after model error, got:\n{markdown}"
    );
}
5701
5702#[gpui::test]
5703async fn test_edit_file_tool_deny_rule_blocks_edit(cx: &mut TestAppContext) {
5704 init_test(cx);
5705
5706 let fs = FakeFs::new(cx.executor());
5707 fs.insert_tree("/root", json!({"sensitive_config.txt": "secret data"}))
5708 .await;
5709 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5710
5711 cx.update(|cx| {
5712 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5713 settings.tool_permissions.tools.insert(
5714 EditFileTool::NAME.into(),
5715 agent_settings::ToolRules {
5716 default: Some(settings::ToolPermissionMode::Allow),
5717 always_allow: vec![],
5718 always_deny: vec![agent_settings::CompiledRegex::new(r"sensitive", false).unwrap()],
5719 always_confirm: vec![],
5720 invalid_patterns: vec![],
5721 },
5722 );
5723 agent_settings::AgentSettings::override_global(settings, cx);
5724 });
5725
5726 let context_server_registry =
5727 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
5728 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
5729 let templates = crate::Templates::new();
5730 let thread = cx.new(|cx| {
5731 crate::Thread::new(
5732 project.clone(),
5733 cx.new(|_cx| prompt_store::ProjectContext::default()),
5734 context_server_registry,
5735 templates.clone(),
5736 None,
5737 cx,
5738 )
5739 });
5740
5741 #[allow(clippy::arc_with_non_send_sync)]
5742 let tool = Arc::new(crate::EditFileTool::new(
5743 project.clone(),
5744 thread.downgrade(),
5745 language_registry,
5746 templates,
5747 ));
5748 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5749
5750 let task = cx.update(|cx| {
5751 tool.run(
5752 ToolInput::resolved(crate::EditFileToolInput {
5753 display_description: "Edit sensitive file".to_string(),
5754 path: "root/sensitive_config.txt".into(),
5755 mode: crate::EditFileMode::Edit,
5756 }),
5757 event_stream,
5758 cx,
5759 )
5760 });
5761
5762 let result = task.await;
5763 assert!(result.is_err(), "expected edit to be blocked");
5764 assert!(
5765 result.unwrap_err().to_string().contains("blocked"),
5766 "error should mention the edit was blocked"
5767 );
5768}
5769
5770#[gpui::test]
5771async fn test_delete_path_tool_deny_rule_blocks_deletion(cx: &mut TestAppContext) {
5772 init_test(cx);
5773
5774 let fs = FakeFs::new(cx.executor());
5775 fs.insert_tree("/root", json!({"important_data.txt": "critical info"}))
5776 .await;
5777 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5778
5779 cx.update(|cx| {
5780 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5781 settings.tool_permissions.tools.insert(
5782 DeletePathTool::NAME.into(),
5783 agent_settings::ToolRules {
5784 default: Some(settings::ToolPermissionMode::Allow),
5785 always_allow: vec![],
5786 always_deny: vec![agent_settings::CompiledRegex::new(r"important", false).unwrap()],
5787 always_confirm: vec![],
5788 invalid_patterns: vec![],
5789 },
5790 );
5791 agent_settings::AgentSettings::override_global(settings, cx);
5792 });
5793
5794 let action_log = cx.new(|_cx| action_log::ActionLog::new(project.clone()));
5795
5796 #[allow(clippy::arc_with_non_send_sync)]
5797 let tool = Arc::new(crate::DeletePathTool::new(project, action_log));
5798 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5799
5800 let task = cx.update(|cx| {
5801 tool.run(
5802 ToolInput::resolved(crate::DeletePathToolInput {
5803 path: "root/important_data.txt".to_string(),
5804 }),
5805 event_stream,
5806 cx,
5807 )
5808 });
5809
5810 let result = task.await;
5811 assert!(result.is_err(), "expected deletion to be blocked");
5812 assert!(
5813 result.unwrap_err().contains("blocked"),
5814 "error should mention the deletion was blocked"
5815 );
5816}
5817
5818#[gpui::test]
5819async fn test_move_path_tool_denies_if_destination_denied(cx: &mut TestAppContext) {
5820 init_test(cx);
5821
5822 let fs = FakeFs::new(cx.executor());
5823 fs.insert_tree(
5824 "/root",
5825 json!({
5826 "safe.txt": "content",
5827 "protected": {}
5828 }),
5829 )
5830 .await;
5831 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5832
5833 cx.update(|cx| {
5834 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5835 settings.tool_permissions.tools.insert(
5836 MovePathTool::NAME.into(),
5837 agent_settings::ToolRules {
5838 default: Some(settings::ToolPermissionMode::Allow),
5839 always_allow: vec![],
5840 always_deny: vec![agent_settings::CompiledRegex::new(r"protected", false).unwrap()],
5841 always_confirm: vec![],
5842 invalid_patterns: vec![],
5843 },
5844 );
5845 agent_settings::AgentSettings::override_global(settings, cx);
5846 });
5847
5848 #[allow(clippy::arc_with_non_send_sync)]
5849 let tool = Arc::new(crate::MovePathTool::new(project));
5850 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5851
5852 let task = cx.update(|cx| {
5853 tool.run(
5854 ToolInput::resolved(crate::MovePathToolInput {
5855 source_path: "root/safe.txt".to_string(),
5856 destination_path: "root/protected/safe.txt".to_string(),
5857 }),
5858 event_stream,
5859 cx,
5860 )
5861 });
5862
5863 let result = task.await;
5864 assert!(
5865 result.is_err(),
5866 "expected move to be blocked due to destination path"
5867 );
5868 assert!(
5869 result.unwrap_err().contains("blocked"),
5870 "error should mention the move was blocked"
5871 );
5872}
5873
5874#[gpui::test]
5875async fn test_move_path_tool_denies_if_source_denied(cx: &mut TestAppContext) {
5876 init_test(cx);
5877
5878 let fs = FakeFs::new(cx.executor());
5879 fs.insert_tree(
5880 "/root",
5881 json!({
5882 "secret.txt": "secret content",
5883 "public": {}
5884 }),
5885 )
5886 .await;
5887 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5888
5889 cx.update(|cx| {
5890 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5891 settings.tool_permissions.tools.insert(
5892 MovePathTool::NAME.into(),
5893 agent_settings::ToolRules {
5894 default: Some(settings::ToolPermissionMode::Allow),
5895 always_allow: vec![],
5896 always_deny: vec![agent_settings::CompiledRegex::new(r"secret", false).unwrap()],
5897 always_confirm: vec![],
5898 invalid_patterns: vec![],
5899 },
5900 );
5901 agent_settings::AgentSettings::override_global(settings, cx);
5902 });
5903
5904 #[allow(clippy::arc_with_non_send_sync)]
5905 let tool = Arc::new(crate::MovePathTool::new(project));
5906 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5907
5908 let task = cx.update(|cx| {
5909 tool.run(
5910 ToolInput::resolved(crate::MovePathToolInput {
5911 source_path: "root/secret.txt".to_string(),
5912 destination_path: "root/public/not_secret.txt".to_string(),
5913 }),
5914 event_stream,
5915 cx,
5916 )
5917 });
5918
5919 let result = task.await;
5920 assert!(
5921 result.is_err(),
5922 "expected move to be blocked due to source path"
5923 );
5924 assert!(
5925 result.unwrap_err().contains("blocked"),
5926 "error should mention the move was blocked"
5927 );
5928}
5929
5930#[gpui::test]
5931async fn test_copy_path_tool_deny_rule_blocks_copy(cx: &mut TestAppContext) {
5932 init_test(cx);
5933
5934 let fs = FakeFs::new(cx.executor());
5935 fs.insert_tree(
5936 "/root",
5937 json!({
5938 "confidential.txt": "confidential data",
5939 "dest": {}
5940 }),
5941 )
5942 .await;
5943 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
5944
5945 cx.update(|cx| {
5946 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
5947 settings.tool_permissions.tools.insert(
5948 CopyPathTool::NAME.into(),
5949 agent_settings::ToolRules {
5950 default: Some(settings::ToolPermissionMode::Allow),
5951 always_allow: vec![],
5952 always_deny: vec![
5953 agent_settings::CompiledRegex::new(r"confidential", false).unwrap(),
5954 ],
5955 always_confirm: vec![],
5956 invalid_patterns: vec![],
5957 },
5958 );
5959 agent_settings::AgentSettings::override_global(settings, cx);
5960 });
5961
5962 #[allow(clippy::arc_with_non_send_sync)]
5963 let tool = Arc::new(crate::CopyPathTool::new(project));
5964 let (event_stream, _rx) = crate::ToolCallEventStream::test();
5965
5966 let task = cx.update(|cx| {
5967 tool.run(
5968 ToolInput::resolved(crate::CopyPathToolInput {
5969 source_path: "root/confidential.txt".to_string(),
5970 destination_path: "root/dest/copy.txt".to_string(),
5971 }),
5972 event_stream,
5973 cx,
5974 )
5975 });
5976
5977 let result = task.await;
5978 assert!(result.is_err(), "expected copy to be blocked");
5979 assert!(
5980 result.unwrap_err().contains("blocked"),
5981 "error should mention the copy was blocked"
5982 );
5983}
5984
5985#[gpui::test]
5986async fn test_save_file_tool_denies_if_any_path_denied(cx: &mut TestAppContext) {
5987 init_test(cx);
5988
5989 let fs = FakeFs::new(cx.executor());
5990 fs.insert_tree(
5991 "/root",
5992 json!({
5993 "normal.txt": "normal content",
5994 "readonly": {
5995 "config.txt": "readonly content"
5996 }
5997 }),
5998 )
5999 .await;
6000 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6001
6002 cx.update(|cx| {
6003 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6004 settings.tool_permissions.tools.insert(
6005 SaveFileTool::NAME.into(),
6006 agent_settings::ToolRules {
6007 default: Some(settings::ToolPermissionMode::Allow),
6008 always_allow: vec![],
6009 always_deny: vec![agent_settings::CompiledRegex::new(r"readonly", false).unwrap()],
6010 always_confirm: vec![],
6011 invalid_patterns: vec![],
6012 },
6013 );
6014 agent_settings::AgentSettings::override_global(settings, cx);
6015 });
6016
6017 #[allow(clippy::arc_with_non_send_sync)]
6018 let tool = Arc::new(crate::SaveFileTool::new(project));
6019 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6020
6021 let task = cx.update(|cx| {
6022 tool.run(
6023 ToolInput::resolved(crate::SaveFileToolInput {
6024 paths: vec![
6025 std::path::PathBuf::from("root/normal.txt"),
6026 std::path::PathBuf::from("root/readonly/config.txt"),
6027 ],
6028 }),
6029 event_stream,
6030 cx,
6031 )
6032 });
6033
6034 let result = task.await;
6035 assert!(
6036 result.is_err(),
6037 "expected save to be blocked due to denied path"
6038 );
6039 assert!(
6040 result.unwrap_err().contains("blocked"),
6041 "error should mention the save was blocked"
6042 );
6043}
6044
6045#[gpui::test]
6046async fn test_save_file_tool_respects_deny_rules(cx: &mut TestAppContext) {
6047 init_test(cx);
6048
6049 let fs = FakeFs::new(cx.executor());
6050 fs.insert_tree("/root", json!({"config.secret": "secret config"}))
6051 .await;
6052 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6053
6054 cx.update(|cx| {
6055 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6056 settings.tool_permissions.tools.insert(
6057 SaveFileTool::NAME.into(),
6058 agent_settings::ToolRules {
6059 default: Some(settings::ToolPermissionMode::Allow),
6060 always_allow: vec![],
6061 always_deny: vec![agent_settings::CompiledRegex::new(r"\.secret$", false).unwrap()],
6062 always_confirm: vec![],
6063 invalid_patterns: vec![],
6064 },
6065 );
6066 agent_settings::AgentSettings::override_global(settings, cx);
6067 });
6068
6069 #[allow(clippy::arc_with_non_send_sync)]
6070 let tool = Arc::new(crate::SaveFileTool::new(project));
6071 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6072
6073 let task = cx.update(|cx| {
6074 tool.run(
6075 ToolInput::resolved(crate::SaveFileToolInput {
6076 paths: vec![std::path::PathBuf::from("root/config.secret")],
6077 }),
6078 event_stream,
6079 cx,
6080 )
6081 });
6082
6083 let result = task.await;
6084 assert!(result.is_err(), "expected save to be blocked");
6085 assert!(
6086 result.unwrap_err().contains("blocked"),
6087 "error should mention the save was blocked"
6088 );
6089}
6090
6091#[gpui::test]
6092async fn test_web_search_tool_deny_rule_blocks_search(cx: &mut TestAppContext) {
6093 init_test(cx);
6094
6095 cx.update(|cx| {
6096 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6097 settings.tool_permissions.tools.insert(
6098 WebSearchTool::NAME.into(),
6099 agent_settings::ToolRules {
6100 default: Some(settings::ToolPermissionMode::Allow),
6101 always_allow: vec![],
6102 always_deny: vec![
6103 agent_settings::CompiledRegex::new(r"internal\.company", false).unwrap(),
6104 ],
6105 always_confirm: vec![],
6106 invalid_patterns: vec![],
6107 },
6108 );
6109 agent_settings::AgentSettings::override_global(settings, cx);
6110 });
6111
6112 #[allow(clippy::arc_with_non_send_sync)]
6113 let tool = Arc::new(crate::WebSearchTool);
6114 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6115
6116 let input: crate::WebSearchToolInput =
6117 serde_json::from_value(json!({"query": "internal.company.com secrets"})).unwrap();
6118
6119 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6120
6121 let result = task.await;
6122 assert!(result.is_err(), "expected search to be blocked");
6123 match result.unwrap_err() {
6124 crate::WebSearchToolOutput::Error { error } => {
6125 assert!(
6126 error.contains("blocked"),
6127 "error should mention the search was blocked"
6128 );
6129 }
6130 other => panic!("expected Error variant, got: {other:?}"),
6131 }
6132}
6133
6134#[gpui::test]
6135async fn test_edit_file_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6136 init_test(cx);
6137
6138 let fs = FakeFs::new(cx.executor());
6139 fs.insert_tree("/root", json!({"README.md": "# Hello"}))
6140 .await;
6141 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6142
6143 cx.update(|cx| {
6144 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6145 settings.tool_permissions.tools.insert(
6146 EditFileTool::NAME.into(),
6147 agent_settings::ToolRules {
6148 default: Some(settings::ToolPermissionMode::Confirm),
6149 always_allow: vec![agent_settings::CompiledRegex::new(r"\.md$", false).unwrap()],
6150 always_deny: vec![],
6151 always_confirm: vec![],
6152 invalid_patterns: vec![],
6153 },
6154 );
6155 agent_settings::AgentSettings::override_global(settings, cx);
6156 });
6157
6158 let context_server_registry =
6159 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6160 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6161 let templates = crate::Templates::new();
6162 let thread = cx.new(|cx| {
6163 crate::Thread::new(
6164 project.clone(),
6165 cx.new(|_cx| prompt_store::ProjectContext::default()),
6166 context_server_registry,
6167 templates.clone(),
6168 None,
6169 cx,
6170 )
6171 });
6172
6173 #[allow(clippy::arc_with_non_send_sync)]
6174 let tool = Arc::new(crate::EditFileTool::new(
6175 project,
6176 thread.downgrade(),
6177 language_registry,
6178 templates,
6179 ));
6180 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6181
6182 let _task = cx.update(|cx| {
6183 tool.run(
6184 ToolInput::resolved(crate::EditFileToolInput {
6185 display_description: "Edit README".to_string(),
6186 path: "root/README.md".into(),
6187 mode: crate::EditFileMode::Edit,
6188 }),
6189 event_stream,
6190 cx,
6191 )
6192 });
6193
6194 cx.run_until_parked();
6195
6196 let event = rx.try_next();
6197 assert!(
6198 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6199 "expected no authorization request for allowed .md file"
6200 );
6201}
6202
6203#[gpui::test]
6204async fn test_edit_file_tool_allow_still_prompts_for_local_settings(cx: &mut TestAppContext) {
6205 init_test(cx);
6206
6207 let fs = FakeFs::new(cx.executor());
6208 fs.insert_tree(
6209 "/root",
6210 json!({
6211 ".zed": {
6212 "settings.json": "{}"
6213 },
6214 "README.md": "# Hello"
6215 }),
6216 )
6217 .await;
6218 let project = Project::test(fs.clone(), ["/root".as_ref()], cx).await;
6219
6220 cx.update(|cx| {
6221 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6222 settings.tool_permissions.default = settings::ToolPermissionMode::Allow;
6223 agent_settings::AgentSettings::override_global(settings, cx);
6224 });
6225
6226 let context_server_registry =
6227 cx.new(|cx| crate::ContextServerRegistry::new(project.read(cx).context_server_store(), cx));
6228 let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
6229 let templates = crate::Templates::new();
6230 let thread = cx.new(|cx| {
6231 crate::Thread::new(
6232 project.clone(),
6233 cx.new(|_cx| prompt_store::ProjectContext::default()),
6234 context_server_registry,
6235 templates.clone(),
6236 None,
6237 cx,
6238 )
6239 });
6240
6241 #[allow(clippy::arc_with_non_send_sync)]
6242 let tool = Arc::new(crate::EditFileTool::new(
6243 project,
6244 thread.downgrade(),
6245 language_registry,
6246 templates,
6247 ));
6248
6249 // Editing a file inside .zed/ should still prompt even with global default: allow,
6250 // because local settings paths are sensitive and require confirmation regardless.
6251 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6252 let _task = cx.update(|cx| {
6253 tool.run(
6254 ToolInput::resolved(crate::EditFileToolInput {
6255 display_description: "Edit local settings".to_string(),
6256 path: "root/.zed/settings.json".into(),
6257 mode: crate::EditFileMode::Edit,
6258 }),
6259 event_stream,
6260 cx,
6261 )
6262 });
6263
6264 let _update = rx.expect_update_fields().await;
6265 let _auth = rx.expect_authorization().await;
6266}
6267
6268#[gpui::test]
6269async fn test_fetch_tool_deny_rule_blocks_url(cx: &mut TestAppContext) {
6270 init_test(cx);
6271
6272 cx.update(|cx| {
6273 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6274 settings.tool_permissions.tools.insert(
6275 FetchTool::NAME.into(),
6276 agent_settings::ToolRules {
6277 default: Some(settings::ToolPermissionMode::Allow),
6278 always_allow: vec![],
6279 always_deny: vec![
6280 agent_settings::CompiledRegex::new(r"internal\.company\.com", false).unwrap(),
6281 ],
6282 always_confirm: vec![],
6283 invalid_patterns: vec![],
6284 },
6285 );
6286 agent_settings::AgentSettings::override_global(settings, cx);
6287 });
6288
6289 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6290
6291 #[allow(clippy::arc_with_non_send_sync)]
6292 let tool = Arc::new(crate::FetchTool::new(http_client));
6293 let (event_stream, _rx) = crate::ToolCallEventStream::test();
6294
6295 let input: crate::FetchToolInput =
6296 serde_json::from_value(json!({"url": "https://internal.company.com/api"})).unwrap();
6297
6298 let task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6299
6300 let result = task.await;
6301 assert!(result.is_err(), "expected fetch to be blocked");
6302 assert!(
6303 result.unwrap_err().contains("blocked"),
6304 "error should mention the fetch was blocked"
6305 );
6306}
6307
6308#[gpui::test]
6309async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) {
6310 init_test(cx);
6311
6312 cx.update(|cx| {
6313 let mut settings = agent_settings::AgentSettings::get_global(cx).clone();
6314 settings.tool_permissions.tools.insert(
6315 FetchTool::NAME.into(),
6316 agent_settings::ToolRules {
6317 default: Some(settings::ToolPermissionMode::Confirm),
6318 always_allow: vec![agent_settings::CompiledRegex::new(r"docs\.rs", false).unwrap()],
6319 always_deny: vec![],
6320 always_confirm: vec![],
6321 invalid_patterns: vec![],
6322 },
6323 );
6324 agent_settings::AgentSettings::override_global(settings, cx);
6325 });
6326
6327 let http_client = gpui::http_client::FakeHttpClient::with_200_response();
6328
6329 #[allow(clippy::arc_with_non_send_sync)]
6330 let tool = Arc::new(crate::FetchTool::new(http_client));
6331 let (event_stream, mut rx) = crate::ToolCallEventStream::test();
6332
6333 let input: crate::FetchToolInput =
6334 serde_json::from_value(json!({"url": "https://docs.rs/some-crate"})).unwrap();
6335
6336 let _task = cx.update(|cx| tool.run(ToolInput::resolved(input), event_stream, cx));
6337
6338 cx.run_until_parked();
6339
6340 let event = rx.try_next();
6341 assert!(
6342 !matches!(event, Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(_))))),
6343 "expected no authorization request for allowed docs.rs URL"
6344 );
6345}
6346
6347#[gpui::test]
6348async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
6349 init_test(cx);
6350 always_allow_tools(cx);
6351
6352 let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
6353 let fake_model = model.as_fake();
6354
6355 // Add a tool so we can simulate tool calls
6356 thread.update(cx, |thread, _cx| {
6357 thread.add_tool(EchoTool);
6358 });
6359
6360 // Start a turn by sending a message
6361 let mut events = thread
6362 .update(cx, |thread, cx| {
6363 thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
6364 })
6365 .unwrap();
6366 cx.run_until_parked();
6367
6368 // Simulate the model making a tool call
6369 fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
6370 LanguageModelToolUse {
6371 id: "tool_1".into(),
6372 name: "echo".into(),
6373 raw_input: r#"{"text": "hello"}"#.into(),
6374 input: json!({"text": "hello"}),
6375 is_input_complete: true,
6376 thought_signature: None,
6377 },
6378 ));
6379 fake_model
6380 .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
6381
6382 // Signal that a message is queued before ending the stream
6383 thread.update(cx, |thread, _cx| {
6384 thread.set_has_queued_message(true);
6385 });
6386
6387 // Now end the stream - tool will run, and the boundary check should see the queue
6388 fake_model.end_last_completion_stream();
6389
6390 // Collect all events until the turn stops
6391 let all_events = collect_events_until_stop(&mut events, cx).await;
6392
6393 // Verify we received the tool call event
6394 let tool_call_ids: Vec<_> = all_events
6395 .iter()
6396 .filter_map(|e| match e {
6397 Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
6398 _ => None,
6399 })
6400 .collect();
6401 assert_eq!(
6402 tool_call_ids,
6403 vec!["tool_1"],
6404 "Should have received a tool call event for our echo tool"
6405 );
6406
6407 // The turn should have stopped with EndTurn
6408 let stop_reasons = stop_events(all_events);
6409 assert_eq!(
6410 stop_reasons,
6411 vec![acp::StopReason::EndTurn],
6412 "Turn should have ended after tool completion due to queued message"
6413 );
6414
6415 // Verify the queued message flag is still set
6416 thread.update(cx, |thread, _cx| {
6417 assert!(
6418 thread.has_queued_message(),
6419 "Should still have queued message flag set"
6420 );
6421 });
6422
6423 // Thread should be idle now
6424 thread.update(cx, |thread, _cx| {
6425 assert!(
6426 thread.is_turn_complete(),
6427 "Thread should not be running after turn ends"
6428 );
6429 });
6430}
6431
#[gpui::test]
// When a streaming tool fails mid-stream, the thread should break out of the
// model's completion stream right away and issue the follow-up request with
// the error tool result — without waiting for the stream to end.
async fn test_streaming_tool_error_breaks_stream_loop_immediately(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Tool that errors as soon as it has received its first input chunk.
    thread.update(cx, |thread, _cx| {
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // A tool use whose input is still streaming (is_input_complete: false) —
    // a single chunk is enough to trigger the tool's failure.
    let tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({}),
        is_input_complete: false,
        thought_signature: None,
    };

    fake_model
        .send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(tool_use.clone()));

    cx.run_until_parked();

    // Deliberately no end_last_completion_stream() here: a follow-up request
    // existing anyway proves the failure broke the stream loop immediately.
    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // The follow-up request (skipping the system message at index 0) must
    // contain the user prompt, the assistant's partial tool use, and an error
    // tool result ("failed"); the final message carries the cache marker.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec!["Use the streaming_failing_echo tool".into()],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![language_model::MessageContent::ToolUse(tool_use.clone())],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![language_model::MessageContent::ToolResult(
                    LanguageModelToolResult {
                        tool_use_id: tool_use.id.clone(),
                        tool_name: tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }
                )],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6506
#[gpui::test]
// A streaming tool failure must not discard other tool calls that are still
// running: the follow-up request should only be sent once the earlier,
// still-in-flight tool has finished, and it must include BOTH results.
async fn test_streaming_tool_error_waits_for_prior_tools_to_complete(cx: &mut TestAppContext) {
    init_test(cx);
    always_allow_tools(cx);

    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
    let fake_model = model.as_fake();

    // Oneshot used to hold the first (successful) tool open until this test
    // decides to let it complete.
    let (complete_streaming_echo_tool_call_tx, complete_streaming_echo_tool_call_rx) =
        oneshot::channel();

    thread.update(cx, |thread, _cx| {
        // First tool: succeeds, but only after the oneshot fires.
        thread.add_tool(
            StreamingEchoTool::new().with_wait_until_complete(complete_streaming_echo_tool_call_rx),
        );
        // Second tool: fails after receiving a single input chunk.
        thread.add_tool(StreamingFailingEchoTool {
            receive_chunks_until_failure: 1,
        });
    });

    let _events = thread
        .update(cx, |thread, cx| {
            thread.send(
                UserMessageId::new(),
                ["Use the streaming_echo tool and the streaming_failing_echo tool"],
                cx,
            )
        })
        .unwrap();
    cx.run_until_parked();

    // Stream a partial tool use for the echo tool (input still incomplete)...
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "call_1".into(),
            name: StreamingEchoTool::NAME.into(),
            raw_input: "hello".into(),
            input: json!({ "text": "hello" }),
            is_input_complete: false,
            thought_signature: None,
        },
    ));
    // ...followed by the completed version of the same call ("call_1").
    let first_tool_use = LanguageModelToolUse {
        id: "call_1".into(),
        name: StreamingEchoTool::NAME.into(),
        raw_input: "hello world".into(),
        input: json!({ "text": "hello world" }),
        is_input_complete: true,
        thought_signature: None,
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        first_tool_use.clone(),
    ));
    // The failing tool's call is streamed with incomplete input; one chunk is
    // enough to make it error while call_1 is still blocked on the oneshot.
    let second_tool_use = LanguageModelToolUse {
        name: StreamingFailingEchoTool::NAME.into(),
        raw_input: "hello".into(),
        input: json!({ "text": "hello" }),
        is_input_complete: false,
        thought_signature: None,
        id: "call_2".into(),
    };
    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        second_tool_use.clone(),
    ));

    cx.run_until_parked();

    // Release the blocked echo tool so it can finish successfully.
    complete_streaming_echo_tool_call_tx.send(()).unwrap();

    cx.run_until_parked();

    let completions = fake_model.pending_completions();
    let last_completion = completions.last().unwrap();

    // The follow-up request (skipping the system message at index 0) must
    // carry results for BOTH calls — call_2's error and call_1's successful
    // output — proving the thread waited for the in-flight tool instead of
    // reporting only the failure.
    assert_eq!(
        last_completion.messages[1..],
        vec![
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    "Use the streaming_echo tool and the streaming_failing_echo tool".into()
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::Assistant,
                content: vec![
                    language_model::MessageContent::ToolUse(first_tool_use.clone()),
                    language_model::MessageContent::ToolUse(second_tool_use.clone())
                ],
                cache: false,
                reasoning_details: None,
            },
            LanguageModelRequestMessage {
                role: Role::User,
                content: vec![
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: second_tool_use.id.clone(),
                        tool_name: second_tool_use.name,
                        is_error: true,
                        content: "failed".into(),
                        output: Some("failed".into()),
                    }),
                    language_model::MessageContent::ToolResult(LanguageModelToolResult {
                        tool_use_id: first_tool_use.id.clone(),
                        tool_name: first_tool_use.name,
                        is_error: false,
                        content: "hello world".into(),
                        output: Some("hello world".into()),
                    }),
                ],
                cache: true,
                reasoning_details: None,
            },
        ]
    );
}
6624
#[gpui::test]
// Verifies that model, profile, and thinking settings changed between
// iterations of a turn are picked up before the next completion request.
// NOTE(review): unlike sibling tests, this one does not call init_test(cx)
// before setup() — presumably setup() installs the settings store itself;
// confirm if this test ever fails on missing globals.
async fn test_mid_turn_model_and_settings_refresh(cx: &mut TestAppContext) {
    let ThreadTest {
        model, thread, fs, ..
    } = setup(cx, TestModel::Fake).await;
    let fake_model_a = model.as_fake();

    thread.update(cx, |thread, _cx| {
        thread.add_tool(EchoTool);
        thread.add_tool(DelayTool);
    });

    // Set up two profiles: profile-a has both tools, profile-b has only DelayTool.
    fs.insert_file(
        paths::settings_file(),
        json!({
            "agent": {
                "profiles": {
                    "profile-a": {
                        "name": "Profile A",
                        "tools": {
                            EchoTool::NAME: true,
                            DelayTool::NAME: true,
                        }
                    },
                    "profile-b": {
                        "name": "Profile B",
                        "tools": {
                            DelayTool::NAME: true,
                        }
                    }
                }
            }
        })
        .to_string()
        .into_bytes(),
    )
    .await;
    cx.run_until_parked();

    // Start from profile-a with thinking disabled.
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-a".into()), cx);
        thread.set_thinking_enabled(false, cx);
    });

    // Send a message — first iteration starts with model A, profile-a, thinking off.
    thread
        .update(cx, |thread, cx| {
            thread.send(UserMessageId::new(), ["test mid-turn refresh"], cx)
        })
        .unwrap();
    cx.run_until_parked();

    // Verify first request has both tools and thinking disabled.
    let completions = fake_model_a.pending_completions();
    assert_eq!(completions.len(), 1);
    let first_tools = tool_names_for_completion(&completions[0]);
    assert_eq!(first_tools, vec![DelayTool::NAME, EchoTool::NAME]);
    assert!(!completions[0].thinking_allowed);

    // Model A responds with an echo tool call, which forces a second iteration.
    fake_model_a.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
        LanguageModelToolUse {
            id: "tool_1".into(),
            name: "echo".into(),
            raw_input: r#"{"text":"hello"}"#.into(),
            input: json!({"text": "hello"}),
            is_input_complete: true,
            thought_signature: None,
        },
    ));
    fake_model_a.end_last_completion_stream();

    // Before the next iteration runs, switch to profile-b (only DelayTool),
    // swap in a new model, and enable thinking.
    let fake_model_b = Arc::new(FakeLanguageModel::with_id_and_thinking(
        "test-provider",
        "model-b",
        "Model B",
        true,
    ));
    thread.update(cx, |thread, cx| {
        thread.set_profile(AgentProfileId("profile-b".into()), cx);
        thread.set_model(fake_model_b.clone() as Arc<dyn LanguageModel>, cx);
        thread.set_thinking_enabled(true, cx);
    });

    // Run until parked — processes the echo tool call, loops back, picks up
    // the new model/profile/thinking, and makes a second request to model B.
    cx.run_until_parked();

    // The second request should have gone to model B.
    let model_b_completions = fake_model_b.pending_completions();
    assert_eq!(
        model_b_completions.len(),
        1,
        "second request should go to model B"
    );

    // Profile-b only has DelayTool, so echo should be gone.
    let second_tools = tool_names_for_completion(&model_b_completions[0]);
    assert_eq!(second_tools, vec![DelayTool::NAME]);

    // Thinking should now be enabled.
    assert!(model_b_completions[0].thinking_allowed);
}