From f1a8b053916dd7cbb911192ecf14b3027cf026f0 Mon Sep 17 00:00:00 2001 From: Mikayla Maki Date: Fri, 16 Jan 2026 02:17:08 -0800 Subject: [PATCH] agent: End turn at message boundary when queue has pending messages (#46980) This feature cost $12 Release Notes: - Changed the behavior of queued messages to gracefully wait for the current activity to complete. Co-authored-by: Claude Opus 4.5 --- crates/agent/src/tests/mod.rs | 92 ++++++++++++++++++++++ crates/agent/src/thread.rs | 46 +++++++++++ crates/agent_ui/src/acp/thread_view.rs | 103 +++++++++++++++---------- 3 files changed, 202 insertions(+), 39 deletions(-) diff --git a/crates/agent/src/tests/mod.rs b/crates/agent/src/tests/mod.rs index 34e53c82b847ccd3e1b2a2b66e45f4dad68d5d2e..e31953fc15170ae1fdc4651448058be192c83c60 100644 --- a/crates/agent/src/tests/mod.rs +++ b/crates/agent/src/tests/mod.rs @@ -5562,3 +5562,95 @@ async fn test_fetch_tool_allow_rule_skips_confirmation(cx: &mut TestAppContext) "expected no authorization request for allowed docs.rs URL" ); } + +#[gpui::test] +async fn test_queued_message_ends_turn_at_boundary(cx: &mut TestAppContext) { + init_test(cx); + always_allow_tools(cx); + + let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await; + let fake_model = model.as_fake(); + + // Add a tool so we can simulate tool calls + thread.update(cx, |thread, _cx| { + thread.add_tool(EchoTool); + }); + + // Start a turn by sending a message + let mut events = thread + .update(cx, |thread, cx| { + thread.send(UserMessageId::new(), ["Use the echo tool"], cx) + }) + .unwrap(); + cx.run_until_parked(); + + // Simulate the model making a tool call + fake_model.send_last_completion_stream_event(LanguageModelCompletionEvent::ToolUse( + LanguageModelToolUse { + id: "tool_1".into(), + name: "echo".into(), + raw_input: r#"{"text": "hello"}"#.into(), + input: json!({"text": "hello"}), + is_input_complete: true, + thought_signature: None, + }, + )); + fake_model + .send_last_completion_stream_event(LanguageModelCompletionEvent::Stop(StopReason::ToolUse)); + + // Queue a message before ending the stream + thread.update(cx, |thread, _cx| { + thread.queue_message( + vec![acp::ContentBlock::Text(acp::TextContent::new( + "This is my queued message".to_string(), + ))], + vec![], + ); + }); + + // Now end the stream - tool will run, and the boundary check should see the queue + fake_model.end_last_completion_stream(); + + // Collect all events until the turn stops + let all_events = collect_events_until_stop(&mut events, cx).await; + + // Verify we received the tool call event + let tool_call_ids: Vec<_> = all_events + .iter() + .filter_map(|e| match e { + Ok(ThreadEvent::ToolCall(tc)) => Some(tc.tool_call_id.to_string()), + _ => None, + }) + .collect(); + assert_eq!( + tool_call_ids, + vec!["tool_1"], + "Should have received a tool call event for our echo tool" + ); + + // The turn should have stopped with EndTurn + let stop_reasons = stop_events(all_events); + assert_eq!( + stop_reasons, + vec![acp::StopReason::EndTurn], + "Turn should have ended after tool completion due to queued message" + ); + + // Verify the queued message is still there + thread.update(cx, |thread, _cx| { + let queued = thread.queued_messages(); + assert_eq!(queued.len(), 1, "Should still have one queued message"); + assert!(matches!( + &queued[0].content[0], + acp::ContentBlock::Text(t) if t.text == "This is my queued message" + )); + }); + + // Thread should be idle now + thread.update(cx, |thread, _cx| { + assert!( + thread.is_turn_complete(), + "Thread should not be running after turn ends" + ); + }); +} diff --git a/crates/agent/src/thread.rs b/crates/agent/src/thread.rs index f24b529fbaabf6d64ec29f351e3ddec8a07ba523..444a06589676554bbb5d9e39f6aedec2ab153954 100644 --- a/crates/agent/src/thread.rs +++ b/crates/agent/src/thread.rs @@ -31,6 +31,7 @@ use futures::{ use gpui::{ App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity, }; +use language::Buffer; use language_model::{ LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent, LanguageModelId, LanguageModelImage, LanguageModelProviderId, LanguageModelRegistry, LanguageModelRequest, @@ -680,6 +681,11 @@ enum CompletionError { Other(#[from] anyhow::Error), } +pub struct QueuedMessage { + pub content: Vec, + pub tracked_buffers: Vec>, +} + pub struct Thread { id: acp::SessionId, prompt_id: PromptId, @@ -694,6 +700,7 @@ pub struct Thread { /// Survives across multiple requests as the model performs tool calls and /// we run tools, report their results. running_turn: Option, + queued_messages: Vec, pending_message: Option, tools: BTreeMap>, request_token_usage: HashMap, @@ -752,6 +759,7 @@ impl Thread { messages: Vec::new(), user_store: project.read(cx).user_store(), running_turn: None, + queued_messages: Vec::new(), pending_message: None, tools: BTreeMap::default(), request_token_usage: HashMap::default(), @@ -804,6 +812,7 @@ impl Thread { messages: Vec::new(), user_store: project.read(cx).user_store(), running_turn: None, + queued_messages: Vec::new(), pending_message: None, tools: parent_tools, request_token_usage: HashMap::default(), @@ -1000,6 +1009,7 @@ impl Thread { messages: db_thread.messages, user_store: project.read(cx).user_store(), running_turn: None, + queued_messages: Vec::new(), pending_message: None, tools: BTreeMap::default(), request_token_usage: db_thread.request_token_usage.clone(), @@ -1220,6 +1230,37 @@ impl Thread { }) } + pub fn queue_message( + &mut self, + content: Vec, + tracked_buffers: Vec>, + ) { + self.queued_messages.push(QueuedMessage { + content, + tracked_buffers, + }); + } + + pub fn queued_messages(&self) -> &[QueuedMessage] { + &self.queued_messages + } + + pub fn remove_queued_message(&mut self, index: usize) -> Option { + if index < self.queued_messages.len() { + Some(self.queued_messages.remove(index)) + } else { + None + } + } + + pub fn clear_queued_messages(&mut self) { + self.queued_messages.clear(); + } + + fn has_queued_messages(&self) -> bool { + !self.queued_messages.is_empty() + } + fn update_token_usage(&mut self, update: language_model::TokenUsage, cx: &mut Context) { let Some(last_user_message) = self.last_user_message() else { return; @@ -1634,6 +1675,11 @@ impl Thread { } else if end_turn { return Ok(()); } else { + let has_queued = this.update(cx, |this, _| this.has_queued_messages())?; + if has_queued { + log::debug!("Queued message found, ending turn at message boundary"); + return Ok(()); + } intent = CompletionIntent::ToolResults; attempt = 0; } diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 39923b58b59c475eaedddde28928fb2aef0d615e..ea6b08c6d0c17cb4e0784adaad62c50be746e1c9 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -355,7 +355,6 @@ pub struct AcpThreadView { _subscriptions: [Subscription; 5], show_codex_windows_warning: bool, in_flight_prompt: Option>, - message_queue: Vec, skip_queue_processing_count: usize, user_interrupted_generation: bool, can_fast_track_queue: bool, @@ -368,11 +367,6 @@ pub struct AcpThreadView { hovered_edited_file_buttons: Option, } -struct QueuedMessage { - content: Vec, - tracked_buffers: Vec>, -} - enum ThreadState { Loading(Entity), Ready { @@ -551,7 +545,6 @@ impl AcpThreadView { resume_thread_metadata: resume_thread, show_codex_windows_warning, in_flight_prompt: None, - message_queue: Vec::new(), skip_queue_processing_count: 0, user_interrupted_generation: false, can_fast_track_queue: false, @@ -577,7 +570,6 @@ impl AcpThreadView { ); self.available_commands.replace(vec![]); self.new_server_version_available.take(); - self.message_queue.clear(); self.recent_history_entries.clear(); self.turn_tokens = None; self.last_turn_tokens = None; @@ -1319,11 +1311,10 @@ impl AcpThreadView { // Fast-track: if editor is empty, we're generating, and user can fast-track, // send the first queued message immediately (interrupting current generation) - if is_editor_empty - && is_generating - && self.can_fast_track_queue - && !self.message_queue.is_empty() - { + let has_queued = self + .as_native_thread(cx) + .is_some_and(|t| !t.read(cx).queued_messages().is_empty()); + if is_editor_empty && is_generating && self.can_fast_track_queue && has_queued { self.can_fast_track_queue = false; self.send_queued_message_at_index(0, true, window, cx); return; @@ -1644,10 +1635,11 @@ impl AcpThreadView { } this.update_in(cx, |this, window, cx| { - this.message_queue.push(QueuedMessage { - content, - tracked_buffers, - }); + if let Some(thread) = this.as_native_thread(cx) { + thread.update(cx, |thread, _| { + thread.queue_message(content, tracked_buffers); + }); + } // Enable fast-track: user can press Enter again to send this queued message immediately this.can_fast_track_queue = true; message_editor.update(cx, |message_editor, cx| { @@ -1667,11 +1659,15 @@ impl AcpThreadView { window: &mut Window, cx: &mut Context, ) { - if index >= self.message_queue.len() { + let Some(native_thread) = self.as_native_thread(cx) else { return; - } + }; - let queued = self.message_queue.remove(index); + let Some(queued) = + native_thread.update(cx, |thread, _| thread.remove_queued_message(index)) + else { + return; + }; let content = queued.content; let tracked_buffers = queued.tracked_buffers; @@ -1914,8 +1910,13 @@ impl AcpThreadView { // Manual interruption: don't auto-process queue. // Reset the flag so future completions can process normally. self.user_interrupted_generation = false; - } else if !self.message_queue.is_empty() { - self.send_queued_message_at_index(0, false, window, cx); + } else { + let has_queued = self + .as_native_thread(cx) + .is_some_and(|t| !t.read(cx).queued_messages().is_empty()); + if has_queued { + self.send_queued_message_at_index(0, false, window, cx); + } } self.history.update(cx, |history, cx| history.refresh(cx)); @@ -5108,8 +5109,11 @@ impl AcpThreadView { let telemetry = ActionLogTelemetry::from(thread); let changed_buffers = action_log.read(cx).changed_buffers(cx); let plan = thread.plan(); + let queue_is_empty = self + .as_native_thread(cx) + .map_or(true, |t| t.read(cx).queued_messages().is_empty()); - if changed_buffers.is_empty() && plan.is_empty() && self.message_queue.is_empty() { + if changed_buffers.is_empty() && plan.is_empty() && queue_is_empty { return None; } @@ -5164,7 +5168,7 @@ impl AcpThreadView { )) }) }) - .when(!self.message_queue.is_empty(), |this| { + .when(!queue_is_empty, |this| { this.when(!plan.is_empty() || !changed_buffers.is_empty(), |this| { this.child(Divider::horizontal().color(DividerColor::Border)) }) @@ -5776,7 +5780,9 @@ impl AcpThreadView { _window: &mut Window, cx: &Context, ) -> impl IntoElement { - let queue_count = self.message_queue.len(); + let queue_count = self + .as_native_thread(cx) + .map_or(0, |t| t.read(cx).queued_messages().len()); let title: SharedString = if queue_count == 1 { "1 Queued Message".into() } else { @@ -5807,7 +5813,9 @@ impl AcpThreadView { .label_size(LabelSize::Small) .key_binding(KeyBinding::for_action(&ClearMessageQueue, cx)) .on_click(cx.listener(|this, _, _, cx| { - this.message_queue.clear(); + if let Some(thread) = this.as_native_thread(cx) { + thread.update(cx, |thread, _| thread.clear_queued_messages()); + } this.can_fast_track_queue = false; cx.notify(); })), @@ -5821,23 +5829,34 @@ impl AcpThreadView { ) -> impl IntoElement { let message_editor = self.message_editor.read(cx); let focus_handle = message_editor.focus_handle(cx); - let can_fast_track = self.can_fast_track_queue && !self.message_queue.is_empty(); + + let queued_messages: Vec<_> = self + .as_native_thread(cx) + .map(|t| { + t.read(cx) + .queued_messages() + .iter() + .map(|q| q.content.clone()) + .collect() + }) + .unwrap_or_default(); + + let queue_len = queued_messages.len(); + let can_fast_track = self.can_fast_track_queue && queue_len > 0; v_flex() .id("message_queue_list") .max_h_40() .overflow_y_scroll() .children( - self.message_queue - .iter() + queued_messages + .into_iter() .enumerate() - .map(|(index, queued)| { + .map(|(index, content)| { let is_next = index == 0; let icon_color = if is_next { Color::Accent } else { Color::Muted }; - let queue_len = self.message_queue.len(); - let preview: String = queued - .content + let preview: String = content .iter() .filter_map(|block| match block { acp::ContentBlock::Text(text) => { @@ -5927,10 +5946,12 @@ impl AcpThreadView { ) }) .on_click(cx.listener(move |this, _, _, cx| { - if index < this.message_queue.len() { - this.message_queue.remove(index); - cx.notify(); + if let Some(thread) = this.as_native_thread(cx) { + thread.update(cx, |thread, _| { + thread.remove_queued_message(index); + }); } + cx.notify(); })), ) .child( @@ -7896,13 +7917,17 @@ impl Render for AcpThreadView { this.send_queued_message_at_index(0, true, window, cx); })) .on_action(cx.listener(|this, _: &RemoveFirstQueuedMessage, _, cx| { - if !this.message_queue.is_empty() { - this.message_queue.remove(0); + if let Some(thread) = this.as_native_thread(cx) { + thread.update(cx, |thread, _| { + thread.remove_queued_message(0); + }); cx.notify(); } })) .on_action(cx.listener(|this, _: &ClearMessageQueue, _, cx| { - this.message_queue.clear(); + if let Some(thread) = this.as_native_thread(cx) { + thread.update(cx, |thread, _| thread.clear_queued_messages()); + } this.can_fast_track_queue = false; cx.notify(); }))