diff --git a/crates/agent/src/thread.rs b/crates/agent/src/thread.rs index 3cbda35c3d7102912f418f0cec269d8b4f45aefc..d60a47578e0257d9fe042305e8c67f7f2922a16f 100644 --- a/crates/agent/src/thread.rs +++ b/crates/agent/src/thread.rs @@ -1471,8 +1471,8 @@ impl Thread { let mut tool_results = FuturesUnordered::new(); let mut cancelled = false; loop { - // Race between getting the next event and cancellation - let event = futures::select! { + // Race between getting the first event and cancellation + let first_event = futures::select! { event = events.next().fuse() => event, _ = cancellation_rx.changed().fuse() => { if *cancellation_rx.borrow() { @@ -1482,25 +1482,54 @@ impl Thread { continue; } }; - let Some(event) = event else { + let Some(first_event) = first_event else { break; }; - log::trace!("Received completion event: {:?}", event); - match event { - Ok(event) => { - tool_results.extend(this.update(cx, |this, cx| { - this.handle_completion_event( - event, - event_stream, - cancellation_rx.clone(), - cx, - ) - })??); - } - Err(err) => { - error = Some(err); - break; + + // Collect all immediately available events to process as a batch + let mut batch = vec![first_event]; + while let Some(event) = events.next().now_or_never().flatten() { + batch.push(event); + } + + // Process the batch in a single update + let batch_result = this.update(cx, |this, cx| { + let mut batch_tool_results = Vec::new(); + let mut batch_error = None; + + for event in batch { + log::trace!("Received completion event: {:?}", event); + match event { + Ok(event) => { + match this.handle_completion_event( + event, + event_stream, + cancellation_rx.clone(), + cx, + ) { + Ok(Some(task)) => batch_tool_results.push(task), + Ok(None) => {} + Err(err) => { + batch_error = Some(err); + break; + } + } + } + Err(err) => { + batch_error = Some(err.into()); + break; + } + } } + + cx.notify(); + (batch_tool_results, batch_error) + })?; + + tool_results.extend(batch_result.0); + if let Some(err) = batch_result.1 { + error = Some(err.downcast()?); + break; } } @@ -1638,11 +1667,11 @@ impl Thread { self.flush_pending_message(cx); self.pending_message = Some(AgentMessage::default()); } - Text(new_text) => self.handle_text_event(new_text, event_stream, cx), + Text(new_text) => self.handle_text_event(new_text, event_stream), Thinking { text, signature } => { - self.handle_thinking_event(text, signature, event_stream, cx) + self.handle_thinking_event(text, signature, event_stream) } - RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx), + RedactedThinking { data } => self.handle_redacted_thinking_event(data), ReasoningDetails(details) => { let last_message = self.pending_message(); // Store the last non-empty reasoning_details (overwrites earlier ones) @@ -1702,12 +1731,7 @@ impl Thread { Ok(None) } - fn handle_text_event( - &mut self, - new_text: String, - event_stream: &ThreadEventStream, - cx: &mut Context, - ) { + fn handle_text_event(&mut self, new_text: String, event_stream: &ThreadEventStream) { event_stream.send_text(&new_text); let last_message = self.pending_message(); @@ -1718,8 +1742,6 @@ impl Thread { .content .push(AgentMessageContent::Text(new_text)); } - - cx.notify(); } fn handle_thinking_event( @@ -1727,7 +1749,6 @@ impl Thread { new_text: String, new_signature: Option, event_stream: &ThreadEventStream, - cx: &mut Context, ) { event_stream.send_thinking(&new_text); @@ -1743,16 +1764,13 @@ impl Thread { signature: new_signature, }); } - - cx.notify(); } - fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context) { + fn handle_redacted_thinking_event(&mut self, data: String) { let last_message = self.pending_message(); last_message .content .push(AgentMessageContent::RedactedThinking(data)); - cx.notify(); } fn handle_tool_use_event(