diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 95030443f642b019b27758f53fd413c5146857b1..7b6c198e5d77f6f962c6d259929d065feb76e48d 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -976,6 +976,30 @@ pub struct AcpThread { draft_prompt: Option>, /// The initial scroll position for the thread view, set during session registration. ui_scroll_position: Option, + /// Buffer for smooth text streaming. Holds text that has been received from + /// the model but not yet revealed in the UI. A timer task drains this buffer + /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time + /// updates. + streaming_text_buffer: Option, +} + +struct StreamingTextBuffer { + /// Text received from the model but not yet appended to the Markdown source. + pending: String, + /// The number of bytes to reveal per timer turn. + bytes_to_reveal_per_tick: usize, + /// The Markdown entity being streamed into. + target: Entity, + /// Timer task that periodically moves text from `pending` into `source`. + _reveal_task: Task<()>, +} + +impl StreamingTextBuffer { + /// The number of milliseconds between each timer tick, controlling how quickly + /// text is revealed. + const TASK_UPDATE_MS: u64 = 16; + /// The time in milliseconds to reveal the entire pending text. + const REVEAL_TARGET: f32 = 200.0; } impl From<&AcpThread> for ActionLogTelemetry { @@ -1137,6 +1161,7 @@ impl AcpThread { had_error: false, draft_prompt: None, ui_scroll_position: None, + streaming_text_buffer: None, } } @@ -1343,6 +1368,7 @@ impl AcpThread { }) = last_entry && *existing_indented == indented { + Self::flush_streaming_text(&mut self.streaming_text_buffer, cx); *id = message_id.or(id.take()); content.append(chunk.clone(), &language_registry, path_style, cx); chunks.push(chunk); @@ -1379,8 +1405,20 @@ impl AcpThread { indented: bool, cx: &mut Context, ) { - let language_registry = self.project.read(cx).languages().clone(); let path_style = self.project.read(cx).path_style(cx); + + // For text chunks going to an existing Markdown block, buffer for smooth + // streaming instead of appending all at once which may feel more choppy. + if let acp::ContentBlock::Text(text_content) = &chunk { + if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) { + let entries_len = self.entries.len(); + cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1)); + self.buffer_streaming_text(&markdown, text_content.text.clone(), cx); + return; + } + } + + let language_registry = self.project.read(cx).languages().clone(); let entries_len = self.entries.len(); if let Some(last_entry) = self.entries.last_mut() && let AgentThreadEntry::AssistantMessage(AssistantMessage { @@ -1391,6 +1429,7 @@ impl AcpThread { && *existing_indented == indented { let idx = entries_len - 1; + Self::flush_streaming_text(&mut self.streaming_text_buffer, cx); cx.emit(AcpThreadEvent::EntryUpdated(idx)); match (chunks.last_mut(), is_thought) { (Some(AssistantMessageChunk::Message { block }), false) @@ -1425,7 +1464,134 @@ impl AcpThread { } } + fn streaming_markdown_target( + &self, + is_thought: bool, + indented: bool, + ) -> Option> { + let last_entry = self.entries.last()?; + if let AgentThreadEntry::AssistantMessage(AssistantMessage { + chunks, + indented: existing_indented, + .. + }) = last_entry + && *existing_indented == indented + && let [.., chunk] = chunks.as_slice() + { + match (chunk, is_thought) { + ( + AssistantMessageChunk::Message { + block: ContentBlock::Markdown { markdown }, + }, + false, + ) + | ( + AssistantMessageChunk::Thought { + block: ContentBlock::Markdown { markdown }, + }, + true, + ) => Some(markdown.clone()), + _ => None, + } + } else { + None + } + } + + /// Add text to the streaming buffer. If the target changed (e.g. switching + /// from thoughts to message text), flush the old buffer first. + fn buffer_streaming_text( + &mut self, + markdown: &Entity, + text: String, + cx: &mut Context, + ) { + if let Some(buffer) = &mut self.streaming_text_buffer { + if buffer.target.entity_id() == markdown.entity_id() { + buffer.pending.push_str(&text); + + buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32 + / StreamingTextBuffer::REVEAL_TARGET + * StreamingTextBuffer::TASK_UPDATE_MS as f32) + .ceil() as usize; + return; + } + Self::flush_streaming_text(&mut self.streaming_text_buffer, cx); + } + + let target = markdown.clone(); + let _reveal_task = self.start_streaming_reveal(cx); + let pending_len = text.len(); + let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET + * StreamingTextBuffer::TASK_UPDATE_MS as f32) + .ceil() as usize; + self.streaming_text_buffer = Some(StreamingTextBuffer { + pending: text, + bytes_to_reveal_per_tick: bytes_to_reveal, + target, + _reveal_task, + }); + } + + /// Flush all buffered streaming text into the Markdown entity immediately. + fn flush_streaming_text( + streaming_text_buffer: &mut Option, + cx: &mut Context, + ) { + if let Some(buffer) = streaming_text_buffer.take() { + if !buffer.pending.is_empty() { + buffer + .target + .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx)); + } + } + } + + /// Spawns a foreground task that periodically drains + /// `streaming_text_buffer.pending` into the target `Markdown` entity, + /// producing smooth, continuous text output. + fn start_streaming_reveal(&self, cx: &mut Context) -> Task<()> { + cx.spawn(async move |this, cx| { + loop { + cx.background_executor() + .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS)) + .await; + + let should_continue = this + .update(cx, |this, cx| { + let Some(buffer) = &mut this.streaming_text_buffer else { + return false; + }; + + if buffer.pending.is_empty() { + return true; + } + + let pending_len = buffer.pending.len(); + + let byte_boundary = buffer + .pending + .ceil_char_boundary(buffer.bytes_to_reveal_per_tick) + .min(pending_len); + + buffer.target.update(cx, |markdown: &mut Markdown, cx| { + markdown.append(&buffer.pending[..byte_boundary], cx); + buffer.pending.drain(..byte_boundary); + }); + + true + }) + .unwrap_or(false); + + if !should_continue { + break; + } + } + }) + } + fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context) { + Self::flush_streaming_text(&mut self.streaming_text_buffer, cx); self.entries.push(entry); cx.emit(AcpThreadEvent::NewEntry); } @@ -1970,6 +2136,8 @@ impl AcpThread { match response { Ok(r) => { + Self::flush_streaming_text(&mut this.streaming_text_buffer, cx); + if r.stop_reason == acp::StopReason::MaxTokens { this.had_error = true; cx.emit(AcpThreadEvent::Error); @@ -2022,6 +2190,8 @@ impl AcpThread { Ok(Some(r)) } Err(e) => { + Self::flush_streaming_text(&mut this.streaming_text_buffer, cx); + this.had_error = true; cx.emit(AcpThreadEvent::Error); log::error!("Error in run turn: {:?}", e); @@ -2039,6 +2209,7 @@ impl AcpThread { }; self.connection.cancel(&self.session_id, cx); + Self::flush_streaming_text(&mut self.streaming_text_buffer, cx); self.mark_pending_tools_as_canceled(); // Wait for the send task to complete @@ -2103,6 +2274,7 @@ impl AcpThread { return Task::ready(Err(anyhow!("not supported"))); }; + Self::flush_streaming_text(&mut self.streaming_text_buffer, cx); let telemetry = ActionLogTelemetry::from(&*self); cx.spawn(async move |this, cx| { cx.update(|cx| truncate.run(id.clone(), cx)).await?;