agent_ui: Preserve session resume state after load errors (#54411)

Ben Brandt and Bennet Bo Fenner created

Use stored thread metadata during reset when no root thread view exists,
so retries keep the original session id and title instead of starting a
new session.

Add a regression test for reconnecting after an initial custom agent
load failure.

Self-Review Checklist:

- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [x] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable

Closes #ISSUE

Release Notes:

- N/A or Added/Fixed/Improved ...

---------

Co-authored-by: Bennet Bo Fenner <bennetbo@gmx.de>

Change summary

crates/agent_ui/src/agent_panel.rs       | 187 ++++++++++++++++++++++++-
crates/agent_ui/src/conversation_view.rs | 187 +++++++++++++++++++++++++
2 files changed, 358 insertions(+), 16 deletions(-)

Detailed changes

crates/agent_ui/src/agent_panel.rs 🔗

@@ -723,18 +723,43 @@ impl AgentPanel {
         let selected_agent = self.selected_agent.clone();
 
         let is_draft_active = self.active_thread_is_draft(cx);
-        let last_active_thread = self.active_agent_thread(cx).map(|thread| {
-            let thread = thread.read(cx);
-
-            let title = thread.title();
-            let work_dirs = thread.work_dirs().cloned();
-            SerializedActiveThread {
-                session_id: (!is_draft_active).then(|| thread.session_id().0.to_string()),
-                agent_type: self.selected_agent.clone(),
-                title: title.map(|t| t.to_string()),
-                work_dirs: work_dirs.map(|dirs| dirs.serialize()),
-            }
-        });
+        let last_active_thread = self
+            .active_agent_thread(cx)
+            .map(|thread| {
+                let thread = thread.read(cx);
+
+                let title = thread.title();
+                let work_dirs = thread.work_dirs().cloned();
+                SerializedActiveThread {
+                    session_id: (!is_draft_active).then(|| thread.session_id().0.to_string()),
+                    agent_type: self.selected_agent.clone(),
+                    title: title.map(|t| t.to_string()),
+                    work_dirs: work_dirs.map(|dirs| dirs.serialize()),
+                }
+            })
+            .or_else(|| {
+                // The active view may be in `Loading` or `LoadError` — for
+                // example, while a restored thread is waiting for a custom
+                // agent to finish registering. Without this fallback, a
+                // stray `serialize()` triggered during that window would
+                // write `session_id=None` and wipe the restored session
+                if is_draft_active {
+                    return None;
+                }
+                let conversation_view = self.active_conversation_view()?;
+                let session_id = conversation_view.read(cx).root_session_id.clone()?;
+                let metadata = ThreadMetadataStore::try_global(cx)
+                    .and_then(|store| store.read(cx).entry_by_session(&session_id).cloned());
+                Some(SerializedActiveThread {
+                    session_id: Some(session_id.0.to_string()),
+                    agent_type: self.selected_agent.clone(),
+                    title: metadata
+                        .as_ref()
+                        .and_then(|m| m.title.as_ref())
+                        .map(|t| t.to_string()),
+                    work_dirs: metadata.map(|m| m.folder_paths().serialize()),
+                })
+            });
 
         let kvp = KeyValueStore::global(cx);
         let draft_thread_prompt = self.draft_thread.as_ref().and_then(|conversation| {
@@ -3755,6 +3780,35 @@ impl AgentPanel {
         self.set_base_view(thread.into(), true, window, cx);
     }
 
+    /// Opens a restored external thread with an arbitrary AgentServer and
+    /// a specific `resume_session_id` — as if we just restored from the KVP.
+    ///
+    /// Test-only helper. Not compiled into production builds.
+    pub fn open_restored_thread_with_server(
+        &mut self,
+        server: Rc<dyn AgentServer>,
+        resume_session_id: acp::SessionId,
+        window: &mut Window,
+        cx: &mut Context<Self>,
+    ) {
+        let ext_agent = Agent::Custom {
+            id: server.agent_id(),
+        };
+
+        let thread = self.create_agent_thread_with_server(
+            ext_agent,
+            Some(server),
+            Some(resume_session_id),
+            None,
+            None,
+            None,
+            "agent_panel",
+            window,
+            cx,
+        );
+        self.set_base_view(thread.into(), true, window, cx);
+    }
+
     /// Returns the currently active thread view, if any.
     ///
     /// This is a test-only accessor that exposes the private `active_thread_view()`
@@ -4148,6 +4202,115 @@ mod tests {
         });
     }
 
+    #[gpui::test]
+    async fn test_serialize_preserves_session_id_in_load_error(cx: &mut TestAppContext) {
+        use crate::conversation_view::tests::FlakyAgentServer;
+        use crate::thread_metadata_store::{ThreadId, ThreadMetadata};
+        use chrono::Utc;
+        use project::{AgentId as ProjectAgentId, WorktreePaths};
+
+        init_test(cx);
+        cx.update(|cx| {
+            agent::ThreadStore::init_global(cx);
+            language_model::LanguageModelRegistry::test(cx);
+        });
+
+        let fs = FakeFs::new(cx.executor());
+        let project = Project::test(fs, [], cx).await;
+
+        let multi_workspace =
+            cx.add_window(|window, cx| MultiWorkspace::test_new(project.clone(), window, cx));
+        let workspace = multi_workspace
+            .read_with(cx, |mw, _cx| mw.workspace().clone())
+            .unwrap();
+        workspace.update(cx, |workspace, _cx| {
+            workspace.set_random_database_id();
+        });
+        let workspace_id = workspace
+            .read_with(cx, |workspace, _cx| workspace.database_id())
+            .expect("workspace should have a database id");
+
+        let cx = &mut VisualTestContext::from_window(multi_workspace.into(), cx);
+
+        // Simulate a previous run that persisted metadata for this session.
+        let resume_session_id = acp::SessionId::new("persistent-session");
+        cx.update(|_window, cx| {
+            ThreadMetadataStore::global(cx).update(cx, |store, cx| {
+                store.save(
+                    ThreadMetadata {
+                        thread_id: ThreadId::new(),
+                        session_id: Some(resume_session_id.clone()),
+                        agent_id: ProjectAgentId::new("Flaky"),
+                        title: Some("Persistent chat".into()),
+                        updated_at: Utc::now(),
+                        created_at: Some(Utc::now()),
+                        interacted_at: None,
+                        worktree_paths: WorktreePaths::from_folder_paths(&PathList::default()),
+                        remote_connection: None,
+                        archived: false,
+                    },
+                    cx,
+                );
+            });
+        });
+
+        let panel = workspace.update_in(cx, |workspace, window, cx| {
+            cx.new(|cx| AgentPanel::new(workspace, None, window, cx))
+        });
+
+        // Open a restored thread using a flaky server so the initial connect
+        // fails and the view lands in LoadError — mirroring the cold-start
+        // race against a custom agent over SSH.
+        let (server, _fail) =
+            FlakyAgentServer::new(StubAgentConnection::new().with_supports_load_session(true));
+        panel.update_in(cx, |panel, window, cx| {
+            panel.open_restored_thread_with_server(
+                Rc::new(server),
+                resume_session_id.clone(),
+                window,
+                cx,
+            );
+        });
+        cx.run_until_parked();
+
+        // Sanity: the view couldn't connect, so no live AcpThread exists.
+        panel.read_with(cx, |panel, cx| {
+            assert!(
+                panel.active_agent_thread(cx).is_none(),
+                "active_agent_thread should be None while the flaky server is failing"
+            );
+            let conversation_view = panel
+                .active_conversation_view()
+                .expect("panel should still have an active ConversationView");
+            assert_eq!(
+                conversation_view.read(cx).root_session_id.as_ref(),
+                Some(&resume_session_id),
+                "ConversationView should still hold the restored session id"
+            );
+        });
+
+        // Serialize while in LoadError. Before the fix this wrote
+        // `session_id=None` to the KVP and permanently lost the session.
+        panel.update(cx, |panel, cx| panel.serialize(cx));
+        cx.run_until_parked();
+
+        let kvp = cx.update(|_window, cx| KeyValueStore::global(cx));
+        let serialized: Option<SerializedAgentPanel> = cx
+            .background_spawn(async move { read_serialized_panel(workspace_id, &kvp) })
+            .await;
+        let serialized_session_id = serialized
+            .as_ref()
+            .and_then(|p| p.last_active_thread.as_ref())
+            .and_then(|t| t.session_id.clone());
+        assert_eq!(
+            serialized_session_id,
+            Some(resume_session_id.0.to_string()),
+            "serialize() must preserve the restored session id even while the \
+             ConversationView is in LoadError; otherwise the bug survives a \
+             restart because the KVP has been wiped"
+        );
+    }
+
     /// Extracts the text from a Text content block, panicking if it's not Text.
     fn expect_text_block(block: &acp::ContentBlock) -> &str {
         match block {

crates/agent_ui/src/conversation_view.rs 🔗

@@ -84,7 +84,7 @@ use crate::entry_view_state::{EntryViewEvent, ViewEvent};
 use crate::message_editor::{MessageEditor, MessageEditorEvent};
 use crate::profile_selector::{ProfileProvider, ProfileSelector};
 
-use crate::thread_metadata_store::ThreadId;
+use crate::thread_metadata_store::{ThreadId, ThreadMetadataStore};
 use crate::ui::{AgentNotification, AgentNotificationEvent};
 use crate::{
     Agent, AgentDiffPane, AgentInitialContent, AgentPanel, AllowAlways, AllowOnce,
@@ -699,7 +699,7 @@ impl ConversationView {
     }
 
     fn reset(&mut self, window: &mut Window, cx: &mut Context<Self>) {
-        let (resume_session_id, cwd, title) = self
+        let (resume_session_id, work_dirs, title) = self
             .root_thread_view()
             .map(|thread_view| {
                 let tv = thread_view.read(cx);
@@ -710,14 +710,25 @@ impl ConversationView {
                     thread.title(),
                 )
             })
-            .unwrap_or((None, None, None));
+            .unwrap_or_else(|| {
+                let session_id = self.root_session_id.clone();
+                let (work_dirs, title) = session_id
+                    .as_ref()
+                    .and_then(|id| {
+                        let store = ThreadMetadataStore::try_global(cx)?;
+                        let entry = store.read(cx).entry_by_session(id)?;
+                        Some((Some(entry.folder_paths().clone()), entry.title.clone()))
+                    })
+                    .unwrap_or((None, None));
+                (session_id, work_dirs, title)
+            });
 
         let state = Self::initial_state(
             self.agent.clone(),
             self.connection_store.clone(),
             self.connection_key.clone(),
             resume_session_id,
-            cwd,
+            work_dirs,
             title,
             self.project.clone(),
             None,
@@ -3329,6 +3340,122 @@ pub(crate) mod tests {
         });
     }
 
+    #[gpui::test]
+    async fn test_reset_preserves_session_id_after_load_error(cx: &mut TestAppContext) {
+        use crate::thread_metadata_store::{ThreadId, ThreadMetadata};
+        use chrono::Utc;
+        use project::{AgentId as ProjectAgentId, WorktreePaths};
+        use std::sync::atomic::Ordering;
+
+        init_test(cx);
+
+        let fs = FakeFs::new(cx.executor());
+        let project = Project::test(fs, [], cx).await;
+        let (multi_workspace, cx) =
+            cx.add_window_view(|window, cx| MultiWorkspace::test_new(project.clone(), window, cx));
+        let workspace = multi_workspace.read_with(cx, |mw, _| mw.workspace().clone());
+
+        let thread_store = cx.update(|_window, cx| cx.new(|cx| ThreadStore::new(cx)));
+        let connection_store =
+            cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
+
+        // Simulate a previous run that persisted metadata for this session.
+        let resume_session_id = SessionId::new("persistent-session");
+        let stored_title: SharedString = "Persistent chat".into();
+        cx.update(|_window, cx| {
+            ThreadMetadataStore::global(cx).update(cx, |store, cx| {
+                store.save(
+                    ThreadMetadata {
+                        thread_id: ThreadId::new(),
+                        session_id: Some(resume_session_id.clone()),
+                        agent_id: ProjectAgentId::new("Flaky"),
+                        title: Some(stored_title.clone()),
+                        updated_at: Utc::now(),
+                        created_at: Some(Utc::now()),
+                        interacted_at: None,
+                        worktree_paths: WorktreePaths::from_folder_paths(&PathList::default()),
+                        remote_connection: None,
+                        archived: false,
+                    },
+                    cx,
+                );
+            });
+        });
+
+        let connection = StubAgentConnection::new().with_supports_load_session(true);
+        let (server, fail) = FlakyAgentServer::new(connection);
+
+        let conversation_view = cx.update(|window, cx| {
+            cx.new(|cx| {
+                ConversationView::new(
+                    Rc::new(server),
+                    connection_store,
+                    Agent::Custom { id: "Flaky".into() },
+                    Some(resume_session_id.clone()),
+                    None,
+                    None,
+                    None,
+                    None,
+                    workspace.downgrade(),
+                    project.clone(),
+                    Some(thread_store),
+                    None,
+                    "agent_panel",
+                    window,
+                    cx,
+                )
+            })
+        });
+        cx.run_until_parked();
+
+        // The first connect() fails, so we land in LoadError.
+        conversation_view.read_with(cx, |view, _cx| {
+            assert!(
+                matches!(view.server_state, ServerState::LoadError { .. }),
+                "expected LoadError after failed initial connect"
+            );
+            assert_eq!(
+                view.root_session_id.as_ref(),
+                Some(&resume_session_id),
+                "root_session_id should still hold the original id while in LoadError"
+            );
+        });
+
+        // Now let the agent come online and emit AgentServersUpdated. This is
+        // the moment the bug would have stomped on root_session_id.
+        fail.store(false, Ordering::SeqCst);
+        project.update(cx, |project, cx| {
+            project
+                .agent_server_store()
+                .update(cx, |_store, cx| cx.emit(project::AgentServersUpdated));
+        });
+        cx.run_until_parked();
+
+        // The retry should have resumed the ORIGINAL session, not created a
+        // brand-new one.
+        conversation_view.read_with(cx, |view, cx| {
+            let connected = view
+                .as_connected()
+                .expect("should be Connected after flaky server comes online");
+            let active_id = connected
+                .active_id
+                .as_ref()
+                .expect("Connected state should have an active_id");
+            assert_eq!(
+                active_id, &resume_session_id,
+                "reset() must resume the original session id, not call new_session()"
+            );
+            let active_thread = view
+                .active_thread()
+                .expect("should have an active thread view");
+            let thread_session = active_thread.read(cx).thread.read(cx).session_id().clone();
+            assert_eq!(
+                thread_session, resume_session_id,
+                "the live AcpThread should hold the resumed session id"
+            );
+        });
+    }
+
     #[gpui::test]
     async fn test_auth_required_on_initial_connect(cx: &mut TestAppContext) {
         init_test(cx);
@@ -4137,6 +4264,58 @@ pub(crate) mod tests {
         }
     }
 
+    /// Agent server whose `connect()` fails while `fail` is `true` and
+    /// returns the wrapped connection otherwise. Used to simulate the
+    /// race where an external agent isn't yet registered at startup.
+    pub(crate) struct FlakyAgentServer {
+        connection: StubAgentConnection,
+        fail: Arc<std::sync::atomic::AtomicBool>,
+    }
+
+    impl FlakyAgentServer {
+        pub(crate) fn new(
+            connection: StubAgentConnection,
+        ) -> (Self, Arc<std::sync::atomic::AtomicBool>) {
+            let fail = Arc::new(std::sync::atomic::AtomicBool::new(true));
+            (
+                Self {
+                    connection,
+                    fail: fail.clone(),
+                },
+                fail,
+            )
+        }
+    }
+
+    impl AgentServer for FlakyAgentServer {
+        fn logo(&self) -> ui::IconName {
+            ui::IconName::ZedAgent
+        }
+
+        fn agent_id(&self) -> AgentId {
+            "Flaky".into()
+        }
+
+        fn connect(
+            &self,
+            _delegate: AgentServerDelegate,
+            _project: Entity<Project>,
+            _cx: &mut App,
+        ) -> Task<gpui::Result<Rc<dyn AgentConnection>>> {
+            if self.fail.load(std::sync::atomic::Ordering::SeqCst) {
+                Task::ready(Err(anyhow!(
+                    "Custom agent server `Flaky` is not registered"
+                )))
+            } else {
+                Task::ready(Ok(Rc::new(self.connection.clone())))
+            }
+        }
+
+        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
+            self
+        }
+    }
+
     fn build_test_thread(
         connection: Rc<dyn AgentConnection>,
         project: Entity<Project>,