acp_thread: Stream in agent text in a more continuous manner (#51499)

Lukas Wirth created

Release Notes:

- N/A *or* Added/Fixed/Improved ...

Change summary

crates/acp_thread/src/acp_thread.rs | 174 ++++++++++++++++++++++++++++++
1 file changed, 173 insertions(+), 1 deletion(-)

Detailed changes

crates/acp_thread/src/acp_thread.rs 🔗

@@ -976,6 +976,30 @@ pub struct AcpThread {
     draft_prompt: Option<Vec<acp::ContentBlock>>,
     /// The initial scroll position for the thread view, set during session registration.
     ui_scroll_position: Option<gpui::ListOffset>,
+    /// Buffer for smooth text streaming. Holds text that has been received from
+    /// the model but not yet revealed in the UI. A timer task drains this buffer
+    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
+    /// updates.
+    streaming_text_buffer: Option<StreamingTextBuffer>,
+}
+
+/// Not-yet-revealed text for one streaming Markdown block, together with the
+/// task that gradually reveals it.
+struct StreamingTextBuffer {
+    /// Text received from the model but not yet appended to the Markdown source.
+    pending: String,
+    /// The number of bytes to reveal per timer turn.
+    bytes_to_reveal_per_tick: usize,
+    /// The Markdown entity being streamed into.
+    target: Entity<Markdown>,
+    /// Timer task that periodically moves text from `pending` into `target`.
+    _reveal_task: Task<()>,
+}
+
+impl StreamingTextBuffer {
+    /// The number of milliseconds between each timer tick, controlling how quickly
+    /// text is revealed.
+    const TASK_UPDATE_MS: u64 = 16;
+    /// The time in milliseconds over which the entire pending text should be
+    /// revealed. The per-tick byte count is derived from it as
+    /// `pending.len() / REVEAL_TARGET * TASK_UPDATE_MS`, rounded up.
+    const REVEAL_TARGET: f32 = 200.0;
 }
 
 impl From<&AcpThread> for ActionLogTelemetry {
@@ -1137,6 +1161,7 @@ impl AcpThread {
             had_error: false,
             draft_prompt: None,
             ui_scroll_position: None,
+            streaming_text_buffer: None,
         }
     }
 
@@ -1343,6 +1368,7 @@ impl AcpThread {
             }) = last_entry
             && *existing_indented == indented
         {
+            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
             *id = message_id.or(id.take());
             content.append(chunk.clone(), &language_registry, path_style, cx);
             chunks.push(chunk);
@@ -1379,8 +1405,20 @@ impl AcpThread {
         indented: bool,
         cx: &mut Context<Self>,
     ) {
-        let language_registry = self.project.read(cx).languages().clone();
         let path_style = self.project.read(cx).path_style(cx);
+
+        // For text chunks going to an existing Markdown block, buffer for smooth
+        // streaming instead of appending all at once which may feel more choppy.
+        if let acp::ContentBlock::Text(text_content) = &chunk {
+            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
+                let entries_len = self.entries.len();
+                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
+                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
+                return;
+            }
+        }
+
+        let language_registry = self.project.read(cx).languages().clone();
         let entries_len = self.entries.len();
         if let Some(last_entry) = self.entries.last_mut()
             && let AgentThreadEntry::AssistantMessage(AssistantMessage {
@@ -1391,6 +1429,7 @@ impl AcpThread {
             && *existing_indented == indented
         {
             let idx = entries_len - 1;
+            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
             cx.emit(AcpThreadEvent::EntryUpdated(idx));
             match (chunks.last_mut(), is_thought) {
                 (Some(AssistantMessageChunk::Message { block }), false)
@@ -1425,7 +1464,134 @@ impl AcpThread {
         }
     }
 
+    /// Returns the Markdown entity that incoming text should be streamed into,
+    /// if there is one.
+    ///
+    /// A target exists only when the last entry is an assistant message with the
+    /// same `indented` flag and its last chunk is a Markdown block of the
+    /// requested kind (a thought when `is_thought`, a message otherwise).
+    fn streaming_markdown_target(
+        &self,
+        is_thought: bool,
+        indented: bool,
+    ) -> Option<Entity<Markdown>> {
+        let AgentThreadEntry::AssistantMessage(AssistantMessage {
+            chunks,
+            indented: existing_indented,
+            ..
+        }) = self.entries.last()?
+        else {
+            return None;
+        };
+
+        if *existing_indented != indented {
+            return None;
+        }
+
+        // Only the trailing chunk can be extended; an empty chunk list has no target.
+        match (chunks.last()?, is_thought) {
+            (
+                AssistantMessageChunk::Message {
+                    block: ContentBlock::Markdown { markdown },
+                },
+                false,
+            )
+            | (
+                AssistantMessageChunk::Thought {
+                    block: ContentBlock::Markdown { markdown },
+                },
+                true,
+            ) => Some(markdown.clone()),
+            _ => None,
+        }
+    }
+
+    /// Queues `text` for gradual reveal into `markdown`.
+    ///
+    /// If a buffer for the same Markdown entity is already active, the text is
+    /// appended to it and the per-tick reveal rate is recomputed from the new
+    /// pending length. Otherwise any buffer for a previous target (e.g. when
+    /// switching from thoughts to message text) is flushed first, and a fresh
+    /// buffer with its own reveal task is installed.
+    fn buffer_streaming_text(
+        &mut self,
+        markdown: &Entity<Markdown>,
+        text: String,
+        cx: &mut Context<Self>,
+    ) {
+        // Bytes to reveal per tick so that `pending_len` bytes drain within the
+        // reveal target window.
+        fn reveal_rate(pending_len: usize) -> usize {
+            let bytes_per_ms = pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET;
+            (bytes_per_ms * StreamingTextBuffer::TASK_UPDATE_MS as f32).ceil() as usize
+        }
+
+        if let Some(active) = self.streaming_text_buffer.as_mut()
+            && active.target.entity_id() == markdown.entity_id()
+        {
+            active.pending.push_str(&text);
+            active.bytes_to_reveal_per_tick = reveal_rate(active.pending.len());
+            return;
+        }
+
+        // Either there is no buffer (flushing is then a no-op) or it belongs to a
+        // different target whose remaining text must be revealed right away.
+        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
+
+        let reveal_task = self.start_streaming_reveal(cx);
+        let bytes_to_reveal_per_tick = reveal_rate(text.len());
+        self.streaming_text_buffer = Some(StreamingTextBuffer {
+            pending: text,
+            bytes_to_reveal_per_tick,
+            target: markdown.clone(),
+            _reveal_task: reveal_task,
+        });
+    }
+
+    /// Immediately appends whatever text is still buffered to its Markdown
+    /// target and clears the buffer slot.
+    ///
+    /// The slot is always left as `None`, even when there was nothing pending.
+    fn flush_streaming_text(
+        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
+        cx: &mut Context<Self>,
+    ) {
+        let Some(buffer) = streaming_text_buffer.take() else {
+            return;
+        };
+
+        if buffer.pending.is_empty() {
+            return;
+        }
+
+        buffer
+            .target
+            .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
+    }
+
+    /// Spawns a foreground task that periodically drains
+    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
+    /// producing smooth, continuous text output.
+    ///
+    /// Each iteration waits `TASK_UPDATE_MS` milliseconds and then appends about
+    /// `bytes_to_reveal_per_tick` bytes of pending text to the target. The loop
+    /// ends once `streaming_text_buffer` is `None` or `this.update` yields no
+    /// value.
+    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
+        cx.spawn(async move |this, cx| {
+            loop {
+                cx.background_executor()
+                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
+                    .await;
+
+                let should_continue = this
+                    .update(cx, |this, cx| {
+                        // The buffer slot was cleared (e.g. by a flush): stop.
+                        let Some(buffer) = &mut this.streaming_text_buffer else {
+                            return false;
+                        };
+
+                        // Nothing to reveal this tick, but keep ticking so text
+                        // buffered later is picked up.
+                        if buffer.pending.is_empty() {
+                            return true;
+                        }
+
+                        let pending_len = buffer.pending.len();
+
+                        // Round the cut point up to a char boundary so the slice
+                        // below does not split a multi-byte character, and clamp
+                        // it to the buffer length.
+                        let byte_boundary = buffer
+                            .pending
+                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
+                            .min(pending_len);
+
+                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
+                            markdown.append(&buffer.pending[..byte_boundary], cx);
+                            buffer.pending.drain(..byte_boundary);
+                        });
+
+                        true
+                    })
+                    // NOTE(review): presumably `update` fails when the thread
+                    // entity no longer exists — confirm; either way, stop.
+                    .unwrap_or(false);
+
+                if !should_continue {
+                    break;
+                }
+            }
+        })
+    }
+
     fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
+        // Append any still-buffered streaming text to its Markdown target
+        // before a new entry becomes the last one.
+        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
         self.entries.push(entry);
         cx.emit(AcpThreadEvent::NewEntry);
     }
@@ -1970,6 +2136,8 @@ impl AcpThread {
 
                 match response {
                     Ok(r) => {
+                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
+
                         if r.stop_reason == acp::StopReason::MaxTokens {
                             this.had_error = true;
                             cx.emit(AcpThreadEvent::Error);
@@ -2022,6 +2190,8 @@ impl AcpThread {
                         Ok(Some(r))
                     }
                     Err(e) => {
+                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
+
                         this.had_error = true;
                         cx.emit(AcpThreadEvent::Error);
                         log::error!("Error in run turn: {:?}", e);
@@ -2039,6 +2209,7 @@ impl AcpThread {
         };
         self.connection.cancel(&self.session_id, cx);
 
+        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
         self.mark_pending_tools_as_canceled();
 
         // Wait for the send task to complete
@@ -2103,6 +2274,7 @@ impl AcpThread {
             return Task::ready(Err(anyhow!("not supported")));
         };
 
+        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
         let telemetry = ActionLogTelemetry::from(&*self);
         cx.spawn(async move |this, cx| {
             cx.update(|cx| truncate.run(id.clone(), cx)).await?;