Batch completion events to reduce notify() calls during streaming (#46802)

Richard Feldman created

## Problem

Profiling showed that during agent streaming, `thread.rs:1393:23` was
appearing constantly as a hotspot. The issue was that every single token
from the model triggered:
1. `this.update(cx, ...)` 
2. `handle_text_event()` (or thinking/redacted_thinking)
3. `cx.notify()`
4. UI re-render

This created significant foreground thread pressure, contributing to
~500ms delays visible in the profiler.

## Solution

Batch all immediately-available events using `now_or_never()` and
process them in a single `update()` call with one `notify()` at the end.

This approach is deterministic - it processes exactly what's available
right now, adapting naturally to network speed:
- When tokens arrive slowly, you get one at a time
- When they arrive in bursts, they batch automatically

## Changes

- Remove `cx.notify()` from `handle_text_event`,
`handle_thinking_event`, and `handle_redacted_thinking_event`
- Batch events in the streaming loop using `now_or_never()`
- Call `cx.notify()` once per batch instead of per-event
- Keep `cx.notify()` in `handle_tool_use_event` for immediate tool
feedback

Release Notes:
- Improve streaming tool call performance by batching UI updates.

Change summary

crates/agent/src/thread.rs | 86 ++++++++++++++++++++++++---------------
1 file changed, 52 insertions(+), 34 deletions(-)

Detailed changes

crates/agent/src/thread.rs 🔗

@@ -1471,8 +1471,8 @@ impl Thread {
             let mut tool_results = FuturesUnordered::new();
             let mut cancelled = false;
             loop {
-                // Race between getting the next event and cancellation
-                let event = futures::select! {
+                // Race between getting the first event and cancellation
+                let first_event = futures::select! {
                     event = events.next().fuse() => event,
                     _ = cancellation_rx.changed().fuse() => {
                         if *cancellation_rx.borrow() {
@@ -1482,25 +1482,54 @@ impl Thread {
                         continue;
                     }
                 };
-                let Some(event) = event else {
+                let Some(first_event) = first_event else {
                     break;
                 };
-                log::trace!("Received completion event: {:?}", event);
-                match event {
-                    Ok(event) => {
-                        tool_results.extend(this.update(cx, |this, cx| {
-                            this.handle_completion_event(
-                                event,
-                                event_stream,
-                                cancellation_rx.clone(),
-                                cx,
-                            )
-                        })??);
-                    }
-                    Err(err) => {
-                        error = Some(err);
-                        break;
+
+                // Collect all immediately available events to process as a batch
+                let mut batch = vec![first_event];
+                while let Some(event) = events.next().now_or_never().flatten() {
+                    batch.push(event);
+                }
+
+                // Process the batch in a single update
+                let batch_result = this.update(cx, |this, cx| {
+                    let mut batch_tool_results = Vec::new();
+                    let mut batch_error = None;
+
+                    for event in batch {
+                        log::trace!("Received completion event: {:?}", event);
+                        match event {
+                            Ok(event) => {
+                                match this.handle_completion_event(
+                                    event,
+                                    event_stream,
+                                    cancellation_rx.clone(),
+                                    cx,
+                                ) {
+                                    Ok(Some(task)) => batch_tool_results.push(task),
+                                    Ok(None) => {}
+                                    Err(err) => {
+                                        batch_error = Some(err);
+                                        break;
+                                    }
+                                }
+                            }
+                            Err(err) => {
+                                batch_error = Some(err.into());
+                                break;
+                            }
+                        }
                     }
+
+                    cx.notify();
+                    (batch_tool_results, batch_error)
+                })?;
+
+                tool_results.extend(batch_result.0);
+                if let Some(err) = batch_result.1 {
+                    error = Some(err.downcast()?);
+                    break;
                 }
             }
 
@@ -1638,11 +1667,11 @@ impl Thread {
                 self.flush_pending_message(cx);
                 self.pending_message = Some(AgentMessage::default());
             }
-            Text(new_text) => self.handle_text_event(new_text, event_stream, cx),
+            Text(new_text) => self.handle_text_event(new_text, event_stream),
             Thinking { text, signature } => {
-                self.handle_thinking_event(text, signature, event_stream, cx)
+                self.handle_thinking_event(text, signature, event_stream)
             }
-            RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx),
+            RedactedThinking { data } => self.handle_redacted_thinking_event(data),
             ReasoningDetails(details) => {
                 let last_message = self.pending_message();
                 // Store the last non-empty reasoning_details (overwrites earlier ones)
@@ -1702,12 +1731,7 @@ impl Thread {
         Ok(None)
     }
 
-    fn handle_text_event(
-        &mut self,
-        new_text: String,
-        event_stream: &ThreadEventStream,
-        cx: &mut Context<Self>,
-    ) {
+    fn handle_text_event(&mut self, new_text: String, event_stream: &ThreadEventStream) {
         event_stream.send_text(&new_text);
 
         let last_message = self.pending_message();
@@ -1718,8 +1742,6 @@ impl Thread {
                 .content
                 .push(AgentMessageContent::Text(new_text));
         }
-
-        cx.notify();
     }
 
     fn handle_thinking_event(
@@ -1727,7 +1749,6 @@ impl Thread {
         new_text: String,
         new_signature: Option<String>,
         event_stream: &ThreadEventStream,
-        cx: &mut Context<Self>,
     ) {
         event_stream.send_thinking(&new_text);
 
@@ -1743,16 +1764,13 @@ impl Thread {
                 signature: new_signature,
             });
         }
-
-        cx.notify();
     }
 
-    fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context<Self>) {
+    fn handle_redacted_thinking_event(&mut self, data: String) {
         let last_message = self.pending_message();
         last_message
             .content
             .push(AgentMessageContent::RedactedThinking(data));
-        cx.notify();
     }
 
     fn handle_tool_use_event(