@@ -67,9 +67,10 @@ use crate::profile_selector::{ProfileProvider, ProfileSelector};
use crate::ui::{AgentNotification, AgentNotificationEvent, BurnModeTooltip, UsageCallout};
use crate::{
- AgentDiffPane, AgentPanel, AllowAlways, AllowOnce, ContinueThread, ContinueWithBurnMode,
- CycleFavoriteModels, CycleModeSelector, ExpandMessageEditor, Follow, KeepAll, NewThread,
- OpenAgentDiff, OpenHistory, RejectAll, RejectOnce, ToggleBurnMode, ToggleProfileSelector,
+ AgentDiffPane, AgentPanel, AllowAlways, AllowOnce, ClearMessageQueue, ContinueThread,
+ ContinueWithBurnMode, CycleFavoriteModels, CycleModeSelector, ExpandMessageEditor, Follow,
+ KeepAll, NewThread, OpenAgentDiff, OpenHistory, QueueMessage, RejectAll, RejectOnce,
+ SendNextQueuedMessage, ToggleBurnMode, ToggleProfileSelector,
};
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -288,6 +289,7 @@ pub struct AcpThreadView {
expanded_thinking_blocks: HashSet<(usize, usize)>,
edits_expanded: bool,
plan_expanded: bool,
+ queue_expanded: bool,
editor_expanded: bool,
should_be_following: bool,
editing_message: Option<usize>,
@@ -300,6 +302,14 @@ pub struct AcpThreadView {
_subscriptions: [Subscription; 5],
show_codex_windows_warning: bool,
in_flight_prompt: Option<Vec<acp::ContentBlock>>,
+ message_queue: Vec<QueuedMessage>,
+ skip_queue_processing_count: usize,
+ user_interrupted_generation: bool,
+}
+
+struct QueuedMessage {
+ content: Vec<acp::ContentBlock>,
+ tracked_buffers: Vec<Entity<Buffer>>,
}
enum ThreadState {
@@ -448,6 +458,7 @@ impl AcpThreadView {
editing_message: None,
edits_expanded: false,
plan_expanded: false,
+ queue_expanded: true,
prompt_capabilities,
available_commands,
editor_expanded: false,
@@ -462,6 +473,9 @@ impl AcpThreadView {
resume_thread_metadata: resume_thread,
show_codex_windows_warning,
in_flight_prompt: None,
+ message_queue: Vec::new(),
+ skip_queue_processing_count: 0,
+ user_interrupted_generation: false,
}
}
@@ -477,6 +491,7 @@ impl AcpThreadView {
);
self.available_commands.replace(vec![]);
self.new_server_version_available.take();
+ self.message_queue.clear();
cx.notify();
}
@@ -991,6 +1006,7 @@ impl AcpThreadView {
) {
match event {
MessageEditorEvent::Send => self.send(window, cx),
+ MessageEditorEvent::Queue => self.queue_message(window, cx),
MessageEditorEvent::Cancel => self.cancel_generation(cx),
MessageEditorEvent::Focus => {
self.cancel_editing(&Default::default(), window, cx);
@@ -1042,6 +1058,7 @@ impl AcpThreadView {
}
}
}
+ ViewEvent::MessageEditorEvent(_editor, MessageEditorEvent::Queue) => {}
ViewEvent::MessageEditorEvent(editor, MessageEditorEvent::Send) => {
self.regenerate(event.entry_index, editor.clone(), window, cx);
}
@@ -1141,6 +1158,9 @@ impl AcpThreadView {
return;
};
+ self.skip_queue_processing_count = 0;
+ self.user_interrupted_generation = true;
+
let cancelled = thread.update(cx, |thread, cx| thread.cancel(cx));
cx.spawn_in(window, async move |this, cx| {
@@ -1276,6 +1296,178 @@ impl AcpThreadView {
.detach();
}
+ fn queue_message(&mut self, window: &mut Window, cx: &mut Context<Self>) {
+ let is_idle = self
+ .thread()
+ .map(|t| t.read(cx).status() == acp_thread::ThreadStatus::Idle)
+ .unwrap_or(true);
+
+ if is_idle {
+ self.send_impl(self.message_editor.clone(), window, cx);
+ return;
+ }
+
+ let full_mention_content = self.as_native_thread(cx).is_some_and(|thread| {
+ let thread = thread.read(cx);
+ AgentSettings::get_global(cx)
+ .profiles
+ .get(thread.profile())
+ .is_some_and(|profile| profile.tools.is_empty())
+ });
+
+ let contents = self.message_editor.update(cx, |message_editor, cx| {
+ message_editor.contents(full_mention_content, cx)
+ });
+
+ let message_editor = self.message_editor.clone();
+
+ cx.spawn_in(window, async move |this, cx| {
+ let (content, tracked_buffers) = contents.await?;
+
+ if content.is_empty() {
+ return Ok::<(), anyhow::Error>(());
+ }
+
+ this.update_in(cx, |this, window, cx| {
+ this.message_queue.push(QueuedMessage {
+ content,
+ tracked_buffers,
+ });
+ message_editor.update(cx, |message_editor, cx| {
+ message_editor.clear(window, cx);
+ });
+ cx.notify();
+ })?;
+ Ok(())
+ })
+ .detach_and_log_err(cx);
+ }
+
+ fn send_queued_message_at_index(
+ &mut self,
+ index: usize,
+ is_send_now: bool,
+ window: &mut Window,
+ cx: &mut Context<Self>,
+ ) {
+ if index >= self.message_queue.len() {
+ return;
+ }
+
+ let queued = self.message_queue.remove(index);
+ let content = queued.content;
+ let tracked_buffers = queued.tracked_buffers;
+
+ let Some(thread) = self.thread().cloned() else {
+ return;
+ };
+
+ // Only increment skip count for "Send Now" operations (out-of-order sends)
+ // Normal auto-processing from the Stopped handler doesn't need to skip
+ if is_send_now {
+ let is_generating = thread.read(cx).status() == acp_thread::ThreadStatus::Generating;
+ self.skip_queue_processing_count += if is_generating { 2 } else { 1 };
+ }
+
+ // Ensure we don't end up with multiple concurrent generations
+ let cancelled = thread.update(cx, |thread, cx| thread.cancel(cx));
+
+ let session_id = thread.read(cx).session_id().clone();
+ let agent_telemetry_id = thread.read(cx).connection().telemetry_id();
+ let thread = thread.downgrade();
+
+ let should_be_following = self.should_be_following;
+ let workspace = self.workspace.clone();
+
+ self.is_loading_contents = true;
+ let model_id = self.current_model_id(cx);
+ let mode_id = self.current_mode_id(cx);
+ let guard = cx.new(|_| ());
+
+ cx.observe_release(&guard, |this, _guard, cx| {
+ this.is_loading_contents = false;
+ cx.notify();
+ })
+ .detach();
+
+ let task = cx.spawn_in(window, async move |this, cx| {
+ cancelled.await;
+ this.update_in(cx, |this, window, cx| {
+ if should_be_following {
+ workspace
+ .update(cx, |workspace, cx| {
+ workspace.follow(CollaboratorId::Agent, window, cx);
+ })
+ .ok();
+ }
+
+ this.in_flight_prompt = Some(content.clone());
+ this.set_editor_is_expanded(false, cx);
+ this.scroll_to_bottom(cx);
+ })?;
+
+ let turn_start_time = Instant::now();
+ let send = thread.update(cx, |thread, cx| {
+ thread.action_log().update(cx, |action_log, cx| {
+ for buffer in tracked_buffers {
+ action_log.buffer_read(buffer, cx)
+ }
+ });
+ drop(guard);
+
+ telemetry::event!(
+ "Agent Message Sent",
+ agent = agent_telemetry_id,
+ session = session_id,
+ model = model_id,
+ mode = mode_id
+ );
+
+ thread.send(content, cx)
+ })?;
+
+ let res = send.await;
+ let turn_time_ms = turn_start_time.elapsed().as_millis();
+ let status = if res.is_ok() {
+ this.update(cx, |this, _| this.in_flight_prompt.take()).ok();
+ "success"
+ } else {
+ "failure"
+ };
+
+ telemetry::event!(
+ "Agent Turn Completed",
+ agent = agent_telemetry_id,
+ session = session_id,
+ model = model_id,
+ mode = mode_id,
+ status,
+ turn_time_ms,
+ );
+ res
+ });
+
+ cx.spawn(async move |this, cx| {
+ if let Err(err) = task.await {
+ this.update(cx, |this, cx| {
+ this.handle_thread_error(err, cx);
+ })
+ .ok();
+ } else {
+ this.update(cx, |this, cx| {
+ this.should_be_following = this
+ .workspace
+ .update(cx, |workspace, _| {
+ workspace.is_being_followed(CollaboratorId::Agent)
+ })
+ .unwrap_or_default();
+ })
+ .ok();
+ }
+ })
+ .detach();
+ }
+
fn cancel_editing(&mut self, _: &ClickEvent, window: &mut Window, cx: &mut Context<Self>) {
let Some(thread) = self.thread().cloned() else {
return;
@@ -1474,6 +1666,16 @@ impl AcpThreadView {
window,
cx,
);
+
+ if self.skip_queue_processing_count > 0 {
+ self.skip_queue_processing_count -= 1;
+ } else if self.user_interrupted_generation {
+ // Manual interruption: don't auto-process queue.
+ // Reset the flag so future completions can process normally.
+ self.user_interrupted_generation = false;
+ } else if !self.message_queue.is_empty() {
+ self.send_queued_message_at_index(0, false, window, cx);
+ }
}
AcpThreadEvent::Refusal => {
self.thread_retry_status.take();
@@ -3831,7 +4033,7 @@ impl AcpThreadView {
let changed_buffers = action_log.read(cx).changed_buffers(cx);
let plan = thread.plan();
- if changed_buffers.is_empty() && plan.is_empty() {
+ if changed_buffers.is_empty() && plan.is_empty() && self.message_queue.is_empty() {
return None;
}
@@ -3882,6 +4084,15 @@ impl AcpThreadView {
))
})
})
+ .when(!self.message_queue.is_empty(), |this| {
+ this.when(!plan.is_empty() || !changed_buffers.is_empty(), |this| {
+ this.child(Divider::horizontal().color(DividerColor::Border))
+ })
+ .child(self.render_message_queue_summary(window, cx))
+ .when(self.queue_expanded, |parent| {
+ parent.child(self.render_message_queue_entries(window, cx))
+ })
+ })
.into_any()
.into()
}
@@ -4357,6 +4568,154 @@ impl AcpThreadView {
.into_any_element()
}
+ fn render_message_queue_summary(
+ &self,
+ _window: &mut Window,
+ cx: &Context<Self>,
+ ) -> impl IntoElement {
+ let queue_count = self.message_queue.len();
+ let title: SharedString = if queue_count == 1 {
+ "1 Queued Message".into()
+ } else {
+ format!("{} Queued Messages", queue_count).into()
+ };
+
+ h_flex()
+ .p_1()
+ .w_full()
+ .gap_1()
+ .justify_between()
+ .when(self.queue_expanded, |this| {
+ this.border_b_1().border_color(cx.theme().colors().border)
+ })
+ .child(
+ h_flex()
+ .id("queue_summary")
+ .gap_1()
+ .child(Disclosure::new("queue_disclosure", self.queue_expanded))
+ .child(Label::new(title).size(LabelSize::Small).color(Color::Muted))
+ .on_click(cx.listener(|this, _, _, cx| {
+ this.queue_expanded = !this.queue_expanded;
+ cx.notify();
+ })),
+ )
+ .child(
+ Button::new("clear_queue", "Clear All")
+ .label_size(LabelSize::Small)
+ .key_binding(KeyBinding::for_action(&ClearMessageQueue, cx))
+ .on_click(cx.listener(|this, _, _, cx| {
+ this.message_queue.clear();
+ cx.notify();
+ })),
+ )
+ }
+
+ fn render_message_queue_entries(
+ &self,
+ _window: &mut Window,
+ cx: &Context<Self>,
+ ) -> impl IntoElement {
+ let message_editor = self.message_editor.read(cx);
+ let focus_handle = message_editor.focus_handle(cx);
+
+ v_flex()
+ .id("message_queue_list")
+ .max_h_40()
+ .overflow_y_scroll()
+ .children(
+ self.message_queue
+ .iter()
+ .enumerate()
+ .map(|(index, queued)| {
+ let is_next = index == 0;
+ let icon_color = if is_next { Color::Accent } else { Color::Muted };
+ let queue_len = self.message_queue.len();
+
+ let preview = queued
+ .content
+ .iter()
+ .find_map(|block| match block {
+ acp::ContentBlock::Text(text) => {
+ text.text.lines().next().map(str::to_owned)
+ }
+ _ => None,
+ })
+ .unwrap_or_default();
+
+ h_flex()
+ .group("queue_entry")
+ .w_full()
+ .p_1()
+ .pl_2()
+ .gap_1()
+ .justify_between()
+ .bg(cx.theme().colors().editor_background)
+ .when(index < queue_len - 1, |parent| {
+ parent.border_color(cx.theme().colors().border).border_b_1()
+ })
+ .child(
+ h_flex()
+ .id(("queued_prompt", index))
+ .min_w_0()
+ .w_full()
+ .gap_1p5()
+ .child(
+ Icon::new(IconName::Circle)
+ .size(IconSize::Small)
+ .color(icon_color),
+ )
+ .child(
+ Label::new(preview)
+ .size(LabelSize::XSmall)
+ .color(Color::Muted)
+ .buffer_font(cx)
+ .truncate(),
+ )
+ .when(is_next, |this| {
+ this.tooltip(Tooltip::text("Next Prompt in the Queue"))
+ }),
+ )
+ .child(
+ h_flex()
+ .flex_none()
+ .gap_1()
+ .visible_on_hover("queue_entry")
+ .child(
+ Button::new(("delete", index), "Remove")
+ .label_size(LabelSize::Small)
+ .on_click(cx.listener(move |this, _, _, cx| {
+ if index < this.message_queue.len() {
+ this.message_queue.remove(index);
+ cx.notify();
+ }
+ })),
+ )
+ .child(
+ Button::new(("send_now", index), "Send Now")
+ .style(ButtonStyle::Outlined)
+ .label_size(LabelSize::Small)
+ .when(is_next, |this| {
+ this.key_binding(
+ KeyBinding::for_action_in(
+ &SendNextQueuedMessage,
+ &focus_handle.clone(),
+ cx,
+ )
+ .map(|kb| kb.size(rems_from_px(10.))),
+ )
+ })
+ .on_click(cx.listener(move |this, _, window, cx| {
+ this.send_queued_message_at_index(
+ index, true, window, cx,
+ );
+ })),
+ ),
+ )
+ }),
+ )
+ .into_any_element()
+ }
+
fn render_message_editor(&mut self, window: &mut Window, cx: &mut Context<Self>) -> AnyElement {
let focus_handle = self.message_editor.focus_handle(cx);
let editor_bg_color = cx.theme().colors().editor_background;
@@ -4639,7 +4998,10 @@ impl AcpThreadView {
}
fn render_send_button(&self, cx: &mut Context<Self>) -> AnyElement {
- let is_editor_empty = self.message_editor.read(cx).is_empty(cx);
+ let message_editor = self.message_editor.read(cx);
+ let is_editor_empty = message_editor.is_empty(cx);
+ let focus_handle = message_editor.focus_handle(cx);
+
let is_generating = self
.thread()
.is_some_and(|thread| thread.read(cx).status() != ThreadStatus::Idle);
@@ -4654,21 +5016,13 @@ impl AcpThreadView {
} else if is_generating && is_editor_empty {
IconButton::new("stop-generation", IconName::Stop)
.icon_color(Color::Error)
- .style(ButtonStyle::Tinted(ui::TintColor::Error))
+ .style(ButtonStyle::Tinted(TintColor::Error))
.tooltip(move |_window, cx| {
Tooltip::for_action("Stop Generation", &editor::actions::Cancel, cx)
})
.on_click(cx.listener(|this, _event, _, cx| this.cancel_generation(cx)))
.into_any_element()
} else {
- let send_btn_tooltip = if is_editor_empty && !is_generating {
- "Type to Send"
- } else if is_generating {
- "Stop and Send Message"
- } else {
- "Send"
- };
-
IconButton::new("send-message", IconName::Send)
.style(ButtonStyle::Filled)
.map(|this| {
@@ -4678,7 +5032,46 @@ impl AcpThreadView {
this.icon_color(Color::Accent)
}
})
- .tooltip(move |_window, cx| Tooltip::for_action(send_btn_tooltip, &Chat, cx))
+ .tooltip(move |_window, cx| {
+ if is_editor_empty && !is_generating {
+ Tooltip::for_action("Type to Send", &Chat, cx)
+ } else {
+ let title = if is_generating {
+ "Stop and Send Message"
+ } else {
+ "Send"
+ };
+
+ let focus_handle = focus_handle.clone();
+
+ Tooltip::element(move |_window, cx| {
+ v_flex()
+ .gap_1()
+ .child(
+ h_flex()
+ .gap_2()
+ .justify_between()
+ .child(Label::new(title))
+ .child(KeyBinding::for_action_in(&Chat, &focus_handle, cx)),
+ )
+ .child(
+ h_flex()
+ .pt_1()
+ .gap_2()
+ .justify_between()
+ .border_t_1()
+ .border_color(cx.theme().colors().border_variant)
+ .child(Label::new("Queue Message"))
+ .child(KeyBinding::for_action_in(
+ &QueueMessage,
+ &focus_handle,
+ cx,
+ )),
+ )
+ .into_any_element()
+ })(_window, cx)
+ }
+ })
.on_click(cx.listener(|this, _, window, cx| {
this.send(window, cx);
}))
@@ -6073,6 +6466,13 @@ impl Render for AcpThreadView {
.on_action(cx.listener(Self::allow_always))
.on_action(cx.listener(Self::allow_once))
.on_action(cx.listener(Self::reject_once))
+ .on_action(cx.listener(|this, _: &SendNextQueuedMessage, window, cx| {
+ this.send_queued_message_at_index(0, true, window, cx);
+ }))
+ .on_action(cx.listener(|this, _: &ClearMessageQueue, _, cx| {
+ this.message_queue.clear();
+ cx.notify();
+ }))
.on_action(cx.listener(|this, _: &ToggleProfileSelector, window, cx| {
if let Some(profile_selector) = this.profile_selector.as_ref() {
profile_selector.read(cx).menu_handle().toggle(window, cx);