From 9304e02b9a2e72169d9082f56fce2c632a02a8eb Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 20 Feb 2026 19:34:38 +0100 Subject: [PATCH] agent: Allow the agent to reprompt an existing subagent (#49737) Also fixes a regression I introduced that didn't cancel threads after the timeout was hit Release Notes: - N/A --- crates/agent/src/agent.rs | 90 +++++++++-- crates/agent/src/tests/mod.rs | 191 ++++++++++++++++++++++++ crates/agent/src/thread.rs | 13 ++ crates/agent/src/tools/subagent_tool.rs | 69 ++++++--- 4 files changed, 327 insertions(+), 36 deletions(-) diff --git a/crates/agent/src/agent.rs b/crates/agent/src/agent.rs index 8cea712c19cebb3261573184349987d67492ac62..4e8691c0b5017e57fb5af1a6db211c41f7629f49 100644 --- a/crates/agent/src/agent.rs +++ b/crates/agent/src/agent.rs @@ -1611,11 +1611,60 @@ impl NativeThreadEnvironment { agent.register_session(subagent_thread.clone(), cx) })?; + Self::prompt_subagent( + session_id, + subagent_thread, + acp_thread, + parent_thread_entity, + initial_prompt, + timeout, + cx, + ) + } + + pub(crate) fn resume_subagent_thread( + agent: WeakEntity, + parent_thread_entity: Entity, + session_id: acp::SessionId, + follow_up_prompt: String, + timeout: Option, + cx: &mut App, + ) -> Result> { + let (subagent_thread, acp_thread) = agent.update(cx, |agent, _cx| { + let session = agent + .sessions + .get(&session_id) + .ok_or_else(|| anyhow!("No subagent session found with id {session_id}"))?; + anyhow::Ok((session.thread.clone(), session.acp_thread.clone())) + })??; + + Self::prompt_subagent( + session_id, + subagent_thread, + acp_thread, + parent_thread_entity, + follow_up_prompt, + timeout, + cx, + ) + } + + fn prompt_subagent( + session_id: acp::SessionId, + subagent_thread: Entity, + acp_thread: Entity, + parent_thread_entity: Entity, + prompt: String, + timeout: Option, + cx: &mut App, + ) -> Result> { parent_thread_entity.update(cx, |parent_thread, _cx| { parent_thread.register_running_subagent(subagent_thread.downgrade()) }); - let task = acp_thread.update(cx, |agent, cx| agent.send(vec![initial_prompt.into()], cx)); + let task = acp_thread.update(cx, |acp_thread, cx| { + acp_thread.send(vec![prompt.into()], cx) + }); let timeout_timer = timeout.map(|d| cx.background_executor().timer(d)); let wait_for_prompt_to_complete = cx @@ -1708,6 +1757,24 @@ impl ThreadEnvironment for NativeThreadEnvironment { cx, ) } + + fn resume_subagent( + &self, + parent_thread_entity: Entity, + session_id: acp::SessionId, + follow_up_prompt: String, + timeout: Option, + cx: &mut App, + ) -> Result> { + Self::resume_subagent_thread( + self.agent.clone(), + parent_thread_entity, + session_id, + follow_up_prompt, + timeout, + cx, + ) + } } #[derive(Debug, Clone, Copy)] @@ -1737,21 +1804,20 @@ impl SubagentHandle for NativeSubagentHandle { let parent_thread = self.parent_thread.clone(); cx.spawn(async move |cx| { - match wait_for_prompt.await { - SubagentInitialPromptResult::Completed => {} + let result = match wait_for_prompt.await { + SubagentInitialPromptResult::Completed => thread.read_with(cx, |thread, _cx| { + thread + .last_message() + .map(|m| m.to_markdown()) + .context("No response from subagent") + }), SubagentInitialPromptResult::Timeout => { - return Err(anyhow!("The time to complete the task was exceeded.")); + thread.update(cx, |thread, cx| thread.cancel(cx)).await; + Err(anyhow!("The time to complete the task was exceeded.")) } - SubagentInitialPromptResult::Cancelled => return Err(anyhow!("User cancelled")), + SubagentInitialPromptResult::Cancelled => Err(anyhow!("User cancelled")), }; - let result = thread.read_with(cx, |thread, _cx| { - thread - .last_message() - .map(|m| m.to_markdown()) - .context("No response from subagent") - }); - parent_thread .update(cx, |parent_thread, cx| { parent_thread.unregister_running_subagent(&subagent_session_id, cx) diff --git a/crates/agent/src/tests/mod.rs b/crates/agent/src/tests/mod.rs index d895557fc1dec3f3959e04d3cd6c8f5bc9e16851..2673e33d0e7c8a05c204ee1c6fef6b74bac80b08 100644 --- a/crates/agent/src/tests/mod.rs +++ b/crates/agent/src/tests/mod.rs @@ -4233,6 +4233,7 @@ async fn test_subagent_tool_call_end_to_end(cx: &mut TestAppContext) { let subagent_tool_input = SubagentToolInput { label: "label".to_string(), task: "subagent task prompt".to_string(), + session_id: None, timeout_secs: None, }; let subagent_tool_use = LanguageModelToolUse { @@ -4383,6 +4384,7 @@ async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAp let subagent_tool_input = SubagentToolInput { label: "label".to_string(), task: "subagent task prompt".to_string(), + session_id: None, timeout_secs: None, }; let subagent_tool_use = LanguageModelToolUse { @@ -4460,6 +4462,195 @@ async fn test_subagent_tool_call_cancellation_during_task_prompt(cx: &mut TestAp }); } +#[gpui::test] +async fn test_subagent_tool_resume_session(cx: &mut TestAppContext) { + init_test(cx); + cx.update(|cx| { + LanguageModelRegistry::test(cx); + }); + cx.update(|cx| { + cx.update_flags(true, vec!["subagents".to_string()]); + }); + + let fs = FakeFs::new(cx.executor()); + fs.insert_tree( + "/", + json!({ + "a": { + "b.md": "Lorem" + } + }), + ) + .await; + let project = Project::test(fs.clone(), [path!("/a").as_ref()], cx).await; + let thread_store = cx.new(|cx| ThreadStore::new(cx)); + let agent = NativeAgent::new( + project.clone(), + thread_store.clone(), + Templates::new(), + None, + fs.clone(), + &mut cx.to_async(), + ) + .await + .unwrap(); + let connection = Rc::new(NativeAgentConnection(agent.clone())); + + let acp_thread = cx + .update(|cx| { + connection + .clone() + .new_session(project.clone(), Path::new(""), cx) + }) + .await + .unwrap(); + let session_id = acp_thread.read_with(cx, |thread, _| thread.session_id().clone()); + let thread = agent.read_with(cx, |agent, _| { + agent.sessions.get(&session_id).unwrap().thread.clone() + }); + let model = Arc::new(FakeLanguageModel::default()); + + thread.update(cx, |thread, cx| { + thread.set_model(model.clone(), cx); + }); + cx.run_until_parked(); + + // === First turn: create subagent === + let send = acp_thread.update(cx, |thread, cx| thread.send_raw("First prompt", cx)); + cx.run_until_parked(); + model.send_last_completion_stream_text_chunk("spawning subagent"); + let subagent_tool_input = SubagentToolInput { + label: "initial task".to_string(), + task: "do the first task".to_string(), + session_id: None, + timeout_secs: None, + }; + let subagent_tool_use = LanguageModelToolUse { + id: "subagent_1".into(), + name: SubagentTool::NAME.into(), + raw_input: serde_json::to_string(&subagent_tool_input).unwrap(), + input: serde_json::to_value(&subagent_tool_input).unwrap(), + is_input_complete: true, + thought_signature: None, + }; + model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse( + subagent_tool_use, + )); + model.end_last_completion_stream(); + + cx.run_until_parked(); + + let subagent_session_id = thread.read_with(cx, |thread, cx| { + thread + .running_subagent_ids(cx) + .get(0) + .expect("subagent thread should be running") + .clone() + }); + + let subagent_acp_thread = agent.read_with(cx, |agent, _cx| { + agent + .sessions + .get(&subagent_session_id) + .expect("subagent session should exist") + .acp_thread + .clone() + }); + + // Subagent responds + model.send_last_completion_stream_text_chunk("first task response"); + model.end_last_completion_stream(); + + cx.run_until_parked(); + + // Parent model responds to complete first turn + model.send_last_completion_stream_text_chunk("First response"); + model.end_last_completion_stream(); + + send.await.unwrap(); + + // Verify subagent is no longer running + thread.read_with(cx, |thread, cx| { + assert!( + thread.running_subagent_ids(cx).is_empty(), + "subagent should not be running after completion" + ); + }); + + // === Second turn: resume subagent with session_id === + let send2 = acp_thread.update(cx, |thread, cx| thread.send_raw("Follow up", cx)); + cx.run_until_parked(); + model.send_last_completion_stream_text_chunk("resuming subagent"); + let resume_tool_input = SubagentToolInput { + label: "follow-up task".to_string(), + task: "do the follow-up task".to_string(), + session_id: Some(subagent_session_id.clone()), + timeout_secs: None, + }; + let resume_tool_use = LanguageModelToolUse { + id: "subagent_2".into(), + name: SubagentTool::NAME.into(), + raw_input: serde_json::to_string(&resume_tool_input).unwrap(), + input: serde_json::to_value(&resume_tool_input).unwrap(), + is_input_complete: true, + thought_signature: None, + }; + model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(resume_tool_use)); + model.end_last_completion_stream(); + + cx.run_until_parked(); + + // Subagent should be running again with the same session + thread.read_with(cx, |thread, cx| { + let running = thread.running_subagent_ids(cx); + assert_eq!(running.len(), 1, "subagent should be running"); + assert_eq!(running[0], subagent_session_id, "should be same session"); + }); + + // Subagent responds to follow-up + model.send_last_completion_stream_text_chunk("follow-up task response"); + model.end_last_completion_stream(); + + cx.run_until_parked(); + + // Parent model responds to complete second turn + model.send_last_completion_stream_text_chunk("Second response"); + model.end_last_completion_stream(); + + send2.await.unwrap(); + + // Verify subagent is no longer running + thread.read_with(cx, |thread, cx| { + assert!( + thread.running_subagent_ids(cx).is_empty(), + "subagent should not be running after resume completion" + ); + }); + + // Verify the subagent's acp thread has both conversation turns + assert_eq!( + subagent_acp_thread.read_with(cx, |thread, cx| thread.to_markdown(cx)), + indoc! {" + ## User + + do the first task + + ## Assistant + + first task response + + ## User + + do the follow-up task + + ## Assistant + + follow-up task response + + "} + ); +} + #[gpui::test] async fn test_subagent_tool_is_present_when_feature_flag_enabled(cx: &mut TestAppContext) { init_test(cx); diff --git a/crates/agent/src/thread.rs b/crates/agent/src/thread.rs index 93024a97073e6f2aa0f8bfe7a135c01633dc0b48..cc8a46987d0d01a29b4b51abed21a9e415efc5a1 100644 --- a/crates/agent/src/thread.rs +++ b/crates/agent/src/thread.rs @@ -621,6 +621,19 @@ pub trait ThreadEnvironment { timeout: Option, cx: &mut App, ) -> Result>; + + fn resume_subagent( + &self, + _parent_thread: Entity, + _session_id: acp::SessionId, + _follow_up_prompt: String, + _timeout: Option, + _cx: &mut App, + ) -> Result> { + Err(anyhow::anyhow!( + "Resuming subagent sessions is not supported" + )) + } } #[derive(Debug)] diff --git a/crates/agent/src/tools/subagent_tool.rs b/crates/agent/src/tools/subagent_tool.rs index 1c9487dcbe611fe98d1614573b74842757511e41..8212192d7bfc5119db4070aafe201f20ee4354a6 100644 --- a/crates/agent/src/tools/subagent_tool.rs +++ b/crates/agent/src/tools/subagent_tool.rs @@ -23,6 +23,8 @@ use crate::{AgentTool, Thread, ThreadEnvironment, ToolCallEventStream}; /// /// You will receive only the agent's final message as output. /// +/// If a response (success or error) includes a session_id, you can send a follow-up message to that session by passing the session_id back. This is useful for multi-turn conversations with a subagent, asking clarifying questions about its output, or retrying after timeouts or transient failures. +/// /// Note: /// - Agents cannot use tools you don't have access to. /// - If spawning multiple agents that might write to the filesystem, provide guidance on how to avoid conflicts (e.g. assign each to different directories). @@ -32,6 +34,9 @@ pub struct SubagentToolInput { pub label: String, /// Describe the task for the agent to perform. Be specific about what you want accomplished. Include all necessary context (file paths, requirements, constraints) since the agent cannot see your conversation. pub task: String, + /// Optional session ID of an existing subagent to continue a conversation with. When provided, the task is sent as a follow-up message to that session instead of creating a new one. Use this to ask clarifying questions, request changes based on previous output, or retry after errors. + #[serde(default)] + pub session_id: Option, /// Optional maximum runtime in seconds. The purpose of this timeout is to prevent the agent from getting stuck in infinite loops, NOT to estimate task duration. Be generous if setting. If not set, the agent runs until it completes or is cancelled. #[serde(default)] pub timeout_secs: Option, @@ -45,18 +50,18 @@ pub enum SubagentToolOutput { output: String, }, Error { + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + session_id: Option, error: String, }, } impl From for LanguageModelToolResultContent { fn from(output: SubagentToolOutput) -> Self { - match output { - output @ SubagentToolOutput::Success { .. } => serde_json::to_string(&output) - .unwrap_or_else(|e| format!("Failed to serialize subagent output: {e}")) - .into(), - SubagentToolOutput::Error { error } => error.into(), - } + serde_json::to_string(&output) + .unwrap_or_else(|e| format!("Failed to serialize subagent output: {e}")) + .into() } } @@ -103,25 +108,37 @@ impl AgentTool for SubagentTool { ) -> Task> { let Some(parent_thread_entity) = self.parent_thread.upgrade() else { return Task::ready(Err(SubagentToolOutput::Error { + session_id: None, error: "Parent thread no longer exists".to_string(), })); }; - let subagent = match self.environment.create_subagent( - parent_thread_entity, - input.label, - input.task, - input.timeout_secs.map(|secs| Duration::from_secs(secs)), - cx, - ) { + let subagent = if let Some(session_id) = input.session_id { + self.environment.resume_subagent( + parent_thread_entity, + session_id, + input.task, + input.timeout_secs.map(Duration::from_secs), + cx, + ) + } else { + self.environment.create_subagent( + parent_thread_entity, + input.label, + input.task, + input.timeout_secs.map(Duration::from_secs), + cx, + ) + }; + let subagent = match subagent { Ok(subagent) => subagent, Err(err) => { return Task::ready(Err(SubagentToolOutput::Error { + session_id: None, error: err.to_string(), })); } }; - let subagent_session_id = subagent.id(); event_stream.subagent_spawned(subagent_session_id.clone()); @@ -137,6 +154,7 @@ impl AgentTool for SubagentTool { .wait_for_output(cx) .await .map_err(|e| SubagentToolOutput::Error { + session_id: Some(subagent_session_id.clone()), error: e.to_string(), })?; Ok(SubagentToolOutput::Success { @@ -153,17 +171,20 @@ impl AgentTool for SubagentTool { event_stream: ToolCallEventStream, _cx: &mut App, ) -> Result<()> { - match output { - SubagentToolOutput::Success { session_id, .. } => { - event_stream.subagent_spawned(session_id.clone()); - let meta = acp::Meta::from_iter([( - SUBAGENT_SESSION_ID_META_KEY.into(), - session_id.to_string().into(), - )]); - event_stream.update_fields_with_meta(acp::ToolCallUpdateFields::new(), Some(meta)); - } - SubagentToolOutput::Error { .. } => {} + let session_id = match &output { + SubagentToolOutput::Success { session_id, .. } => Some(session_id), + SubagentToolOutput::Error { session_id, .. } => session_id.as_ref(), + }; + + if let Some(session_id) = session_id { + event_stream.subagent_spawned(session_id.clone()); + let meta = acp::Meta::from_iter([( + SUBAGENT_SESSION_ID_META_KEY.into(), + session_id.to_string().into(), + )]); + event_stream.update_fields_with_meta(acp::ToolCallUpdateFields::new(), Some(meta)); } + Ok(()) } }