Simplify state management

Bennet Bo Fenner created

Change summary

crates/agent/src/tools/streaming_edit_file_tool.rs | 559 +++++++--------
1 file changed, 279 insertions(+), 280 deletions(-)

Detailed changes

crates/agent/src/tools/streaming_edit_file_tool.rs 🔗

@@ -428,22 +428,25 @@ pub struct EditSession {
     buffer: Entity<Buffer>,
     old_text: Arc<String>,
     diff: Entity<Diff>,
-    mode: StreamingEditFileMode,
     parser: ToolEditParser,
-    pipeline: EditPipeline,
+    pipeline: Pipeline,
     _finalize_diff_guard: Deferred<Box<dyn FnOnce()>>,
 }
 
-enum EditPipeline {
-    Write {
-        content_written: bool,
-        streaming_diff: StreamingDiff,
-        line_diff: LineDiff,
-        original_snapshot: text::BufferSnapshot,
-    },
-    Edit {
-        current_edit: Option<EditPipelineEntry>,
-    },
+enum Pipeline {
+    Write(WritePipeline),
+    Edit(EditPipeline),
+}
+
+struct WritePipeline {
+    content_written: bool,
+    streaming_diff: StreamingDiff,
+    line_diff: LineDiff,
+    original_snapshot: text::BufferSnapshot,
+}
+
+struct EditPipeline {
+    current_edit: Option<EditPipelineEntry>,
 }
 
 enum EditPipelineEntry {
@@ -459,29 +462,261 @@ enum EditPipelineEntry {
     },
 }
 
-impl EditPipeline {
+impl Pipeline {
     fn new(mode: StreamingEditFileMode, original_snapshot: text::BufferSnapshot) -> Self {
         match mode {
-            StreamingEditFileMode::Write => Self::Write {
+            StreamingEditFileMode::Write => Self::Write(WritePipeline {
                 streaming_diff: StreamingDiff::new(original_snapshot.text()),
                 line_diff: LineDiff::default(),
                 content_written: false,
                 original_snapshot,
-            },
-            StreamingEditFileMode::Edit => Self::Edit { current_edit: None },
+            }),
+            StreamingEditFileMode::Edit => Self::Edit(EditPipeline { current_edit: None }),
         }
     }
+}
 
+impl WritePipeline {
+    fn process_event(
+        &mut self,
+        event: &ToolEditEvent,
+        buffer: &Entity<Buffer>,
+        diff: &Entity<Diff>,
+        tool: &StreamingEditFileTool,
+        cx: &mut AsyncApp,
+    ) {
+        let ToolEditEvent::ContentChunk { chunk } = event else {
+            return;
+        };
+
+        let (buffer_id, buffer_len) =
+            buffer.read_with(cx, |buffer, _cx| (buffer.remote_id(), buffer.len()));
+        let edit_range = if self.content_written {
+            buffer_len..buffer_len
+        } else {
+            0..buffer_len
+        };
+
+        agent_edit_buffer(buffer, [(edit_range, chunk.as_str())], &tool.action_log, cx);
+        let char_ops = self.streaming_diff.push_new(chunk);
+        self.line_diff
+            .push_char_operations(&char_ops, self.original_snapshot.as_rope());
+        diff.update(cx, |diff, cx| {
+            diff.update_pending(
+                self.line_diff.line_operations(),
+                self.original_snapshot.clone(),
+                cx,
+            )
+        });
+
+        cx.update(|cx| {
+            tool.set_agent_location(
+                buffer.downgrade(),
+                text::Anchor::max_for_buffer(buffer_id),
+                cx,
+            );
+        });
+        self.content_written = true;
+    }
+}
+
+impl EditPipeline {
     fn ensure_resolving_old_text(&mut self, buffer: &Entity<Buffer>, cx: &mut AsyncApp) {
-        if let Self::Edit { current_edit } = self
-            && current_edit.is_none()
-        {
+        if self.current_edit.is_none() {
             let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.text_snapshot());
-            *current_edit = Some(EditPipelineEntry::ResolvingOldText {
+            self.current_edit = Some(EditPipelineEntry::ResolvingOldText {
                 matcher: StreamingFuzzyMatcher::new(snapshot),
             });
         }
     }
+
+    fn process_event(
+        &mut self,
+        event: &ToolEditEvent,
+        buffer: &Entity<Buffer>,
+        diff: &Entity<Diff>,
+        abs_path: &PathBuf,
+        tool: &StreamingEditFileTool,
+        event_stream: &ToolCallEventStream,
+        cx: &mut AsyncApp,
+    ) -> Result<(), StreamingEditFileToolOutput> {
+        match event {
+            ToolEditEvent::ContentChunk { .. } => {}
+            ToolEditEvent::OldTextChunk {
+                chunk, done: false, ..
+            } => {
+                self.ensure_resolving_old_text(buffer, cx);
+                if let Some(EditPipelineEntry::ResolvingOldText { matcher }) =
+                    &mut self.current_edit
+                    && !chunk.is_empty()
+                {
+                    if let Some(match_range) = matcher.push(chunk, None) {
+                        let anchor_range = buffer.read_with(cx, |buffer, _cx| {
+                            buffer.anchor_range_between(match_range.clone())
+                        });
+                        diff.update(cx, |diff, cx| diff.reveal_range(anchor_range, cx));
+
+                        cx.update(|cx| {
+                            let position = buffer.read(cx).anchor_before(match_range.end);
+                            tool.set_agent_location(buffer.downgrade(), position, cx);
+                        });
+                    }
+                }
+            }
+            ToolEditEvent::OldTextChunk {
+                edit_index,
+                chunk,
+                done: true,
+            } => {
+                self.ensure_resolving_old_text(buffer, cx);
+                let Some(EditPipelineEntry::ResolvingOldText { matcher }) = &mut self.current_edit
+                else {
+                    return Ok(());
+                };
+
+                if !chunk.is_empty() {
+                    matcher.push(chunk, None);
+                }
+                let range = extract_match(matcher.finish(), buffer, edit_index, cx)?;
+
+                let anchor_range =
+                    buffer.read_with(cx, |buffer, _cx| buffer.anchor_range_between(range.clone()));
+                diff.update(cx, |diff, cx| diff.reveal_range(anchor_range, cx));
+
+                let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
+
+                let line = snapshot.offset_to_point(range.start).row;
+                event_stream.update_fields(
+                    ToolCallUpdateFields::new()
+                        .locations(vec![ToolCallLocation::new(abs_path).line(Some(line))]),
+                );
+
+                let buffer_indent = snapshot.line_indent_for_row(line);
+                let query_indent = text::LineIndent::from_iter(
+                    matcher
+                        .query_lines()
+                        .first()
+                        .map(|s| s.as_str())
+                        .unwrap_or("")
+                        .chars(),
+                );
+                let indent_delta = compute_indent_delta(buffer_indent, query_indent);
+
+                let old_text_in_buffer = snapshot.text_for_range(range.clone()).collect::<String>();
+
+                let text_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.text_snapshot());
+                self.current_edit = Some(EditPipelineEntry::StreamingNewText {
+                    streaming_diff: StreamingDiff::new(old_text_in_buffer),
+                    line_diff: LineDiff::default(),
+                    edit_cursor: range.start,
+                    reindenter: Reindenter::new(indent_delta),
+                    original_snapshot: text_snapshot,
+                });
+
+                cx.update(|cx| {
+                    let position = buffer.read(cx).anchor_before(range.end);
+                    tool.set_agent_location(buffer.downgrade(), position, cx);
+                });
+            }
+            ToolEditEvent::NewTextChunk {
+                chunk, done: false, ..
+            } => {
+                let Some(EditPipelineEntry::StreamingNewText {
+                    streaming_diff,
+                    line_diff,
+                    edit_cursor,
+                    reindenter,
+                    original_snapshot,
+                    ..
+                }) = &mut self.current_edit
+                else {
+                    return Ok(());
+                };
+
+                let reindented = reindenter.push(chunk);
+                if reindented.is_empty() {
+                    return Ok(());
+                }
+
+                let char_ops = streaming_diff.push_new(&reindented);
+                apply_char_operations(
+                    &char_ops,
+                    buffer,
+                    original_snapshot,
+                    edit_cursor,
+                    &tool.action_log,
+                    cx,
+                );
+                line_diff.push_char_operations(&char_ops, original_snapshot.as_rope());
+                diff.update(cx, |diff, cx| {
+                    diff.update_pending(line_diff.line_operations(), original_snapshot.clone(), cx)
+                });
+
+                let position = original_snapshot.anchor_before(*edit_cursor);
+                cx.update(|cx| {
+                    tool.set_agent_location(buffer.downgrade(), position, cx);
+                });
+            }
+            ToolEditEvent::NewTextChunk {
+                chunk, done: true, ..
+            } => {
+                let Some(EditPipelineEntry::StreamingNewText {
+                    mut streaming_diff,
+                    mut line_diff,
+                    mut edit_cursor,
+                    mut reindenter,
+                    original_snapshot,
+                }) = self.current_edit.take()
+                else {
+                    return Ok(());
+                };
+
+                let mut final_text = reindenter.push(chunk);
+                final_text.push_str(&reindenter.finish());
+
+                if !final_text.is_empty() {
+                    let char_ops = streaming_diff.push_new(&final_text);
+                    apply_char_operations(
+                        &char_ops,
+                        buffer,
+                        &original_snapshot,
+                        &mut edit_cursor,
+                        &tool.action_log,
+                        cx,
+                    );
+                    line_diff.push_char_operations(&char_ops, original_snapshot.as_rope());
+                    diff.update(cx, |diff, cx| {
+                        diff.update_pending(
+                            line_diff.line_operations(),
+                            original_snapshot.clone(),
+                            cx,
+                        )
+                    });
+                }
+
+                let remaining_ops = streaming_diff.finish();
+                apply_char_operations(
+                    &remaining_ops,
+                    buffer,
+                    &original_snapshot,
+                    &mut edit_cursor,
+                    &tool.action_log,
+                    cx,
+                );
+                line_diff.push_char_operations(&remaining_ops, original_snapshot.as_rope());
+                line_diff.finish(original_snapshot.as_rope());
+                diff.update(cx, |diff, cx| {
+                    diff.update_pending(line_diff.line_operations(), original_snapshot.clone(), cx)
+                });
+
+                let position = original_snapshot.anchor_before(edit_cursor);
+                cx.update(|cx| {
+                    tool.set_agent_location(buffer.downgrade(), position, cx);
+                });
+            }
+        }
+        Ok(())
+    }
 }
 
 impl EditSession {
@@ -521,19 +756,24 @@ impl EditSession {
 
         ensure_buffer_saved(&buffer, &abs_path, tool, cx)?;
 
+        if matches!(mode, StreamingEditFileMode::Write) {
+            tool.action_log.update(cx, |log, cx| {
+                log.buffer_created(buffer.clone(), cx);
+            });
+        }
+        tool.action_log
+            .update(cx, |log, cx| log.buffer_read(buffer.clone(), cx));
+
         let diff = cx.new(|cx| Diff::new(buffer.clone(), cx));
         event_stream.update_diff(diff.clone());
         let finalize_diff_guard = util::defer(Box::new({
             let diff = diff.downgrade();
             let mut cx = cx.clone();
             move || {
-                // diff.update(&mut cx, |diff, cx| diff.finalize(cx)).ok();
+                diff.update(&mut cx, |diff, cx| diff.finalize(cx)).ok();
             }
         }) as Box<dyn FnOnce()>);
 
-        tool.action_log
-            .update(cx, |log, cx| log.buffer_read(buffer.clone(), cx));
-
         let old_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
         let old_text = cx
             .background_spawn({
@@ -547,9 +787,8 @@ impl EditSession {
             buffer,
             old_text,
             diff,
-            mode: mode.clone(),
             parser: ToolEditParser::default(),
-            pipeline: EditPipeline::new(mode, old_snapshot.text),
+            pipeline: Pipeline::new(mode, old_snapshot.text),
             _finalize_diff_guard: finalize_diff_guard,
         })
     }
@@ -571,10 +810,6 @@ impl EditSession {
 
                 let events = self.parser.finalize_content(&content);
                 self.process_events(&events, tool, event_stream, cx)?;
-
-                tool.action_log.update(cx, |log, cx| {
-                    log.buffer_created(self.buffer.clone(), cx);
-                });
             }
             StreamingEditFileMode::Edit => {
                 let edits = input.edits.ok_or_else(|| {
@@ -659,14 +894,14 @@ impl EditSession {
         event_stream: &ToolCallEventStream,
         cx: &mut AsyncApp,
     ) -> Result<(), StreamingEditFileToolOutput> {
-        match &self.mode {
-            StreamingEditFileMode::Write => {
+        match &self.pipeline {
+            Pipeline::Write(_) => {
                 if let Some(content) = &partial.content {
                     let events = self.parser.push_content(content);
                     self.process_events(&events, tool, event_stream, cx)?;
                 }
             }
-            StreamingEditFileMode::Edit => {
+            Pipeline::Edit(_) => {
                 if let Some(edits) = partial.edits {
                     let events = self.parser.push_edits(&edits);
                     self.process_events(&events, tool, event_stream, cx)?;
@@ -684,256 +919,20 @@ impl EditSession {
         cx: &mut AsyncApp,
     ) -> Result<(), StreamingEditFileToolOutput> {
         for event in events {
-            match event {
-                ToolEditEvent::ContentChunk { chunk } => {
-                    let EditPipeline::Write {
-                        streaming_diff,
-                        line_diff,
-                        content_written,
-                        original_snapshot,
-                    } = &mut self.pipeline
-                    else {
-                        continue;
-                    };
-
-                    let (buffer_id, buffer_len) = self
-                        .buffer
-                        .read_with(cx, |buffer, _cx| (buffer.remote_id(), buffer.len()));
-                    let edit_range = if *content_written {
-                        buffer_len..buffer_len
-                    } else {
-                        0..buffer_len
-                    };
-
-                    agent_edit_buffer(
-                        &self.buffer,
-                        [(edit_range, chunk.as_str())],
-                        &tool.action_log,
-                        cx,
-                    );
-                    let char_ops = streaming_diff.push_new(chunk);
-                    line_diff.push_char_operations(&char_ops, original_snapshot.as_rope());
-                    self.diff.update(cx, |diff, cx| {
-                        diff.update_pending(
-                            line_diff.line_operations(),
-                            original_snapshot.clone(),
-                            cx,
-                        )
-                    });
-
-                    cx.update(|cx| {
-                        tool.set_agent_location(
-                            self.buffer.downgrade(),
-                            text::Anchor::max_for_buffer(buffer_id),
-                            cx,
-                        );
-                    });
-                    *content_written = true;
-                }
-
-                ToolEditEvent::OldTextChunk {
-                    chunk, done: false, ..
-                } => {
-                    self.pipeline.ensure_resolving_old_text(&self.buffer, cx);
-                    let EditPipeline::Edit { current_edit } = &mut self.pipeline else {
-                        continue;
-                    };
-
-                    if let Some(EditPipelineEntry::ResolvingOldText { matcher }) = current_edit
-                        && !chunk.is_empty()
-                    {
-                        if let Some(match_range) = matcher.push(chunk, None) {
-                            let anchor_range = self.buffer.read_with(cx, |buffer, _cx| {
-                                buffer.anchor_range_between(match_range.clone())
-                            });
-                            self.diff
-                                .update(cx, |diff, cx| diff.reveal_range(anchor_range, cx));
-
-                            cx.update(|cx| {
-                                let position = self.buffer.read(cx).anchor_before(match_range.end);
-                                tool.set_agent_location(self.buffer.downgrade(), position, cx);
-                            });
-                        }
-                    }
+            match &mut self.pipeline {
+                Pipeline::Write(write) => {
+                    write.process_event(event, &self.buffer, &self.diff, tool, cx);
                 }
-
-                ToolEditEvent::OldTextChunk {
-                    edit_index,
-                    chunk,
-                    done: true,
-                } => {
-                    self.pipeline.ensure_resolving_old_text(&self.buffer, cx);
-                    let EditPipeline::Edit { current_edit } = &mut self.pipeline else {
-                        continue;
-                    };
-
-                    let Some(EditPipelineEntry::ResolvingOldText { matcher }) = current_edit else {
-                        continue;
-                    };
-
-                    if !chunk.is_empty() {
-                        matcher.push(chunk, None);
-                    }
-                    let range = extract_match(matcher.finish(), &self.buffer, edit_index, cx)?;
-
-                    let anchor_range = self
-                        .buffer
-                        .read_with(cx, |buffer, _cx| buffer.anchor_range_between(range.clone()));
-                    self.diff
-                        .update(cx, |diff, cx| diff.reveal_range(anchor_range, cx));
-
-                    let snapshot = self.buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
-
-                    let line = snapshot.offset_to_point(range.start).row;
-                    event_stream.update_fields(
-                        ToolCallUpdateFields::new().locations(vec![
-                            ToolCallLocation::new(&self.abs_path).line(Some(line)),
-                        ]),
-                    );
-
-                    let buffer_indent = snapshot.line_indent_for_row(line);
-                    let query_indent = text::LineIndent::from_iter(
-                        matcher
-                            .query_lines()
-                            .first()
-                            .map(|s| s.as_str())
-                            .unwrap_or("")
-                            .chars(),
-                    );
-                    let indent_delta = compute_indent_delta(buffer_indent, query_indent);
-
-                    let old_text_in_buffer =
-                        snapshot.text_for_range(range.clone()).collect::<String>();
-
-                    let text_snapshot = self
-                        .buffer
-                        .read_with(cx, |buffer, _cx| buffer.text_snapshot());
-                    *current_edit = Some(EditPipelineEntry::StreamingNewText {
-                        streaming_diff: StreamingDiff::new(old_text_in_buffer),
-                        line_diff: LineDiff::default(),
-                        edit_cursor: range.start,
-                        reindenter: Reindenter::new(indent_delta),
-                        original_snapshot: text_snapshot,
-                    });
-
-                    cx.update(|cx| {
-                        let position = self.buffer.read(cx).anchor_before(range.end);
-                        tool.set_agent_location(self.buffer.downgrade(), position, cx);
-                    });
-                }
-
-                ToolEditEvent::NewTextChunk {
-                    chunk, done: false, ..
-                } => {
-                    let EditPipeline::Edit { current_edit } = &mut self.pipeline else {
-                        continue;
-                    };
-
-                    let Some(EditPipelineEntry::StreamingNewText {
-                        streaming_diff,
-                        line_diff,
-                        edit_cursor,
-                        reindenter,
-                        original_snapshot,
-                        ..
-                    }) = current_edit
-                    else {
-                        continue;
-                    };
-
-                    let reindented = reindenter.push(chunk);
-                    if reindented.is_empty() {
-                        continue;
-                    }
-
-                    let char_ops = streaming_diff.push_new(&reindented);
-                    apply_char_operations(
-                        &char_ops,
-                        &self.buffer,
-                        original_snapshot,
-                        edit_cursor,
-                        &tool.action_log,
-                        cx,
-                    );
-                    line_diff.push_char_operations(&char_ops, original_snapshot.as_rope());
-                    self.diff.update(cx, |diff, cx| {
-                        diff.update_pending(
-                            line_diff.line_operations(),
-                            original_snapshot.clone(),
-                            cx,
-                        )
-                    });
-
-                    let position = original_snapshot.anchor_before(*edit_cursor);
-                    cx.update(|cx| {
-                        tool.set_agent_location(self.buffer.downgrade(), position, cx);
-                    });
-                }
-
-                ToolEditEvent::NewTextChunk {
-                    chunk, done: true, ..
-                } => {
-                    let EditPipeline::Edit { current_edit } = &mut self.pipeline else {
-                        continue;
-                    };
-                    let Some(EditPipelineEntry::StreamingNewText {
-                        mut streaming_diff,
-                        mut line_diff,
-                        mut edit_cursor,
-                        mut reindenter,
-                        original_snapshot,
-                    }) = current_edit.take()
-                    else {
-                        continue;
-                    };
-
-                    // Flush any remaining reindent buffer + final chunk.
-                    let mut final_text = reindenter.push(chunk);
-                    final_text.push_str(&reindenter.finish());
-
-                    if !final_text.is_empty() {
-                        let char_ops = streaming_diff.push_new(&final_text);
-                        apply_char_operations(
-                            &char_ops,
-                            &self.buffer,
-                            &original_snapshot,
-                            &mut edit_cursor,
-                            &tool.action_log,
-                            cx,
-                        );
-                        line_diff.push_char_operations(&char_ops, original_snapshot.as_rope());
-                        self.diff.update(cx, |diff, cx| {
-                            diff.update_pending(
-                                line_diff.line_operations(),
-                                original_snapshot.clone(),
-                                cx,
-                            )
-                        });
-                    }
-
-                    let remaining_ops = streaming_diff.finish();
-                    apply_char_operations(
-                        &remaining_ops,
+                Pipeline::Edit(edit) => {
+                    edit.process_event(
+                        event,
                         &self.buffer,
-                        &original_snapshot,
-                        &mut edit_cursor,
-                        &tool.action_log,
+                        &self.diff,
+                        &self.abs_path,
+                        tool,
+                        event_stream,
                         cx,
-                    );
-                    line_diff.push_char_operations(&remaining_ops, original_snapshot.as_rope());
-                    line_diff.finish(original_snapshot.as_rope());
-                    self.diff.update(cx, |diff, cx| {
-                        diff.update_pending(
-                            line_diff.line_operations(),
-                            original_snapshot.clone(),
-                            cx,
-                        )
-                    });
-
-                    let position = original_snapshot.anchor_before(edit_cursor);
-                    cx.update(|cx| {
-                        tool.set_agent_location(self.buffer.downgrade(), position, cx);
-                    });
+                    )?;
                 }
             }
         }