agent: End turn at message boundary when queue has pending messages (#46980)

Mikayla Maki and Claude Opus 4.5 created

This feature cost $12

Release Notes:

- Changed the behavior of queued messages to gracefully wait for the
current activity to complete.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

Change summary

crates/agent/src/tests/mod.rs          |  92 +++++++++++++++++++++++++
crates/agent/src/thread.rs             |  46 ++++++++++++
crates/agent_ui/src/acp/thread_view.rs | 103 +++++++++++++++++----------
3 files changed, 202 insertions(+), 39 deletions(-)

Detailed changes

crates/agent/src/tests/mod.rs 🔗

@@ -5562,3 +5562,95 @@ async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext)
         "expected no authorization request for allowed docs.rs URL"
     );
 }
+
+#[gpui::test]
+async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) {
+    init_test(cx);
+    always_allow_tools(cx);
+
+    let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
+    let fake_model = model.as_fake();
+
+    // Add a tool so we can simulate tool calls
+    thread.update(cx, |thread, _cx| {
+        thread.add_tool(EchoTool);
+    });
+
+    // Start a turn by sending a message
+    let mut events = thread
+        .update(cx, |thread, cx| {
+            thread.send(UserMessageId::new(), ["Use the echo tool"], cx)
+        })
+        .unwrap();
+    cx.run_until_parked();
+
+    // Simulate the model making a tool call
+    fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse(
+        LanguageModelToolUse {
+            id: "tool_1".into(),
+            name: "echo".into(),
+            raw_input: r#"{"text": "hello"}"#.into(),
+            input: json!({"text": "hello"}),
+            is_input_complete: true,
+            thought_signature: None,
+        },
+    ));
+    fake_model
+        .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse));
+
+    // Queue a message before ending the stream
+    thread.update(cx, |thread, _cx| {
+        thread.queue_message(
+            vec![acp::ContentBlock::Text(acp::TextContent::new(
+                "This is my queued message".to_string(),
+            ))],
+            vec![],
+        );
+    });
+
+    // Now end the stream - tool will run, and the boundary check should see the queue
+    fake_model.end_last_completion_stream();
+
+    // Collect all events until the turn stops
+    let all_events = collect_events_until_stop(&mut events, cx).await;
+
+    // Verify we received the tool call event
+    let tool_call_ids: Vec<_> = all_events
+        .iter()
+        .filter_map(|e| match e {
+            Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()),
+            _ => None,
+        })
+        .collect();
+    assert_eq!(
+        tool_call_ids,
+        vec!["tool_1"],
+        "Should have received a tool call event for our echo tool"
+    );
+
+    // The turn should have stopped with EndTurn
+    let stop_reasons = stop_events(all_events);
+    assert_eq!(
+        stop_reasons,
+        vec![acp::StopReason::EndTurn],
+        "Turn should have ended after tool completion due to queued message"
+    );
+
+    // Verify the queued message is still there
+    thread.update(cx, |thread, _cx| {
+        let queued = thread.queued_messages();
+        assert_eq!(queued.len(), 1, "Should still have one queued message");
+        assert!(matches!(
+            &queued[0].content[0],
+            acp::ContentBlock::Text(t) if t.text == "This is my queued message"
+        ));
+    });
+
+    // Thread should be idle now
+    thread.update(cx, |thread, _cx| {
+        assert!(
+            thread.is_turn_complete(),
+            "Thread should not be running after turn ends"
+        );
+    });
+}

crates/agent/src/thread.rs 🔗

@@ -31,6 +31,7 @@ use futures::{
 use gpui::{
     App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
 };
+use language::Buffer;
 use language_model::{
     LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId,
     LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest,
@@ -680,6 +681,11 @@ enum CompletionError {
     Other(#[from] anyhow::Error),
 }
 
+pub struct QueuedMessage {
+    pub content: Vec<acp::ContentBlock>,
+    pub tracked_buffers: Vec<Entity<Buffer>>,
+}
+
 pub struct Thread {
     id: acp::SessionId,
     prompt_id: PromptId,
@@ -694,6 +700,7 @@ pub struct Thread {
     /// Survives across multiple requests as the model performs tool calls and
     /// we run tools, report their results.
     running_turn: Option<RunningTurn>,
+    queued_messages: Vec<QueuedMessage>,
     pending_message: Option<AgentMessage>,
     tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
     request_token_usage: HashMap<UserMessageId, language_model::TokenUsage>,
@@ -752,6 +759,7 @@ impl Thread {
             messages: Vec::new(),
             user_store: project.read(cx).user_store(),
             running_turn: None,
+            queued_messages: Vec::new(),
             pending_message: None,
             tools: BTreeMap::default(),
             request_token_usage: HashMap::default(),
@@ -804,6 +812,7 @@ impl Thread {
             messages: Vec::new(),
             user_store: project.read(cx).user_store(),
             running_turn: None,
+            queued_messages: Vec::new(),
             pending_message: None,
             tools: parent_tools,
             request_token_usage: HashMap::default(),
@@ -1000,6 +1009,7 @@ impl Thread {
             messages: db_thread.messages,
             user_store: project.read(cx).user_store(),
             running_turn: None,
+            queued_messages: Vec::new(),
             pending_message: None,
             tools: BTreeMap::default(),
             request_token_usage: db_thread.request_token_usage.clone(),
@@ -1220,6 +1230,37 @@ impl Thread {
         })
     }
 
+    pub fn queue_message(
+        &mut self,
+        content: Vec<acp::ContentBlock>,
+        tracked_buffers: Vec<Entity<Buffer>>,
+    ) {
+        self.queued_messages.push(QueuedMessage {
+            content,
+            tracked_buffers,
+        });
+    }
+
+    pub fn queued_messages(&self) -> &[QueuedMessage] {
+        &self.queued_messages
+    }
+
+    pub fn remove_queued_message(&mut self, index: usize) -> Option<QueuedMessage> {
+        if index < self.queued_messages.len() {
+            Some(self.queued_messages.remove(index))
+        } else {
+            None
+        }
+    }
+
+    pub fn clear_queued_messages(&mut self) {
+        self.queued_messages.clear();
+    }
+
+    fn has_queued_messages(&self) -> bool {
+        !self.queued_messages.is_empty()
+    }
+
     fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context<Self>) {
         let Some(last_user_message) = self.last_user_message() else {
             return;
@@ -1634,6 +1675,11 @@ impl Thread {
             } else if end_turn {
                 return Ok(());
             } else {
+                let has_queued = this.update(cx, |this, _| this.has_queued_messages())?;
+                if has_queued {
+                    log::debug!("Queued message found, ending turn at message boundary");
+                    return Ok(());
+                }
                 intent = CompletionIntent::ToolResults;
                 attempt = 0;
             }

crates/agent_ui/src/acp/thread_view.rs 🔗

@@ -355,7 +355,6 @@ pub struct AcpThreadView {
     _subscriptions: [Subscription; 5],
     show_codex_windows_warning: bool,
     in_flight_prompt: Option<Vec<acp::ContentBlock>>,
-    message_queue: Vec<QueuedMessage>,
     skip_queue_processing_count: usize,
     user_interrupted_generation: bool,
     can_fast_track_queue: bool,
@@ -368,11 +367,6 @@ pub struct AcpThreadView {
     hovered_edited_file_buttons: Option<usize>,
 }
 
-struct QueuedMessage {
-    content: Vec<acp::ContentBlock>,
-    tracked_buffers: Vec<Entity<Buffer>>,
-}
-
 enum ThreadState {
     Loading(Entity<LoadingView>),
     Ready {
@@ -551,7 +545,6 @@ impl AcpThreadView {
             resume_thread_metadata: resume_thread,
             show_codex_windows_warning,
             in_flight_prompt: None,
-            message_queue: Vec::new(),
             skip_queue_processing_count: 0,
             user_interrupted_generation: false,
             can_fast_track_queue: false,
@@ -577,7 +570,6 @@ impl AcpThreadView {
         );
         self.available_commands.replace(vec![]);
         self.new_server_version_available.take();
-        self.message_queue.clear();
         self.recent_history_entries.clear();
         self.turn_tokens = None;
         self.last_turn_tokens = None;
@@ -1319,11 +1311,10 @@ impl AcpThreadView {
 
         // Fast-track: if editor is empty, we're generating, and user can fast-track,
         // send the first queued message immediately (interrupting current generation)
-        if is_editor_empty
-            && is_generating
-            && self.can_fast_track_queue
-            && !self.message_queue.is_empty()
-        {
+        let has_queued = self
+            .as_native_thread(cx)
+            .is_some_and(|t| !t.read(cx).queued_messages().is_empty());
+        if is_editor_empty && is_generating && self.can_fast_track_queue && has_queued {
             self.can_fast_track_queue = false;
             self.send_queued_message_at_index(0, true, window, cx);
             return;
@@ -1644,10 +1635,11 @@ impl AcpThreadView {
             }
 
             this.update_in(cx, |this, window, cx| {
-                this.message_queue.push(QueuedMessage {
-                    content,
-                    tracked_buffers,
-                });
+                if let Some(thread) = this.as_native_thread(cx) {
+                    thread.update(cx, |thread, _| {
+                        thread.queue_message(content, tracked_buffers);
+                    });
+                }
                 // Enable fast-track: user can press Enter again to send this queued message immediately
                 this.can_fast_track_queue = true;
                 message_editor.update(cx, |message_editor, cx| {
@@ -1667,11 +1659,15 @@ impl AcpThreadView {
         window: &mut Window,
         cx: &mut Context<Self>,
     ) {
-        if index >= self.message_queue.len() {
+        let Some(native_thread) = self.as_native_thread(cx) else {
             return;
-        }
+        };
 
-        let queued = self.message_queue.remove(index);
+        let Some(queued) =
+            native_thread.update(cx, |thread, _| thread.remove_queued_message(index))
+        else {
+            return;
+        };
         let content = queued.content;
         let tracked_buffers = queued.tracked_buffers;
 
@@ -1914,8 +1910,13 @@ impl AcpThreadView {
                     // Manual interruption: don't auto-process queue.
                     // Reset the flag so future completions can process normally.
                     self.user_interrupted_generation = false;
-                } else if !self.message_queue.is_empty() {
-                    self.send_queued_message_at_index(0, false, window, cx);
+                } else {
+                    let has_queued = self
+                        .as_native_thread(cx)
+                        .is_some_and(|t| !t.read(cx).queued_messages().is_empty());
+                    if has_queued {
+                        self.send_queued_message_at_index(0, false, window, cx);
+                    }
                 }
 
                 self.history.update(cx, |history, cx| history.refresh(cx));
@@ -5108,8 +5109,11 @@ impl AcpThreadView {
         let telemetry = ActionLogTelemetry::from(thread);
         let changed_buffers = action_log.read(cx).changed_buffers(cx);
         let plan = thread.plan();
+        let queue_is_empty = self
+            .as_native_thread(cx)
+            .map_or(true, |t| t.read(cx).queued_messages().is_empty());
 
-        if changed_buffers.is_empty() && plan.is_empty() && self.message_queue.is_empty() {
+        if changed_buffers.is_empty() && plan.is_empty() && queue_is_empty {
             return None;
         }
 
@@ -5164,7 +5168,7 @@ impl AcpThreadView {
                     ))
                 })
             })
-            .when(!self.message_queue.is_empty(), |this| {
+            .when(!queue_is_empty, |this| {
                 this.when(!plan.is_empty() || !changed_buffers.is_empty(), |this| {
                     this.child(Divider::horizontal().color(DividerColor::Border))
                 })
@@ -5776,7 +5780,9 @@ impl AcpThreadView {
         _window: &mut Window,
         cx: &Context<Self>,
     ) -> impl IntoElement {
-        let queue_count = self.message_queue.len();
+        let queue_count = self
+            .as_native_thread(cx)
+            .map_or(0, |t| t.read(cx).queued_messages().len());
         let title: SharedString = if queue_count == 1 {
             "1 Queued Message".into()
         } else {
@@ -5807,7 +5813,9 @@ impl AcpThreadView {
                     .label_size(LabelSize::Small)
                     .key_binding(KeyBinding::for_action(&ClearMessageQueue, cx))
                     .on_click(cx.listener(|this, _, _, cx| {
-                        this.message_queue.clear();
+                        if let Some(thread) = this.as_native_thread(cx) {
+                            thread.update(cx, |thread, _| thread.clear_queued_messages());
+                        }
                         this.can_fast_track_queue = false;
                         cx.notify();
                     })),
@@ -5821,23 +5829,34 @@ impl AcpThreadView {
     ) -> impl IntoElement {
         let message_editor = self.message_editor.read(cx);
         let focus_handle = message_editor.focus_handle(cx);
-        let can_fast_track = self.can_fast_track_queue && !self.message_queue.is_empty();
+
+        let queued_messages: Vec<_> = self
+            .as_native_thread(cx)
+            .map(|t| {
+                t.read(cx)
+                    .queued_messages()
+                    .iter()
+                    .map(|q| q.content.clone())
+                    .collect()
+            })
+            .unwrap_or_default();
+
+        let queue_len = queued_messages.len();
+        let can_fast_track = self.can_fast_track_queue && queue_len > 0;
 
         v_flex()
             .id("message_queue_list")
             .max_h_40()
             .overflow_y_scroll()
             .children(
-                self.message_queue
-                    .iter()
+                queued_messages
+                    .into_iter()
                     .enumerate()
-                    .map(|(index, queued)| {
+                    .map(|(index, content)| {
                         let is_next = index == 0;
                         let icon_color = if is_next { Color::Accent } else { Color::Muted };
-                        let queue_len = self.message_queue.len();
 
-                        let preview: String = queued
-                            .content
+                        let preview: String = content
                             .iter()
                             .filter_map(|block| match block {
                                 acp::ContentBlock::Text(text) => {
@@ -5927,10 +5946,12 @@ impl AcpThreadView {
                                                 )
                                             })
                                             .on_click(cx.listener(move |this, _, _, cx| {
-                                                if index < this.message_queue.len() {
-                                                    this.message_queue.remove(index);
-                                                    cx.notify();
+                                                if let Some(thread) = this.as_native_thread(cx) {
+                                                    thread.update(cx, |thread, _| {
+                                                        thread.remove_queued_message(index);
+                                                    });
                                                 }
+                                                cx.notify();
                                             })),
                                     )
                                     .child(
@@ -7896,13 +7917,17 @@ impl Render for AcpThreadView {
                 this.send_queued_message_at_index(0, true, window, cx);
             }))
             .on_action(cx.listener(|this, _: &RemoveFirstQueuedMessage, _, cx| {
-                if !this.message_queue.is_empty() {
-                    this.message_queue.remove(0);
+                if let Some(thread) = this.as_native_thread(cx) {
+                    thread.update(cx, |thread, _| {
+                        thread.remove_queued_message(0);
+                    });
                     cx.notify();
                 }
             }))
             .on_action(cx.listener(|this, _: &ClearMessageQueue, _, cx| {
-                this.message_queue.clear();
+                if let Some(thread) = this.as_native_thread(cx) {
+                    thread.update(cx, |thread, _| thread.clear_queued_messages());
+                }
                 this.can_fast_track_queue = false;
                 cx.notify();
             }))