diff --git a/crates/agent/src/tools/streaming_edit_file_tool.rs b/crates/agent/src/tools/streaming_edit_file_tool.rs index 74e91ee1d2607ad1f68a5d327cd0519699cce88b..f2d3feb33af8c6a5f56e330b3dfc96e126ccfd28 100644 --- a/crates/agent/src/tools/streaming_edit_file_tool.rs +++ b/crates/agent/src/tools/streaming_edit_file_tool.rs @@ -428,22 +428,25 @@ pub struct EditSession { buffer: Entity, old_text: Arc, diff: Entity, - mode: StreamingEditFileMode, parser: ToolEditParser, - pipeline: EditPipeline, + pipeline: Pipeline, _finalize_diff_guard: Deferred>, } -enum EditPipeline { - Write { - content_written: bool, - streaming_diff: StreamingDiff, - line_diff: LineDiff, - original_snapshot: text::BufferSnapshot, - }, - Edit { - current_edit: Option, - }, +enum Pipeline { + Write(WritePipeline), + Edit(EditPipeline), +} + +struct WritePipeline { + content_written: bool, + streaming_diff: StreamingDiff, + line_diff: LineDiff, + original_snapshot: text::BufferSnapshot, +} + +struct EditPipeline { + current_edit: Option, } enum EditPipelineEntry { @@ -459,29 +462,261 @@ enum EditPipelineEntry { }, } -impl EditPipeline { +impl Pipeline { fn new(mode: StreamingEditFileMode, original_snapshot: text::BufferSnapshot) -> Self { match mode { - StreamingEditFileMode::Write => Self::Write { + StreamingEditFileMode::Write => Self::Write(WritePipeline { streaming_diff: StreamingDiff::new(original_snapshot.text()), line_diff: LineDiff::default(), content_written: false, original_snapshot, - }, - StreamingEditFileMode::Edit => Self::Edit { current_edit: None }, + }), + StreamingEditFileMode::Edit => Self::Edit(EditPipeline { current_edit: None }), } } +} +impl WritePipeline { + fn process_event( + &mut self, + event: &ToolEditEvent, + buffer: &Entity, + diff: &Entity, + tool: &StreamingEditFileTool, + cx: &mut AsyncApp, + ) { + let ToolEditEvent::ContentChunk { chunk } = event else { + return; + }; + + let (buffer_id, buffer_len) = + buffer.read_with(cx, |buffer, _cx| (buffer.remote_id(), buffer.len())); + let edit_range = if self.content_written { + buffer_len..buffer_len + } else { + 0..buffer_len + }; + + agent_edit_buffer(buffer, [(edit_range, chunk.as_str())], &tool.action_log, cx); + let char_ops = self.streaming_diff.push_new(chunk); + self.line_diff + .push_char_operations(&char_ops, self.original_snapshot.as_rope()); + diff.update(cx, |diff, cx| { + diff.update_pending( + self.line_diff.line_operations(), + self.original_snapshot.clone(), + cx, + ) + }); + + cx.update(|cx| { + tool.set_agent_location( + buffer.downgrade(), + text::Anchor::max_for_buffer(buffer_id), + cx, + ); + }); + self.content_written = true; + } +} + +impl EditPipeline { fn ensure_resolving_old_text(&mut self, buffer: &Entity, cx: &mut AsyncApp) { - if let Self::Edit { current_edit } = self - && current_edit.is_none() - { + if self.current_edit.is_none() { let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.text_snapshot()); - *current_edit = Some(EditPipelineEntry::ResolvingOldText { + self.current_edit = Some(EditPipelineEntry::ResolvingOldText { matcher: StreamingFuzzyMatcher::new(snapshot), }); } } + + fn process_event( + &mut self, + event: &ToolEditEvent, + buffer: &Entity, + diff: &Entity, + abs_path: &PathBuf, + tool: &StreamingEditFileTool, + event_stream: &ToolCallEventStream, + cx: &mut AsyncApp, + ) -> Result<(), StreamingEditFileToolOutput> { + match event { + ToolEditEvent::ContentChunk { .. } => {} + ToolEditEvent::OldTextChunk { + chunk, done: false, .. + } => { + self.ensure_resolving_old_text(buffer, cx); + if let Some(EditPipelineEntry::ResolvingOldText { matcher }) = + &mut self.current_edit + && !chunk.is_empty() + { + if let Some(match_range) = matcher.push(chunk, None) { + let anchor_range = buffer.read_with(cx, |buffer, _cx| { + buffer.anchor_range_between(match_range.clone()) + }); + diff.update(cx, |diff, cx| diff.reveal_range(anchor_range, cx)); + + cx.update(|cx| { + let position = buffer.read(cx).anchor_before(match_range.end); + tool.set_agent_location(buffer.downgrade(), position, cx); + }); + } + } + } + ToolEditEvent::OldTextChunk { + edit_index, + chunk, + done: true, + } => { + self.ensure_resolving_old_text(buffer, cx); + let Some(EditPipelineEntry::ResolvingOldText { matcher }) = &mut self.current_edit + else { + return Ok(()); + }; + + if !chunk.is_empty() { + matcher.push(chunk, None); + } + let range = extract_match(matcher.finish(), buffer, edit_index, cx)?; + + let anchor_range = + buffer.read_with(cx, |buffer, _cx| buffer.anchor_range_between(range.clone())); + diff.update(cx, |diff, cx| diff.reveal_range(anchor_range, cx)); + + let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); + + let line = snapshot.offset_to_point(range.start).row; + event_stream.update_fields( + ToolCallUpdateFields::new() + .locations(vec![ToolCallLocation::new(abs_path).line(Some(line))]), + ); + + let buffer_indent = snapshot.line_indent_for_row(line); + let query_indent = text::LineIndent::from_iter( + matcher + .query_lines() + .first() + .map(|s| s.as_str()) + .unwrap_or("") + .chars(), + ); + let indent_delta = compute_indent_delta(buffer_indent, query_indent); + + let old_text_in_buffer = snapshot.text_for_range(range.clone()).collect::(); + + let text_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.text_snapshot()); + self.current_edit = Some(EditPipelineEntry::StreamingNewText { + streaming_diff: StreamingDiff::new(old_text_in_buffer), + line_diff: LineDiff::default(), + edit_cursor: range.start, + reindenter: Reindenter::new(indent_delta), + original_snapshot: text_snapshot, + }); + + cx.update(|cx| { + let position = buffer.read(cx).anchor_before(range.end); + tool.set_agent_location(buffer.downgrade(), position, cx); + }); + } + ToolEditEvent::NewTextChunk { + chunk, done: false, .. + } => { + let Some(EditPipelineEntry::StreamingNewText { + streaming_diff, + line_diff, + edit_cursor, + reindenter, + original_snapshot, + .. + }) = &mut self.current_edit + else { + return Ok(()); + }; + + let reindented = reindenter.push(chunk); + if reindented.is_empty() { + return Ok(()); + } + + let char_ops = streaming_diff.push_new(&reindented); + apply_char_operations( + &char_ops, + buffer, + original_snapshot, + edit_cursor, + &tool.action_log, + cx, + ); + line_diff.push_char_operations(&char_ops, original_snapshot.as_rope()); + diff.update(cx, |diff, cx| { + diff.update_pending(line_diff.line_operations(), original_snapshot.clone(), cx) + }); + + let position = original_snapshot.anchor_before(*edit_cursor); + cx.update(|cx| { + tool.set_agent_location(buffer.downgrade(), position, cx); + }); + } + ToolEditEvent::NewTextChunk { + chunk, done: true, .. + } => { + let Some(EditPipelineEntry::StreamingNewText { + mut streaming_diff, + mut line_diff, + mut edit_cursor, + mut reindenter, + original_snapshot, + }) = self.current_edit.take() + else { + return Ok(()); + }; + + let mut final_text = reindenter.push(chunk); + final_text.push_str(&reindenter.finish()); + + if !final_text.is_empty() { + let char_ops = streaming_diff.push_new(&final_text); + apply_char_operations( + &char_ops, + buffer, + &original_snapshot, + &mut edit_cursor, + &tool.action_log, + cx, + ); + line_diff.push_char_operations(&char_ops, original_snapshot.as_rope()); + diff.update(cx, |diff, cx| { + diff.update_pending( + line_diff.line_operations(), + original_snapshot.clone(), + cx, + ) + }); + } + + let remaining_ops = streaming_diff.finish(); + apply_char_operations( + &remaining_ops, + buffer, + &original_snapshot, + &mut edit_cursor, + &tool.action_log, + cx, + ); + line_diff.push_char_operations(&remaining_ops, original_snapshot.as_rope()); + line_diff.finish(original_snapshot.as_rope()); + diff.update(cx, |diff, cx| { + diff.update_pending(line_diff.line_operations(), original_snapshot.clone(), cx) + }); + + let position = original_snapshot.anchor_before(edit_cursor); + cx.update(|cx| { + tool.set_agent_location(buffer.downgrade(), position, cx); + }); + } + } + Ok(()) + } } impl EditSession { @@ -521,19 +756,24 @@ impl EditSession { ensure_buffer_saved(&buffer, &abs_path, tool, cx)?; + if matches!(mode, StreamingEditFileMode::Write) { + tool.action_log.update(cx, |log, cx| { + log.buffer_created(buffer.clone(), cx); + }); + } + tool.action_log + .update(cx, |log, cx| log.buffer_read(buffer.clone(), cx)); + let diff = cx.new(|cx| Diff::new(buffer.clone(), cx)); event_stream.update_diff(diff.clone()); let finalize_diff_guard = util::defer(Box::new({ let diff = diff.downgrade(); let mut cx = cx.clone(); move || { - // diff.update(&mut cx, |diff, cx| diff.finalize(cx)).ok(); + diff.update(&mut cx, |diff, cx| diff.finalize(cx)).ok(); } }) as Box); - tool.action_log - .update(cx, |log, cx| log.buffer_read(buffer.clone(), cx)); - let old_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); let old_text = cx .background_spawn({ @@ -547,9 +787,8 @@ impl EditSession { buffer, old_text, diff, - mode: mode.clone(), parser: ToolEditParser::default(), - pipeline: EditPipeline::new(mode, old_snapshot.text), + pipeline: Pipeline::new(mode, old_snapshot.text), _finalize_diff_guard: finalize_diff_guard, }) } @@ -571,10 +810,6 @@ impl EditSession { let events = self.parser.finalize_content(&content); self.process_events(&events, tool, event_stream, cx)?; - - tool.action_log.update(cx, |log, cx| { - log.buffer_created(self.buffer.clone(), cx); - }); } StreamingEditFileMode::Edit => { let edits = input.edits.ok_or_else(|| { @@ -659,14 +894,14 @@ impl EditSession { event_stream: &ToolCallEventStream, cx: &mut AsyncApp, ) -> Result<(), StreamingEditFileToolOutput> { - match &self.mode { - StreamingEditFileMode::Write => { + match &self.pipeline { + Pipeline::Write(_) => { if let Some(content) = &partial.content { let events = self.parser.push_content(content); self.process_events(&events, tool, event_stream, cx)?; } } - StreamingEditFileMode::Edit => { + Pipeline::Edit(_) => { if let Some(edits) = partial.edits { let events = self.parser.push_edits(&edits); self.process_events(&events, tool, event_stream, cx)?; @@ -684,256 +919,20 @@ impl EditSession { cx: &mut AsyncApp, ) -> Result<(), StreamingEditFileToolOutput> { for event in events { - match event { - ToolEditEvent::ContentChunk { chunk } => { - let EditPipeline::Write { - streaming_diff, - line_diff, - content_written, - original_snapshot, - } = &mut self.pipeline - else { - continue; - }; - - let (buffer_id, buffer_len) = self - .buffer - .read_with(cx, |buffer, _cx| (buffer.remote_id(), buffer.len())); - let edit_range = if *content_written { - buffer_len..buffer_len - } else { - 0..buffer_len - }; - - agent_edit_buffer( - &self.buffer, - [(edit_range, chunk.as_str())], - &tool.action_log, - cx, - ); - let char_ops = streaming_diff.push_new(chunk); - line_diff.push_char_operations(&char_ops, original_snapshot.as_rope()); - self.diff.update(cx, |diff, cx| { - diff.update_pending( - line_diff.line_operations(), - original_snapshot.clone(), - cx, - ) - }); - - cx.update(|cx| { - tool.set_agent_location( - self.buffer.downgrade(), - text::Anchor::max_for_buffer(buffer_id), - cx, - ); - }); - *content_written = true; - } - - ToolEditEvent::OldTextChunk { - chunk, done: false, .. - } => { - self.pipeline.ensure_resolving_old_text(&self.buffer, cx); - let EditPipeline::Edit { current_edit } = &mut self.pipeline else { - continue; - }; - - if let Some(EditPipelineEntry::ResolvingOldText { matcher }) = current_edit - && !chunk.is_empty() - { - if let Some(match_range) = matcher.push(chunk, None) { - let anchor_range = self.buffer.read_with(cx, |buffer, _cx| { - buffer.anchor_range_between(match_range.clone()) - }); - self.diff - .update(cx, |diff, cx| diff.reveal_range(anchor_range, cx)); - - cx.update(|cx| { - let position = self.buffer.read(cx).anchor_before(match_range.end); - tool.set_agent_location(self.buffer.downgrade(), position, cx); - }); - } - } + match &mut self.pipeline { + Pipeline::Write(write) => { + write.process_event(event, &self.buffer, &self.diff, tool, cx); } - - ToolEditEvent::OldTextChunk { - edit_index, - chunk, - done: true, - } => { - self.pipeline.ensure_resolving_old_text(&self.buffer, cx); - let EditPipeline::Edit { current_edit } = &mut self.pipeline else { - continue; - }; - - let Some(EditPipelineEntry::ResolvingOldText { matcher }) = current_edit else { - continue; - }; - - if !chunk.is_empty() { - matcher.push(chunk, None); - } - let range = extract_match(matcher.finish(), &self.buffer, edit_index, cx)?; - - let anchor_range = self - .buffer - .read_with(cx, |buffer, _cx| buffer.anchor_range_between(range.clone())); - self.diff - .update(cx, |diff, cx| diff.reveal_range(anchor_range, cx)); - - let snapshot = self.buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); - - let line = snapshot.offset_to_point(range.start).row; - event_stream.update_fields( - ToolCallUpdateFields::new().locations(vec![ - ToolCallLocation::new(&self.abs_path).line(Some(line)), - ]), - ); - - let buffer_indent = snapshot.line_indent_for_row(line); - let query_indent = text::LineIndent::from_iter( - matcher - .query_lines() - .first() - .map(|s| s.as_str()) - .unwrap_or("") - .chars(), - ); - let indent_delta = compute_indent_delta(buffer_indent, query_indent); - - let old_text_in_buffer = - snapshot.text_for_range(range.clone()).collect::(); - - let text_snapshot = self - .buffer - .read_with(cx, |buffer, _cx| buffer.text_snapshot()); - *current_edit = Some(EditPipelineEntry::StreamingNewText { - streaming_diff: StreamingDiff::new(old_text_in_buffer), - line_diff: LineDiff::default(), - edit_cursor: range.start, - reindenter: Reindenter::new(indent_delta), - original_snapshot: text_snapshot, - }); - - cx.update(|cx| { - let position = self.buffer.read(cx).anchor_before(range.end); - tool.set_agent_location(self.buffer.downgrade(), position, cx); - }); - } - - ToolEditEvent::NewTextChunk { - chunk, done: false, .. - } => { - let EditPipeline::Edit { current_edit } = &mut self.pipeline else { - continue; - }; - - let Some(EditPipelineEntry::StreamingNewText { - streaming_diff, - line_diff, - edit_cursor, - reindenter, - original_snapshot, - .. - }) = current_edit - else { - continue; - }; - - let reindented = reindenter.push(chunk); - if reindented.is_empty() { - continue; - } - - let char_ops = streaming_diff.push_new(&reindented); - apply_char_operations( - &char_ops, - &self.buffer, - original_snapshot, - edit_cursor, - &tool.action_log, - cx, - ); - line_diff.push_char_operations(&char_ops, original_snapshot.as_rope()); - self.diff.update(cx, |diff, cx| { - diff.update_pending( - line_diff.line_operations(), - original_snapshot.clone(), - cx, - ) - }); - - let position = original_snapshot.anchor_before(*edit_cursor); - cx.update(|cx| { - tool.set_agent_location(self.buffer.downgrade(), position, cx); - }); - } - - ToolEditEvent::NewTextChunk { - chunk, done: true, .. - } => { - let EditPipeline::Edit { current_edit } = &mut self.pipeline else { - continue; - }; - let Some(EditPipelineEntry::StreamingNewText { - mut streaming_diff, - mut line_diff, - mut edit_cursor, - mut reindenter, - original_snapshot, - }) = current_edit.take() - else { - continue; - }; - - // Flush any remaining reindent buffer + final chunk. - let mut final_text = reindenter.push(chunk); - final_text.push_str(&reindenter.finish()); - - if !final_text.is_empty() { - let char_ops = streaming_diff.push_new(&final_text); - apply_char_operations( - &char_ops, - &self.buffer, - &original_snapshot, - &mut edit_cursor, - &tool.action_log, - cx, - ); - line_diff.push_char_operations(&char_ops, original_snapshot.as_rope()); - self.diff.update(cx, |diff, cx| { - diff.update_pending( - line_diff.line_operations(), - original_snapshot.clone(), - cx, - ) - }); - } - - let remaining_ops = streaming_diff.finish(); - apply_char_operations( - &remaining_ops, + Pipeline::Edit(edit) => { + edit.process_event( + event, &self.buffer, - &original_snapshot, - &mut edit_cursor, - &tool.action_log, + &self.diff, + &self.abs_path, + tool, + event_stream, cx, - ); - line_diff.push_char_operations(&remaining_ops, original_snapshot.as_rope()); - line_diff.finish(original_snapshot.as_rope()); - self.diff.update(cx, |diff, cx| { - diff.update_pending( - line_diff.line_operations(), - original_snapshot.clone(), - cx, - ) - }); - - let position = original_snapshot.anchor_before(edit_cursor); - cx.update(|cx| { - tool.set_agent_location(self.buffer.downgrade(), position, cx); - }); + )?; } } }