agent_ui: Smarter logic for when we drop background threads (#52016)

Ben Brandt and Bennet Bo Fenner created

## Context

Previously we were very eagerly dropping threads from memory. However,
reloading threads has a cost:
- For our agent, we have to replay events, which is lossy
- It's slower for external agents
- Some agents don't support reloading

So we keep the most recent 5 idle threads (or all threads who can't be
reloaded) and clean up our list after every insertion.


## Self-Review Checklist

<!-- Check before requesting review: -->
- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [ ] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable

Release Notes:

- N/A

---------

Co-authored-by: Bennet Bo Fenner <bennetbo@gmx.de>

Change summary

crates/acp_thread/src/connection.rs      | 118 ++++++++--
crates/agent_ui/src/agent_panel.rs       | 262 +++++++++++++++++++++++--
crates/agent_ui/src/conversation_view.rs |  78 ++++--
crates/agent_ui/src/test_support.rs      |  35 +++
4 files changed, 407 insertions(+), 86 deletions(-)

Detailed changes

crates/acp_thread/src/connection.rs 🔗

@@ -567,11 +567,14 @@ mod test_support {
         )
     }
 
-    #[derive(Clone, Default)]
+    #[derive(Clone)]
     pub struct StubAgentConnection {
         sessions: Arc<Mutex<HashMap<acp::SessionId, Session>>>,
         permission_requests: HashMap<acp::ToolCallId, PermissionOptions>,
         next_prompt_updates: Arc<Mutex<Vec<acp::SessionUpdate>>>,
+        supports_load_session: bool,
+        agent_id: AgentId,
+        telemetry_id: SharedString,
     }
 
     struct Session {
@@ -579,12 +582,21 @@ mod test_support {
         response_tx: Option<oneshot::Sender<acp::StopReason>>,
     }
 
+    impl Default for StubAgentConnection {
+        fn default() -> Self {
+            Self::new()
+        }
+    }
+
     impl StubAgentConnection {
         pub fn new() -> Self {
             Self {
                 next_prompt_updates: Default::default(),
                 permission_requests: HashMap::default(),
                 sessions: Arc::default(),
+                supports_load_session: false,
+                agent_id: AgentId::new("stub"),
+                telemetry_id: "stub".into(),
             }
         }
 
@@ -600,6 +612,59 @@ mod test_support {
             self
         }
 
+        pub fn with_supports_load_session(mut self, supports_load_session: bool) -> Self {
+            self.supports_load_session = supports_load_session;
+            self
+        }
+
+        pub fn with_agent_id(mut self, agent_id: AgentId) -> Self {
+            self.agent_id = agent_id;
+            self
+        }
+
+        pub fn with_telemetry_id(mut self, telemetry_id: SharedString) -> Self {
+            self.telemetry_id = telemetry_id;
+            self
+        }
+
+        fn create_session(
+            self: Rc<Self>,
+            session_id: acp::SessionId,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            title: Option<SharedString>,
+            cx: &mut gpui::App,
+        ) -> Entity<AcpThread> {
+            let action_log = cx.new(|_| ActionLog::new(project.clone()));
+            let thread_title = title.unwrap_or_else(|| SharedString::new_static("Test"));
+            let thread = cx.new(|cx| {
+                AcpThread::new(
+                    None,
+                    thread_title,
+                    Some(work_dirs),
+                    self.clone(),
+                    project,
+                    action_log,
+                    session_id.clone(),
+                    watch::Receiver::constant(
+                        acp::PromptCapabilities::new()
+                            .image(true)
+                            .audio(true)
+                            .embedded_context(true),
+                    ),
+                    cx,
+                )
+            });
+            self.sessions.lock().insert(
+                session_id,
+                Session {
+                    thread: thread.downgrade(),
+                    response_tx: None,
+                },
+            );
+            thread
+        }
+
         pub fn send_update(
             &self,
             session_id: acp::SessionId,
@@ -637,11 +702,11 @@ mod test_support {
 
     impl AgentConnection for StubAgentConnection {
         fn agent_id(&self) -> AgentId {
-            AgentId::new("stub")
+            self.agent_id.clone()
         }
 
         fn telemetry_id(&self) -> SharedString {
-            "stub".into()
+            self.telemetry_id.clone()
         }
 
         fn auth_methods(&self) -> &[acp::AuthMethod] {
@@ -664,32 +729,27 @@ mod test_support {
             static NEXT_SESSION_ID: AtomicUsize = AtomicUsize::new(0);
             let session_id =
                 acp::SessionId::new(NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst).to_string());
-            let action_log = cx.new(|_| ActionLog::new(project.clone()));
-            let thread = cx.new(|cx| {
-                AcpThread::new(
-                    None,
-                    "Test",
-                    Some(work_dirs),
-                    self.clone(),
-                    project,
-                    action_log,
-                    session_id.clone(),
-                    watch::Receiver::constant(
-                        acp::PromptCapabilities::new()
-                            .image(true)
-                            .audio(true)
-                            .embedded_context(true),
-                    ),
-                    cx,
-                )
-            });
-            self.sessions.lock().insert(
-                session_id,
-                Session {
-                    thread: thread.downgrade(),
-                    response_tx: None,
-                },
-            );
+            let thread = self.create_session(session_id, project, work_dirs, None, cx);
+            Task::ready(Ok(thread))
+        }
+
+        fn supports_load_session(&self) -> bool {
+            self.supports_load_session
+        }
+
+        fn load_session(
+            self: Rc<Self>,
+            session_id: acp::SessionId,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            title: Option<SharedString>,
+            cx: &mut App,
+        ) -> Task<Result<Entity<AcpThread>>> {
+            if !self.supports_load_session {
+                return Task::ready(Err(anyhow::Error::msg("Loading sessions is not supported")));
+            }
+
+            let thread = self.create_session(session_id, project, work_dirs, title, cx);
             Task::ready(Ok(thread))
         }
 

crates/agent_ui/src/agent_panel.rs 🔗

@@ -1978,13 +1978,13 @@ impl AgentPanel {
         let mut views = Vec::new();
 
         if let Some(server_view) = self.active_conversation_view() {
-            if let Some(thread_view) = server_view.read(cx).parent_thread(cx) {
+            if let Some(thread_view) = server_view.read(cx).root_thread(cx) {
                 views.push(thread_view);
             }
         }
 
         for server_view in self.background_threads.values() {
-            if let Some(thread_view) = server_view.read(cx).parent_thread(cx) {
+            if let Some(thread_view) = server_view.read(cx).root_thread(cx) {
                 views.push(thread_view);
             }
         }
@@ -1997,22 +1997,46 @@ impl AgentPanel {
             return;
         };
 
-        let Some(thread_view) = conversation_view.read(cx).parent_thread(cx) else {
+        let Some(thread_view) = conversation_view.read(cx).root_thread(cx) else {
             return;
         };
 
-        let thread = &thread_view.read(cx).thread;
-        let (status, session_id) = {
-            let thread = thread.read(cx);
-            (thread.status(), thread.session_id().clone())
-        };
+        self.background_threads
+            .insert(thread_view.read(cx).id.clone(), conversation_view);
+        self.cleanup_background_threads(cx);
+    }
 
-        if status != ThreadStatus::Generating {
-            return;
-        }
+    /// We keep threads that are:
+    /// - Still running
+    /// - Do not support reloading the full session
+    /// - Have had the most recent events (up to 5 idle threads)
+    fn cleanup_background_threads(&mut self, cx: &App) {
+        let mut potential_removals = self
+            .background_threads
+            .iter()
+            .filter(|(_id, view)| {
+                let Some(thread_view) = view.read(cx).root_thread(cx) else {
+                    return true;
+                };
+                let thread = thread_view.read(cx).thread.read(cx);
+                thread.connection().supports_load_session() && thread.status() == ThreadStatus::Idle
+            })
+            .collect::<Vec<_>>();
 
-        self.background_threads
-            .insert(session_id, conversation_view);
+        const MAX_IDLE_BACKGROUND_THREADS: usize = 5;
+
+        potential_removals.sort_unstable_by_key(|(_, view)| view.read(cx).updated_at(cx));
+        let n = potential_removals
+            .len()
+            .saturating_sub(MAX_IDLE_BACKGROUND_THREADS);
+        let to_remove = potential_removals
+            .into_iter()
+            .map(|(id, _)| id.clone())
+            .take(n)
+            .collect::<Vec<_>>();
+        for id in to_remove {
+            self.background_threads.remove(&id);
+        }
     }
 
     pub(crate) fn active_native_agent_thread(&self, cx: &App) -> Option<Entity<agent::Thread>> {
@@ -3186,12 +3210,12 @@ impl AgentPanel {
             ActiveView::AgentThread { conversation_view } => {
                 let server_view_ref = conversation_view.read(cx);
                 let is_generating_title = server_view_ref.as_native_thread(cx).is_some()
-                    && server_view_ref.parent_thread(cx).map_or(false, |tv| {
+                    && server_view_ref.root_thread(cx).map_or(false, |tv| {
                         tv.read(cx).thread.read(cx).has_provisional_title()
                     });
 
                 if let Some(title_editor) = server_view_ref
-                    .parent_thread(cx)
+                    .root_thread(cx)
                     .map(|r| r.read(cx).title_editor.clone())
                 {
                     if is_generating_title {
@@ -4943,7 +4967,10 @@ impl AgentPanel {
 mod tests {
     use super::*;
     use crate::conversation_view::tests::{StubAgentServer, init_test};
-    use crate::test_support::{active_session_id, open_thread_with_connection, send_message};
+    use crate::test_support::{
+        active_session_id, open_thread_with_connection, open_thread_with_custom_connection,
+        send_message,
+    };
     use acp_thread::{StubAgentConnection, ThreadStatus};
     use agent_servers::CODEX_ID;
     use assistant_text_thread::TextThreadStore;
@@ -4952,6 +4979,7 @@ mod tests {
     use gpui::{TestAppContext, VisualTestContext};
     use project::Project;
     use serde_json::json;
+    use std::time::Instant;
     use workspace::MultiWorkspace;
 
     #[gpui::test]
@@ -5419,6 +5447,41 @@ mod tests {
         assert!(uri.contains("utils.rs"), "URI should encode the file path");
     }
 
+    fn open_generating_thread_with_loadable_connection(
+        panel: &Entity<AgentPanel>,
+        connection: &StubAgentConnection,
+        cx: &mut VisualTestContext,
+    ) -> acp::SessionId {
+        open_thread_with_custom_connection(panel, connection.clone(), cx);
+        let session_id = active_session_id(panel, cx);
+        send_message(panel, cx);
+        cx.update(|_, cx| {
+            connection.send_update(
+                session_id.clone(),
+                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new("done".into())),
+                cx,
+            );
+        });
+        cx.run_until_parked();
+        session_id
+    }
+
+    fn open_idle_thread_with_non_loadable_connection(
+        panel: &Entity<AgentPanel>,
+        connection: &StubAgentConnection,
+        cx: &mut VisualTestContext,
+    ) -> acp::SessionId {
+        open_thread_with_custom_connection(panel, connection.clone(), cx);
+        let session_id = active_session_id(panel, cx);
+
+        connection.set_next_prompt_updates(vec![acp::SessionUpdate::AgentMessageChunk(
+            acp::ContentChunk::new("done".into()),
+        )]);
+        send_message(panel, cx);
+
+        session_id
+    }
+
     async fn setup_panel(cx: &mut TestAppContext) -> (Entity<AgentPanel>, VisualTestContext) {
         init_test(cx);
         cx.update(|cx| {
@@ -5492,7 +5555,7 @@ mod tests {
     }
 
     #[gpui::test]
-    async fn test_idle_thread_dropped_when_navigating_away(cx: &mut TestAppContext) {
+    async fn test_idle_non_loadable_thread_retained_when_navigating_away(cx: &mut TestAppContext) {
         let (panel, mut cx) = setup_panel(cx).await;
 
         let connection_a = StubAgentConnection::new();
@@ -5505,6 +5568,7 @@ mod tests {
         let weak_view_a = panel.read_with(&cx, |panel, _cx| {
             panel.active_conversation().unwrap().downgrade()
         });
+        let session_id_a = active_session_id(&panel, &cx);
 
         // Thread A should be idle (auto-completed via set_next_prompt_updates).
         panel.read_with(&cx, |panel, cx| {
@@ -5512,21 +5576,25 @@ mod tests {
             assert_eq!(thread.read(cx).status(), ThreadStatus::Idle);
         });
 
-        // Open a new thread B — thread A should NOT be retained.
+        // Open a new thread B — thread A should be retained because it is not loadable.
         let connection_b = StubAgentConnection::new();
         open_thread_with_connection(&panel, connection_b, &mut cx);
 
         panel.read_with(&cx, |panel, _cx| {
+            assert_eq!(
+                panel.background_threads.len(),
+                1,
+                "Idle non-loadable thread A should be retained in background_views"
+            );
             assert!(
-                panel.background_threads.is_empty(),
-                "Idle thread A should not be retained in background_views"
+                panel.background_threads.contains_key(&session_id_a),
+                "Background view should be keyed by thread A's session ID"
             );
         });
 
-        // Verify the old ConnectionView entity was dropped (no strong references remain).
         assert!(
-            weak_view_a.upgrade().is_none(),
-            "Idle ConnectionView should have been dropped"
+            weak_view_a.upgrade().is_some(),
+            "Idle non-loadable ConnectionView should still be retained"
         );
     }
 
@@ -5587,8 +5655,152 @@ mod tests {
                 "Promoted thread A should no longer be in background_views"
             );
             assert!(
-                !panel.background_threads.contains_key(&session_id_b),
-                "Thread B (idle) should not have been retained in background_views"
+                panel.background_threads.contains_key(&session_id_b),
+                "Thread B (idle, non-loadable) should remain retained in background_views"
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_cleanup_background_threads_keeps_five_most_recent_idle_loadable_threads(
+        cx: &mut TestAppContext,
+    ) {
+        let (panel, mut cx) = setup_panel(cx).await;
+        let connection = StubAgentConnection::new()
+            .with_supports_load_session(true)
+            .with_agent_id("loadable-stub".into())
+            .with_telemetry_id("loadable-stub".into());
+        let mut session_ids = Vec::new();
+
+        for _ in 0..7 {
+            session_ids.push(open_generating_thread_with_loadable_connection(
+                &panel,
+                &connection,
+                &mut cx,
+            ));
+        }
+
+        let base_time = Instant::now();
+
+        for session_id in session_ids.iter().take(6) {
+            connection.end_turn(session_id.clone(), acp::StopReason::EndTurn);
+        }
+        cx.run_until_parked();
+
+        panel.update(&mut cx, |panel, cx| {
+            for (index, session_id) in session_ids.iter().take(6).enumerate() {
+                let conversation_view = panel
+                    .background_threads
+                    .get(session_id)
+                    .expect("background thread should exist")
+                    .clone();
+                conversation_view.update(cx, |view, cx| {
+                    view.set_updated_at(base_time + Duration::from_secs(index as u64), cx);
+                });
+            }
+            panel.cleanup_background_threads(cx);
+        });
+
+        panel.read_with(&cx, |panel, _cx| {
+            assert_eq!(
+                panel.background_threads.len(),
+                5,
+                "cleanup should keep at most five idle loadable background threads"
+            );
+            assert!(
+                !panel.background_threads.contains_key(&session_ids[0]),
+                "oldest idle loadable background thread should be removed"
+            );
+            for session_id in &session_ids[1..6] {
+                assert!(
+                    panel.background_threads.contains_key(session_id),
+                    "more recent idle loadable background threads should be retained"
+                );
+            }
+            assert!(
+                !panel.background_threads.contains_key(&session_ids[6]),
+                "the active thread should not also be stored as a background thread"
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_cleanup_background_threads_preserves_idle_non_loadable_threads(
+        cx: &mut TestAppContext,
+    ) {
+        let (panel, mut cx) = setup_panel(cx).await;
+
+        let non_loadable_connection = StubAgentConnection::new();
+        let non_loadable_session_id = open_idle_thread_with_non_loadable_connection(
+            &panel,
+            &non_loadable_connection,
+            &mut cx,
+        );
+
+        let loadable_connection = StubAgentConnection::new()
+            .with_supports_load_session(true)
+            .with_agent_id("loadable-stub".into())
+            .with_telemetry_id("loadable-stub".into());
+        let mut loadable_session_ids = Vec::new();
+
+        for _ in 0..7 {
+            loadable_session_ids.push(open_generating_thread_with_loadable_connection(
+                &panel,
+                &loadable_connection,
+                &mut cx,
+            ));
+        }
+
+        let base_time = Instant::now();
+
+        for session_id in loadable_session_ids.iter().take(6) {
+            loadable_connection.end_turn(session_id.clone(), acp::StopReason::EndTurn);
+        }
+        cx.run_until_parked();
+
+        panel.update(&mut cx, |panel, cx| {
+            for (index, session_id) in loadable_session_ids.iter().take(6).enumerate() {
+                let conversation_view = panel
+                    .background_threads
+                    .get(session_id)
+                    .expect("background thread should exist")
+                    .clone();
+                conversation_view.update(cx, |view, cx| {
+                    view.set_updated_at(base_time + Duration::from_secs(index as u64), cx);
+                });
+            }
+            panel.cleanup_background_threads(cx);
+        });
+
+        panel.read_with(&cx, |panel, _cx| {
+            assert_eq!(
+                panel.background_threads.len(),
+                6,
+                "cleanup should keep the non-loadable idle thread in addition to five loadable ones"
+            );
+            assert!(
+                panel
+                    .background_threads
+                    .contains_key(&non_loadable_session_id),
+                "idle non-loadable background threads should not be cleanup candidates"
+            );
+            assert!(
+                !panel
+                    .background_threads
+                    .contains_key(&loadable_session_ids[0]),
+                "oldest idle loadable background thread should still be removed"
+            );
+            for session_id in &loadable_session_ids[1..6] {
+                assert!(
+                    panel.background_threads.contains_key(session_id),
+                    "more recent idle loadable background threads should be retained"
+                );
+            }
+            assert!(
+                !panel
+                    .background_threads
+                    .contains_key(&loadable_session_ids[6]),
+                "the active loadable thread should not also be stored as a background thread"
             );
         });
     }

crates/agent_ui/src/conversation_view.rs 🔗

@@ -167,41 +167,45 @@ pub(crate) struct Conversation {
     /// Tracks the selected granularity index for each tool call's permission dropdown.
     /// The index corresponds to the position in the allow_options list.
     selected_permission_granularity: HashMap<acp::SessionId, HashMap<acp::ToolCallId, usize>>,
+    updated_at: Option<Instant>,
 }
 
 impl Conversation {
     pub fn register_thread(&mut self, thread: Entity<AcpThread>, cx: &mut Context<Self>) {
         let session_id = thread.read(cx).session_id().clone();
-        let subscription = cx.subscribe(&thread, move |this, _thread, event, _cx| match event {
-            AcpThreadEvent::ToolAuthorizationRequested(id) => {
-                this.permission_requests
-                    .entry(session_id.clone())
-                    .or_default()
-                    .push(id.clone());
-            }
-            AcpThreadEvent::ToolAuthorizationReceived(id) => {
-                if let Some(tool_calls) = this.permission_requests.get_mut(&session_id) {
-                    tool_calls.retain(|tool_call_id| tool_call_id != id);
-                    if tool_calls.is_empty() {
-                        this.permission_requests.shift_remove(&session_id);
+        let subscription = cx.subscribe(&thread, move |this, _thread, event, _cx| {
+            this.updated_at = Some(Instant::now());
+            match event {
+                AcpThreadEvent::ToolAuthorizationRequested(id) => {
+                    this.permission_requests
+                        .entry(session_id.clone())
+                        .or_default()
+                        .push(id.clone());
+                }
+                AcpThreadEvent::ToolAuthorizationReceived(id) => {
+                    if let Some(tool_calls) = this.permission_requests.get_mut(&session_id) {
+                        tool_calls.retain(|tool_call_id| tool_call_id != id);
+                        if tool_calls.is_empty() {
+                            this.permission_requests.shift_remove(&session_id);
+                        }
                     }
                 }
+                AcpThreadEvent::NewEntry
+                | AcpThreadEvent::TitleUpdated
+                | AcpThreadEvent::TokenUsageUpdated
+                | AcpThreadEvent::EntryUpdated(_)
+                | AcpThreadEvent::EntriesRemoved(_)
+                | AcpThreadEvent::Retry(_)
+                | AcpThreadEvent::SubagentSpawned(_)
+                | AcpThreadEvent::Stopped(_)
+                | AcpThreadEvent::Error
+                | AcpThreadEvent::LoadError(_)
+                | AcpThreadEvent::PromptCapabilitiesUpdated
+                | AcpThreadEvent::Refusal
+                | AcpThreadEvent::AvailableCommandsUpdated(_)
+                | AcpThreadEvent::ModeUpdated(_)
+                | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
             }
-            AcpThreadEvent::NewEntry
-            | AcpThreadEvent::TitleUpdated
-            | AcpThreadEvent::TokenUsageUpdated
-            | AcpThreadEvent::EntryUpdated(_)
-            | AcpThreadEvent::EntriesRemoved(_)
-            | AcpThreadEvent::Retry(_)
-            | AcpThreadEvent::SubagentSpawned(_)
-            | AcpThreadEvent::Stopped(_)
-            | AcpThreadEvent::Error
-            | AcpThreadEvent::LoadError(_)
-            | AcpThreadEvent::PromptCapabilitiesUpdated
-            | AcpThreadEvent::Refusal
-            | AcpThreadEvent::AvailableCommandsUpdated(_)
-            | AcpThreadEvent::ModeUpdated(_)
-            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
         });
         self.subscriptions.push(subscription);
         self.threads
@@ -352,7 +356,7 @@ impl ConversationView {
             .pending_tool_call(id, cx)
     }
 
-    pub fn parent_thread(&self, cx: &App) -> Option<Entity<ThreadView>> {
+    pub fn root_thread(&self, cx: &App) -> Option<Entity<ThreadView>> {
         match &self.server_state {
             ServerState::Connected(connected) => {
                 let mut current = connected.active_view()?;
@@ -388,6 +392,11 @@ impl ConversationView {
         }
     }
 
+    pub fn updated_at(&self, cx: &App) -> Option<Instant> {
+        self.as_connected()
+            .and_then(|connected| connected.conversation.read(cx).updated_at)
+    }
+
     pub fn navigate_to_session(
         &mut self,
         session_id: acp::SessionId,
@@ -1152,7 +1161,7 @@ impl ConversationView {
     pub fn parent_id(&self, cx: &App) -> Option<acp::SessionId> {
         match &self.server_state {
             ServerState::Connected(_) => self
-                .parent_thread(cx)
+                .root_thread(cx)
                 .map(|thread| thread.read(cx).id.clone()),
             ServerState::Loading(loading) => loading.read(cx).session_id.clone(),
             ServerState::LoadError { session_id, .. } => session_id.clone(),
@@ -2595,6 +2604,17 @@ impl ConversationView {
             cx.notify();
         }
     }
+
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn set_updated_at(&mut self, updated_at: Instant, cx: &mut Context<Self>) {
+        let Some(connected) = self.as_connected_mut() else {
+            return;
+        };
+
+        connected.conversation.update(cx, |conversation, _cx| {
+            conversation.updated_at = Some(updated_at);
+        });
+    }
 }
 
 impl Render for ConversationView {

crates/agent_ui/src/test_support.rs 🔗

@@ -13,11 +13,23 @@ use crate::agent_panel;
 
 pub struct StubAgentServer<C> {
     connection: C,
+    agent_id: AgentId,
 }
 
-impl<C> StubAgentServer<C> {
+impl<C> StubAgentServer<C>
+where
+    C: AgentConnection,
+{
     pub fn new(connection: C) -> Self {
-        Self { connection }
+        Self {
+            connection,
+            agent_id: "Test".into(),
+        }
+    }
+
+    pub fn with_connection_agent_id(mut self) -> Self {
+        self.agent_id = self.connection.agent_id();
+        self
     }
 }
 
@@ -40,7 +52,7 @@ where
     }
 
     fn agent_id(&self) -> AgentId {
-        "Test".into()
+        self.agent_id.clone()
     }
 
     fn connect(
@@ -83,6 +95,23 @@ pub fn open_thread_with_connection(
     cx.run_until_parked();
 }
 
+pub fn open_thread_with_custom_connection<C>(
+    panel: &Entity<AgentPanel>,
+    connection: C,
+    cx: &mut VisualTestContext,
+) where
+    C: 'static + AgentConnection + Send + Clone,
+{
+    panel.update_in(cx, |panel, window, cx| {
+        panel.open_external_thread_with_server(
+            Rc::new(StubAgentServer::new(connection).with_connection_agent_id()),
+            window,
+            cx,
+        );
+    });
+    cx.run_until_parked();
+}
+
 pub fn send_message(panel: &Entity<AgentPanel>, cx: &mut VisualTestContext) {
     let thread_view = panel.read_with(cx, |panel, cx| panel.active_thread_view(cx).unwrap());
     let message_editor = thread_view.read_with(cx, |view, _cx| view.message_editor.clone());