acp: Support unstable session/resume request (#47387)

Ben Brandt created

This is behind the beta flag. But some agents don't support a full load
flow, but rather just a resume from the current state of the session.
They don't emit notifications for past thread messages, which isn't
ideal, but the user can at least resume their work without completely
losing it.

We use it as a fallback if the agent doesn't support loading only, and
signal to the user that they won't be able to see previous messages.

Release Notes:

- N/A

Change summary

crates/acp_thread/src/connection.rs    |  23 +++
crates/agent_servers/src/acp.rs        |  87 +++++++++++++
crates/agent_ui/src/acp/thread_view.rs | 177 +++++++++++++++++++++++++++
crates/agent_ui_v2/src/agents_panel.rs |   2 
4 files changed, 282 insertions(+), 7 deletions(-)

Detailed changes

crates/acp_thread/src/connection.rs 🔗

@@ -53,6 +53,29 @@ pub trait AgentConnection {
         Task::ready(Err(anyhow::Error::msg("Loading sessions is not supported")))
     }
 
+    /// Whether this agent supports resuming existing sessions without loading history.
+    fn supports_resume_session(&self, _cx: &App) -> bool {
+        false
+    }
+
+    /// Resume an existing session by ID without replaying previous messages.
+    fn resume_session(
+        self: Rc<Self>,
+        _session: AgentSessionInfo,
+        _project: Entity<Project>,
+        _cwd: &Path,
+        _cx: &mut App,
+    ) -> Task<Result<Entity<AcpThread>>> {
+        Task::ready(Err(anyhow::Error::msg(
+            "Resuming sessions is not supported",
+        )))
+    }
+
+    /// Whether this agent supports showing session history.
+    fn supports_session_history(&self, cx: &App) -> bool {
+        self.supports_load_session(cx) || self.supports_resume_session(cx)
+    }
+
     fn auth_methods(&self) -> &[acp::AuthMethod];
 
     fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>>;

crates/agent_servers/src/acp.rs 🔗

@@ -590,6 +590,15 @@ impl AgentConnection for AcpConnection {
         cx.has_flag::<AcpBetaFeatureFlag>() && self.agent_capabilities.load_session
     }
 
+    fn supports_resume_session(&self, cx: &App) -> bool {
+        cx.has_flag::<AcpBetaFeatureFlag>()
+            && self
+                .agent_capabilities
+                .session_capabilities
+                .resume
+                .is_some()
+    }
+
     fn load_session(
         self: Rc<Self>,
         session: AgentSessionInfo,
@@ -662,6 +671,84 @@ impl AgentConnection for AcpConnection {
         })
     }
 
+    fn resume_session(
+        self: Rc<Self>,
+        session: AgentSessionInfo,
+        project: Entity<Project>,
+        cwd: &Path,
+        cx: &mut App,
+    ) -> Task<Result<Entity<AcpThread>>> {
+        if !cx.has_flag::<AcpBetaFeatureFlag>()
+            || self
+                .agent_capabilities
+                .session_capabilities
+                .resume
+                .is_none()
+        {
+            return Task::ready(Err(anyhow!(LoadError::Other(
+                "Resuming sessions is not supported by this agent.".into()
+            ))));
+        }
+
+        let cwd = cwd.to_path_buf();
+        let mcp_servers = mcp_servers_for_project(&project, cx);
+        let action_log = cx.new(|_| ActionLog::new(project.clone()));
+        let thread: Entity<AcpThread> = cx.new(|cx| {
+            AcpThread::new(
+                self.server_name.clone(),
+                self.clone(),
+                project,
+                action_log,
+                session.session_id.clone(),
+                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
+                cx,
+            )
+        });
+
+        self.sessions.borrow_mut().insert(
+            session.session_id.clone(),
+            AcpSession {
+                thread: thread.downgrade(),
+                suppress_abort_err: false,
+                session_modes: None,
+                models: None,
+                config_options: None,
+            },
+        );
+
+        cx.spawn(async move |cx| {
+            let response = match self
+                .connection
+                .resume_session(
+                    acp::ResumeSessionRequest::new(session.session_id.clone(), cwd)
+                        .mcp_servers(mcp_servers),
+                )
+                .await
+            {
+                Ok(response) => response,
+                Err(err) => {
+                    self.sessions.borrow_mut().remove(&session.session_id);
+                    return Err(map_acp_error(err));
+                }
+            };
+
+            let (modes, models, config_options) = cx.update(|cx| {
+                config_state(cx, response.modes, response.models, response.config_options)
+            });
+            if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
+                session.session_modes = modes;
+                session.models = models;
+                session.config_options = config_options.map(ConfigOptions::new);
+            }
+
+            if let Some(session_list) = &self.session_list {
+                session_list.notify_update();
+            }
+
+            Ok(thread)
+        })
+    }
+
     fn auth_methods(&self) -> &[acp::AuthMethod] {
         &self.auth_methods
     }

crates/agent_ui/src/acp/thread_view.rs 🔗

@@ -381,6 +381,7 @@ pub struct AcpThreadView {
     is_loading_contents: bool,
     new_server_version_available: Option<SharedString>,
     resume_thread_metadata: Option<AgentSessionInfo>,
+    resumed_without_history: bool,
     _cancel_task: Option<Task<()>>,
     _subscriptions: [Subscription; 5],
     show_codex_windows_warning: bool,
@@ -626,6 +627,7 @@ impl AcpThreadView {
             focus_handle: cx.focus_handle(),
             new_server_version_available: None,
             resume_thread_metadata: resume_thread,
+            resumed_without_history: false,
             show_codex_windows_warning,
             in_flight_prompt: None,
             skip_queue_processing_count: 0,
@@ -659,6 +661,7 @@ impl AcpThreadView {
         self.turn_started_at = None;
         self.last_turn_duration = None;
         self._turn_timer_task = None;
+        self.resumed_without_history = false;
         cx.notify();
     }
 
@@ -730,22 +733,31 @@ impl AcpThreadView {
 
             telemetry::event!("Agent Thread Started", agent = connection.telemetry_id());
 
+            let mut resumed_without_history = false;
             let result = if let Some(resume) = resume_thread.clone() {
                 cx.update(|_, cx| {
+                    let session_cwd = resume
+                        .cwd
+                        .clone()
+                        .unwrap_or_else(|| fallback_cwd.as_ref().to_path_buf());
                     if connection.supports_load_session(cx) {
-                        let session_cwd = resume
-                            .cwd
-                            .clone()
-                            .unwrap_or_else(|| fallback_cwd.as_ref().to_path_buf());
                         connection.clone().load_session(
                             resume,
                             project.clone(),
                             session_cwd.as_path(),
                             cx,
                         )
+                    } else if connection.supports_resume_session(cx) {
+                        resumed_without_history = true;
+                        connection.clone().resume_session(
+                            resume,
+                            project.clone(),
+                            session_cwd.as_path(),
+                            cx,
+                        )
                     } else {
                         Task::ready(Err(anyhow!(LoadError::Other(
-                            "Loading sessions is not supported by this agent.".into()
+                            "Loading or resuming sessions is not supported by this agent.".into()
                         ))))
                     }
                 })
@@ -782,6 +794,7 @@ impl AcpThreadView {
                     Ok(thread) => {
                         let action_log = thread.read(cx).action_log().clone();
 
+                        this.resumed_without_history = resumed_without_history;
                         this.prompt_capabilities
                             .replace(thread.read(cx).prompt_capabilities());
 
@@ -800,7 +813,7 @@ impl AcpThreadView {
 
                         let connection = thread.read(cx).connection().clone();
                         let session_id = thread.read(cx).session_id().clone();
-                        let session_list = if connection.supports_load_session(cx) {
+                        let session_list = if connection.supports_session_history(cx) {
                             connection.session_list(cx)
                         } else {
                             None
@@ -4893,6 +4906,24 @@ impl AcpThreadView {
         )
     }
 
+    fn render_resume_notice(&self, _cx: &Context<Self>) -> AnyElement {
+        let description = "This agent does not support viewing previous messages. However, your session will still continue from where you last left off.";
+
+        div()
+            .px_2()
+            .pt_2()
+            .pb_3()
+            .w_full()
+            .child(
+                Callout::new()
+                    .severity(Severity::Info)
+                    .icon(IconName::Info)
+                    .title("Resumed Session")
+                    .description(description),
+            )
+            .into_any_element()
+    }
+
     fn update_recent_history_from_cache(
         &mut self,
         history: &Entity<AcpThreadHistory>,
@@ -8513,6 +8544,9 @@ impl Render for AcpThreadView {
                     .child(self.render_load_error(e, window, cx))
                     .into_any(),
                 ThreadState::Ready { .. } => v_flex().flex_1().map(|this| {
+                    let this = this.when(self.resumed_without_history, |this| {
+                        this.child(self.render_resume_notice(cx))
+                    });
                     if has_messages {
                         this.child(
                             list(
@@ -8875,6 +8909,44 @@ pub(crate) mod tests {
         });
     }
 
+    #[gpui::test]
+    async fn test_resume_without_history_adds_notice(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let session = AgentSessionInfo::new(SessionId::new("resume-session"));
+        let fs = FakeFs::new(cx.executor());
+        let project = Project::test(fs, [], cx).await;
+        let (workspace, cx) =
+            cx.add_window_view(|window, cx| Workspace::test_new(project.clone(), window, cx));
+
+        let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
+        let history = cx.update(|window, cx| cx.new(|cx| AcpThreadHistory::new(None, window, cx)));
+
+        let thread_view = cx.update(|window, cx| {
+            cx.new(|cx| {
+                AcpThreadView::new(
+                    Rc::new(StubAgentServer::new(ResumeOnlyAgentConnection)),
+                    Some(session),
+                    None,
+                    workspace.downgrade(),
+                    project,
+                    Some(thread_store),
+                    None,
+                    history,
+                    window,
+                    cx,
+                )
+            })
+        });
+
+        cx.run_until_parked();
+
+        thread_view.read_with(cx, |view, _cx| {
+            assert!(view.resumed_without_history);
+            assert_eq!(view.list_state.item_count(), 0);
+        });
+    }
+
     #[gpui::test]
     async fn test_refusal_handling(cx: &mut TestAppContext) {
         init_test(cx);
@@ -9216,6 +9288,99 @@ pub(crate) mod tests {
         }
     }
 
+    #[derive(Clone)]
+    struct ResumeOnlyAgentConnection;
+
+    impl AgentConnection for ResumeOnlyAgentConnection {
+        fn telemetry_id(&self) -> SharedString {
+            "resume-only".into()
+        }
+
+        fn new_thread(
+            self: Rc<Self>,
+            project: Entity<Project>,
+            _cwd: &Path,
+            cx: &mut gpui::App,
+        ) -> Task<gpui::Result<Entity<AcpThread>>> {
+            let action_log = cx.new(|_| ActionLog::new(project.clone()));
+            let thread = cx.new(|cx| {
+                AcpThread::new(
+                    "ResumeOnlyAgentConnection",
+                    self.clone(),
+                    project,
+                    action_log,
+                    SessionId::new("new-session"),
+                    watch::Receiver::constant(
+                        acp::PromptCapabilities::new()
+                            .image(true)
+                            .audio(true)
+                            .embedded_context(true),
+                    ),
+                    cx,
+                )
+            });
+            Task::ready(Ok(thread))
+        }
+
+        fn supports_resume_session(&self, _cx: &App) -> bool {
+            true
+        }
+
+        fn resume_session(
+            self: Rc<Self>,
+            session: AgentSessionInfo,
+            project: Entity<Project>,
+            _cwd: &Path,
+            cx: &mut App,
+        ) -> Task<gpui::Result<Entity<AcpThread>>> {
+            let action_log = cx.new(|_| ActionLog::new(project.clone()));
+            let thread = cx.new(|cx| {
+                AcpThread::new(
+                    "ResumeOnlyAgentConnection",
+                    self.clone(),
+                    project,
+                    action_log,
+                    session.session_id,
+                    watch::Receiver::constant(
+                        acp::PromptCapabilities::new()
+                            .image(true)
+                            .audio(true)
+                            .embedded_context(true),
+                    ),
+                    cx,
+                )
+            });
+            Task::ready(Ok(thread))
+        }
+
+        fn auth_methods(&self) -> &[acp::AuthMethod] {
+            &[]
+        }
+
+        fn authenticate(
+            &self,
+            _method_id: acp::AuthMethodId,
+            _cx: &mut App,
+        ) -> Task<gpui::Result<()>> {
+            Task::ready(Ok(()))
+        }
+
+        fn prompt(
+            &self,
+            _id: Option<acp_thread::UserMessageId>,
+            _params: acp::PromptRequest,
+            _cx: &mut App,
+        ) -> Task<gpui::Result<acp::PromptResponse>> {
+            Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
+        }
+
+        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
+
+        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
+            self
+        }
+    }
+
     impl<C> AgentServer for StubAgentServer<C>
     where
         C: 'static + AgentConnection + Send + Clone,

crates/agent_ui_v2/src/agents_panel.rs 🔗

@@ -151,7 +151,7 @@ impl AgentsPanel {
             };
 
             cx.update(|cx| {
-                if connection.supports_load_session(cx)
+                if connection.supports_session_history(cx)
                     && let Some(session_list) = connection.session_list(cx)
                 {
                     history_handle.update(cx, |history, cx| {