From 01e67d832125a7713520e2b3f41b4e4fd1e5d35e Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Fri, 23 Jan 2026 12:38:38 +0100 Subject: [PATCH] acp: Support unstable session/resume request (#47387) This is behind the beta flag. But some agents don't support a full load flow, but rather just a resume from the current state of the session. They don't emit notifications for past thread messages, which isn't ideal, but the user can at least resume their work without completely losing it. We use it as a fallback if the agent doesn't support loading only, and signal to the user that they won't be able to see previous messages. Release Notes: - N/A --- crates/acp_thread/src/connection.rs | 23 ++++ crates/agent_servers/src/acp.rs | 87 ++++++++++++ crates/agent_ui/src/acp/thread_view.rs | 177 ++++++++++++++++++++++++- crates/agent_ui_v2/src/agents_panel.rs | 2 +- 4 files changed, 282 insertions(+), 7 deletions(-) diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index c2d14eacc7fcb2ab2f13166cd3b0b8983f24ba42..577aebbfcf31efbbfb4657d477a0a75144068af9 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -53,6 +53,29 @@ pub trait AgentConnection { Task::ready(Err(anyhow::Error::msg("Loading sessions is not supported"))) } + /// Whether this agent supports resuming existing sessions without loading history. + fn supports_resume_session(&self, _cx: &App) -> bool { + false + } + + /// Resume an existing session by ID without replaying previous messages. + fn resume_session( + self: Rc, + _session: AgentSessionInfo, + _project: Entity, + _cwd: &Path, + _cx: &mut App, + ) -> Task>> { + Task::ready(Err(anyhow::Error::msg( + "Resuming sessions is not supported", + ))) + } + + /// Whether this agent supports showing session history. + fn supports_session_history(&self, cx: &App) -> bool { + self.supports_load_session(cx) || self.supports_resume_session(cx) + } + fn auth_methods(&self) -> &[acp::AuthMethod]; fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; diff --git a/crates/agent_servers/src/acp.rs b/crates/agent_servers/src/acp.rs index 872922741f976d010fb997d3651ea4143a658906..646a3455852bb8e2b90658a9a09706dd01af26ba 100644 --- a/crates/agent_servers/src/acp.rs +++ b/crates/agent_servers/src/acp.rs @@ -590,6 +590,15 @@ impl AgentConnection for AcpConnection { cx.has_flag::() && self.agent_capabilities.load_session } + fn supports_resume_session(&self, cx: &App) -> bool { + cx.has_flag::() + && self + .agent_capabilities + .session_capabilities + .resume + .is_some() + } + fn load_session( self: Rc, session: AgentSessionInfo, @@ -662,6 +671,84 @@ impl AgentConnection for AcpConnection { }) } + fn resume_session( + self: Rc, + session: AgentSessionInfo, + project: Entity, + cwd: &Path, + cx: &mut App, + ) -> Task>> { + if !cx.has_flag::() + || self + .agent_capabilities + .session_capabilities + .resume + .is_none() + { + return Task::ready(Err(anyhow!(LoadError::Other( + "Resuming sessions is not supported by this agent.".into() + )))); + } + + let cwd = cwd.to_path_buf(); + let mcp_servers = mcp_servers_for_project(&project, cx); + let action_log = cx.new(|_| ActionLog::new(project.clone())); + let thread: Entity = cx.new(|cx| { + AcpThread::new( + self.server_name.clone(), + self.clone(), + project, + action_log, + session.session_id.clone(), + watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()), + cx, + ) + }); + + self.sessions.borrow_mut().insert( + session.session_id.clone(), + AcpSession { + thread: thread.downgrade(), + suppress_abort_err: false, + session_modes: None, + models: None, + config_options: None, + }, + ); + + cx.spawn(async move |cx| { + let response = match self + .connection + .resume_session( + acp::ResumeSessionRequest::new(session.session_id.clone(), cwd) + .mcp_servers(mcp_servers), + ) + .await + { + Ok(response) => response, + Err(err) => { + self.sessions.borrow_mut().remove(&session.session_id); + return Err(map_acp_error(err)); + } + }; + + let (modes, models, config_options) = cx.update(|cx| { + config_state(cx, response.modes, response.models, response.config_options) + }); + if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) { + session.session_modes = modes; + session.models = models; + session.config_options = config_options.map(ConfigOptions::new); + } + + if let Some(session_list) = &self.session_list { + session_list.notify_update(); + } + + Ok(thread) + }) + } + fn auth_methods(&self) -> &[acp::AuthMethod] { &self.auth_methods } diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 07444999d1402d15dfcb8d563a8fcdc82764d7a9..2927ec87d7acb57e05b3e272eb2eebfc6c72b50e 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -381,6 +381,7 @@ pub struct AcpThreadView { is_loading_contents: bool, new_server_version_available: Option, resume_thread_metadata: Option, + resumed_without_history: bool, _cancel_task: Option>, _subscriptions: [Subscription; 5], show_codex_windows_warning: bool, @@ -626,6 +627,7 @@ impl AcpThreadView { focus_handle: cx.focus_handle(), new_server_version_available: None, resume_thread_metadata: resume_thread, + resumed_without_history: false, show_codex_windows_warning, in_flight_prompt: None, skip_queue_processing_count: 0, @@ -659,6 +661,7 @@ impl AcpThreadView { self.turn_started_at = None; self.last_turn_duration = None; self._turn_timer_task = None; + self.resumed_without_history = false; cx.notify(); } @@ -730,22 +733,31 @@ impl AcpThreadView { telemetry::event!("Agent Thread Started", agent = connection.telemetry_id()); + let mut resumed_without_history = false; let result = if let Some(resume) = resume_thread.clone() { cx.update(|_, cx| { + let session_cwd = resume + .cwd + .clone() + .unwrap_or_else(|| fallback_cwd.as_ref().to_path_buf()); if connection.supports_load_session(cx) { - let session_cwd = resume - .cwd - .clone() - .unwrap_or_else(|| fallback_cwd.as_ref().to_path_buf()); connection.clone().load_session( resume, project.clone(), session_cwd.as_path(), cx, ) + } else if connection.supports_resume_session(cx) { + resumed_without_history = true; + connection.clone().resume_session( + resume, + project.clone(), + session_cwd.as_path(), + cx, + ) } else { Task::ready(Err(anyhow!(LoadError::Other( - "Loading sessions is not supported by this agent.".into() + "Loading or resuming sessions is not supported by this agent.".into() )))) } }) @@ -782,6 +794,7 @@ impl AcpThreadView { Ok(thread) => { let action_log = thread.read(cx).action_log().clone(); + this.resumed_without_history = resumed_without_history; this.prompt_capabilities .replace(thread.read(cx).prompt_capabilities()); @@ -800,7 +813,7 @@ impl AcpThreadView { let connection = thread.read(cx).connection().clone(); let session_id = thread.read(cx).session_id().clone(); - let session_list = if connection.supports_load_session(cx) { + let session_list = if connection.supports_session_history(cx) { connection.session_list(cx) } else { None @@ -4893,6 +4906,24 @@ impl AcpThreadView { ) } + fn render_resume_notice(&self, _cx: &Context) -> AnyElement { + let description = "This agent does not support viewing previous messages. However, your session will still continue from where you last left off."; + + div() + .px_2() + .pt_2() + .pb_3() + .w_full() + .child( + Callout::new() + .severity(Severity::Info) + .icon(IconName::Info) + .title("Resumed Session") + .description(description), + ) + .into_any_element() + } + fn update_recent_history_from_cache( &mut self, history: &Entity, @@ -8513,6 +8544,9 @@ impl Render for AcpThreadView { .child(self.render_load_error(e, window, cx)) .into_any(), ThreadState::Ready { .. } => v_flex().flex_1().map(|this| { + let this = this.when(self.resumed_without_history, |this| { + this.child(self.render_resume_notice(cx)) + }); if has_messages { this.child( list( @@ -8875,6 +8909,44 @@ pub(crate) mod tests { }); } + #[gpui::test] + async fn test_resume_without_history_adds_notice(cx: &mut TestAppContext) { + init_test(cx); + + let session = AgentSessionInfo::new(SessionId::new("resume-session")); + let fs = FakeFs::new(cx.executor()); + let project = Project::test(fs, [], cx).await; + let (workspace, cx) = + cx.add_window_view(|window, cx| Workspace::test_new(project.clone(), window, cx)); + + let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx))); + let history = cx.update(|window, cx| cx.new(|cx| AcpThreadHistory::new(None, window, cx))); + + let thread_view = cx.update(|window, cx| { + cx.new(|cx| { + AcpThreadView::new( + Rc::new(StubAgentServer::new(ResumeOnlyAgentConnection)), + Some(session), + None, + workspace.downgrade(), + project, + Some(thread_store), + None, + history, + window, + cx, + ) + }) + }); + + cx.run_until_parked(); + + thread_view.read_with(cx, |view, _cx| { + assert!(view.resumed_without_history); + assert_eq!(view.list_state.item_count(), 0); + }); + } + #[gpui::test] async fn test_refusal_handling(cx: &mut TestAppContext) { init_test(cx); @@ -9216,6 +9288,99 @@ pub(crate) mod tests { } } + #[derive(Clone)] + struct ResumeOnlyAgentConnection; + + impl AgentConnection for ResumeOnlyAgentConnection { + fn telemetry_id(&self) -> SharedString { + "resume-only".into() + } + + fn new_thread( + self: Rc, + project: Entity, + _cwd: &Path, + cx: &mut gpui::App, + ) -> Task>> { + let action_log = cx.new(|_| ActionLog::new(project.clone())); + let thread = cx.new(|cx| { + AcpThread::new( + "ResumeOnlyAgentConnection", + self.clone(), + project, + action_log, + SessionId::new("new-session"), + watch::Receiver::constant( + acp::PromptCapabilities::new() + .image(true) + .audio(true) + .embedded_context(true), + ), + cx, + ) + }); + Task::ready(Ok(thread)) + } + + fn supports_resume_session(&self, _cx: &App) -> bool { + true + } + + fn resume_session( + self: Rc, + session: AgentSessionInfo, + project: Entity, + _cwd: &Path, + cx: &mut App, + ) -> Task>> { + let action_log = cx.new(|_| ActionLog::new(project.clone())); + let thread = cx.new(|cx| { + AcpThread::new( + "ResumeOnlyAgentConnection", + self.clone(), + project, + action_log, + session.session_id, + watch::Receiver::constant( + acp::PromptCapabilities::new() + .image(true) + .audio(true) + .embedded_context(true), + ), + cx, + ) + }); + Task::ready(Ok(thread)) + } + + fn auth_methods(&self) -> &[acp::AuthMethod] { + &[] + } + + fn authenticate( + &self, + _method_id: acp::AuthMethodId, + _cx: &mut App, + ) -> Task> { + Task::ready(Ok(())) + } + + fn prompt( + &self, + _id: Option, + _params: acp::PromptRequest, + _cx: &mut App, + ) -> Task> { + Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))) + } + + fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {} + + fn into_any(self: Rc) -> Rc { + self + } + } + impl AgentServer for StubAgentServer where C: 'static + AgentConnection + Send + Clone, diff --git a/crates/agent_ui_v2/src/agents_panel.rs b/crates/agent_ui_v2/src/agents_panel.rs index f330dec7f16ccd3b06f53541285b92d0ac57875c..f8e223ffbf8a05aa8e78af6baf7ec95265afaf45 100644 --- a/crates/agent_ui_v2/src/agents_panel.rs +++ b/crates/agent_ui_v2/src/agents_panel.rs @@ -151,7 +151,7 @@ impl AgentsPanel { }; cx.update(|cx| { - if connection.supports_load_session(cx) + if connection.supports_session_history(cx) && let Some(session_list) = connection.session_list(cx) { history_handle.update(cx, |history, cx| {