agent: Implement streaming for edit file tool (#50004)

Bennet Bo Fenner and Zed Zippy created

Before you mark this PR as ready for review, make sure that you have:
- [x] Added a solid test coverage and/or screenshots from doing manual
testing
- [x] Done a self-review taking into account security and performance
aspects
- [x] Aligned any UI changes with the [UI
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)

Release Notes:

- N/A

---------

Co-authored-by: Zed Zippy <234243425+zed-zippy[bot]@users.noreply.github.com>

Change summary

crates/agent/src/thread.rs                         |   1 
crates/agent/src/tools.rs                          |   1 
crates/agent/src/tools/streaming_edit_file_tool.rs | 910 ++++++++++-----
crates/agent_ui/src/buffer_codegen.rs              |   2 
crates/anthropic/src/anthropic.rs                  |   6 
crates/language_model/src/request.rs               |   1 
crates/language_models/src/provider/anthropic.rs   |   2 
crates/language_models/src/provider/open_ai.rs     |   1 
8 files changed, 634 insertions(+), 290 deletions(-)

Detailed changes

crates/agent/src/thread.rs 🔗

@@ -2461,6 +2461,7 @@ impl Thread {
                         name: tool_name.to_string(),
                         description: tool.description().to_string(),
                         input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
+                        use_input_streaming: tool.supports_input_streaming(),
                     })
                 })
                 .collect::<Vec<_>>()

crates/agent/src/tools.rs 🔗

@@ -100,6 +100,7 @@ macro_rules! tools {
                     name: T::NAME.to_string(),
                     description: T::description().to_string(),
                     input_schema: T::input_schema(LanguageModelToolSchemaFormat::JsonSchema).to_value(),
+                    use_input_streaming: T::supports_input_streaming(),
                 }
             }
             [

crates/agent/src/tools/streaming_edit_file_tool.rs 🔗

@@ -11,8 +11,8 @@ use anyhow::{Context as _, Result, anyhow};
 use collections::HashSet;
 use futures::FutureExt as _;
 use gpui::{App, AppContext, AsyncApp, Entity, Task, WeakEntity};
-use language::LanguageRegistry;
 use language::language_settings::{self, FormatOnSave};
+use language::{Buffer, LanguageRegistry};
 use language_model::LanguageModelToolResultContent;
 use project::lsp_store::{FormatTrigger, LspFormatTarget};
 use project::{Project, ProjectPath};
@@ -23,8 +23,8 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use text::BufferSnapshot;
 use ui::SharedString;
-use util::ResultExt;
 use util::rel_path::RelPath;
+use util::{Deferred, ResultExt, debug_panic};
 
 const DEFAULT_UI_TEXT: &str = "Editing file";
 
@@ -67,7 +67,7 @@ pub struct StreamingEditFileToolInput {
     /// <example>
     /// `frontend/db.js`
     /// </example>
-    pub path: PathBuf,
+    pub path: String,
 
     /// The mode of operation on the file. Possible values:
     /// - 'create': Create a new file if it doesn't exist. Requires 'content' field.
@@ -109,12 +109,488 @@ pub struct EditOperation {
     pub new_text: String,
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
+#[derive(Default, Debug, Deserialize)]
 struct StreamingEditFileToolPartialInput {
     #[serde(default)]
-    path: String,
+    display_description: Option<String>,
+    #[serde(default)]
+    path: Option<String>,
+    #[serde(default)]
+    mode: Option<StreamingEditFileMode>,
+    #[serde(default)]
+    #[allow(dead_code)]
+    content: Option<String>,
+    #[serde(default)]
+    edits: Option<Vec<PartialEditOperation>>,
+}
+
+#[derive(Default, Debug, Deserialize)]
+struct PartialEditOperation {
+    #[serde(default)]
+    old_text: Option<String>,
     #[serde(default)]
-    display_description: String,
+    new_text: Option<String>,
+}
+
+enum StreamingEditState {
+    Idle,
+    BufferResolved {
+        abs_path: PathBuf,
+        buffer: Entity<Buffer>,
+        old_text: Arc<String>,
+        diff: Entity<Diff>,
+        edit_state: IncrementalEditState,
+        _finalize_diff_guard: Deferred<Box<dyn FnOnce()>>,
+    },
+}
+
+#[derive(Default)]
+struct IncrementalEditState {
+    applied_count: usize,
+    in_progress_matcher: Option<StreamingFuzzyMatcher>,
+    last_old_text_len: usize,
+}
+
+impl StreamingEditState {
+    async fn finalize(
+        &mut self,
+        input: StreamingEditFileToolInput,
+        tool: &StreamingEditFileTool,
+        event_stream: &ToolCallEventStream,
+        cx: &mut AsyncApp,
+    ) -> Result<StreamingEditFileToolOutput, StreamingEditFileToolOutput> {
+        let remaining_edits_start_ix = match self {
+            StreamingEditState::Idle => {
+                *self = Self::transition_to_buffer_resolved(
+                    &input.path,
+                    &input.display_description,
+                    input.mode.clone(),
+                    tool,
+                    event_stream,
+                    cx,
+                )
+                .await?;
+                0
+            }
+            StreamingEditState::BufferResolved { edit_state, .. } => edit_state.applied_count,
+        };
+
+        let StreamingEditState::BufferResolved {
+            buffer,
+            old_text,
+            diff,
+            abs_path,
+            ..
+        } = self
+        else {
+            debug_panic!("Invalid state");
+            return Ok(StreamingEditFileToolOutput::Error {
+                error: "Internal error. Try to apply the edits again".to_string(),
+            });
+        };
+
+        let result: anyhow::Result<StreamingEditFileToolOutput> = async {
+            let action_log = tool
+                .thread
+                .read_with(cx, |thread, _cx| thread.action_log().clone())?;
+
+            match input.mode {
+                StreamingEditFileMode::Create | StreamingEditFileMode::Overwrite => {
+                    action_log.update(cx, |log, cx| {
+                        log.buffer_created(buffer.clone(), cx);
+                    });
+                    let content = input.content.ok_or_else(|| {
+                        anyhow!("'content' field is required for create and overwrite modes")
+                    })?;
+                    cx.update(|cx| {
+                        buffer.update(cx, |buffer, cx| {
+                            buffer.edit([(0..buffer.len(), content.as_str())], None, cx);
+                        });
+                        action_log.update(cx, |log, cx| {
+                            log.buffer_edited(buffer.clone(), cx);
+                        });
+                    });
+                }
+                StreamingEditFileMode::Edit => {
+                    let edits = input
+                        .edits
+                        .ok_or_else(|| anyhow!("'edits' field is required for edit mode"))?;
+
+                    let remaining_edits = &edits[remaining_edits_start_ix..];
+                    apply_edits(
+                        &buffer,
+                        &action_log,
+                        remaining_edits,
+                        &diff,
+                        event_stream,
+                        &abs_path,
+                        cx,
+                    )?;
+                }
+            }
+
+            let format_on_save_enabled = buffer.read_with(cx, |buffer, cx| {
+                let settings = language_settings::language_settings(
+                    buffer.language().map(|l| l.name()),
+                    buffer.file(),
+                    cx,
+                );
+                settings.format_on_save != FormatOnSave::Off
+            });
+
+            if format_on_save_enabled {
+                action_log.update(cx, |log, cx| {
+                    log.buffer_edited(buffer.clone(), cx);
+                });
+
+                let format_task = tool.project.update(cx, |project, cx| {
+                    project.format(
+                        HashSet::from_iter([buffer.clone()]),
+                        LspFormatTarget::Buffers,
+                        false,
+                        FormatTrigger::Save,
+                        cx,
+                    )
+                });
+                futures::select! {
+                    result = format_task.fuse() => { result.log_err(); },
+                    _ = event_stream.cancelled_by_user().fuse() => {
+                        anyhow::bail!("Edit cancelled by user");
+                    }
+                };
+            }
+
+            let save_task = tool
+                .project
+                .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx));
+            futures::select! {
+                result = save_task.fuse() => { result?; },
+                _ = event_stream.cancelled_by_user().fuse() => {
+                    anyhow::bail!("Edit cancelled by user");
+                }
+            };
+
+            action_log.update(cx, |log, cx| {
+                log.buffer_edited(buffer.clone(), cx);
+            });
+
+            if let Some(new_mtime) = buffer.read_with(cx, |buffer, _| {
+                buffer.file().and_then(|file| file.disk_state().mtime())
+            }) {
+                tool.thread.update(cx, |thread, _| {
+                    thread
+                        .file_read_times
+                        .insert(abs_path.to_path_buf(), new_mtime);
+                })?;
+            }
+
+            let new_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
+            let (new_text, unified_diff) = cx
+                .background_spawn({
+                    let new_snapshot = new_snapshot.clone();
+                    let old_text = old_text.clone();
+                    async move {
+                        let new_text = new_snapshot.text();
+                        let diff = language::unified_diff(&old_text, &new_text);
+                        (new_text, diff)
+                    }
+                })
+                .await;
+
+            let output = StreamingEditFileToolOutput::Success {
+                input_path: PathBuf::from(input.path),
+                new_text,
+                old_text: old_text.clone(),
+                diff: unified_diff,
+            };
+            Ok(output)
+        }
+        .await;
+        result.map_err(|e| StreamingEditFileToolOutput::Error {
+            error: e.to_string(),
+        })
+    }
+
+    async fn process(
+        &mut self,
+        partial: StreamingEditFileToolPartialInput,
+        tool: &StreamingEditFileTool,
+        event_stream: &ToolCallEventStream,
+        cx: &mut AsyncApp,
+    ) -> Result<(), StreamingEditFileToolOutput> {
+        match self {
+            Self::Idle => {
+                if let Some(path_str) = partial.path
+                    && let Some(display_description) = partial.display_description
+                    && let Some(mode) = partial.mode
+                {
+                    *self = Self::transition_to_buffer_resolved(
+                        &path_str,
+                        &display_description,
+                        mode,
+                        tool,
+                        event_stream,
+                        cx,
+                    )
+                    .await?;
+                }
+            }
+            Self::BufferResolved {
+                abs_path,
+                buffer,
+                edit_state,
+                diff,
+                ..
+            } => {
+                if let Some(edits) = partial.edits {
+                    Self::process_streaming_edits(
+                        buffer,
+                        diff,
+                        edit_state,
+                        &edits,
+                        abs_path,
+                        tool,
+                        event_stream,
+                        cx,
+                    )?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn transition_to_buffer_resolved(
+        path_str: &str,
+        display_description: &str,
+        mode: StreamingEditFileMode,
+        tool: &StreamingEditFileTool,
+        event_stream: &ToolCallEventStream,
+        cx: &mut AsyncApp,
+    ) -> Result<Self, StreamingEditFileToolOutput> {
+        let path = PathBuf::from(path_str);
+        let project_path = cx
+            .update(|cx| resolve_path(mode, &path, &tool.project, cx))
+            .map_err(|e| StreamingEditFileToolOutput::Error {
+                error: e.to_string(),
+            })?;
+
+        let Some(abs_path) = cx.update(|cx| tool.project.read(cx).absolute_path(&project_path, cx))
+        else {
+            return Err(StreamingEditFileToolOutput::Error {
+                error: format!("File '{path_str}' does not exist"),
+            });
+        };
+
+        event_stream.update_fields(
+            ToolCallUpdateFields::new().locations(vec![ToolCallLocation::new(abs_path.clone())]),
+        );
+
+        cx.update(|cx| tool.authorize(&path, &display_description, event_stream, cx))
+            .await
+            .map_err(|e| StreamingEditFileToolOutput::Error {
+                error: e.to_string(),
+            })?;
+
+        let buffer = tool
+            .project
+            .update(cx, |project, cx| project.open_buffer(project_path, cx))
+            .await
+            .map_err(|e| StreamingEditFileToolOutput::Error {
+                error: e.to_string(),
+            })?;
+
+        ensure_buffer_saved(&buffer, &abs_path, tool, cx)?;
+
+        let diff = cx.new(|cx| Diff::new(buffer.clone(), cx));
+        event_stream.update_diff(diff.clone());
+        let finalize_diff_guard = util::defer(Box::new({
+            let diff = diff.downgrade();
+            let mut cx = cx.clone();
+            move || {
+                diff.update(&mut cx, |diff, cx| diff.finalize(cx)).ok();
+            }
+        }) as Box<dyn FnOnce()>);
+
+        let old_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
+        let old_text = cx
+            .background_spawn({
+                let old_snapshot = old_snapshot.clone();
+                async move { Arc::new(old_snapshot.text()) }
+            })
+            .await;
+
+        Ok(Self::BufferResolved {
+            abs_path,
+            buffer,
+            old_text,
+            diff,
+            edit_state: IncrementalEditState::default(),
+            _finalize_diff_guard: finalize_diff_guard,
+        })
+    }
+
+    fn process_streaming_edits(
+        buffer: &Entity<Buffer>,
+        diff: &Entity<Diff>,
+        edit_state: &mut IncrementalEditState,
+        edits: &[PartialEditOperation],
+        abs_path: &PathBuf,
+        tool: &StreamingEditFileTool,
+        event_stream: &ToolCallEventStream,
+        cx: &mut AsyncApp,
+    ) -> Result<(), StreamingEditFileToolOutput> {
+        if edits.is_empty() {
+            return Ok(());
+        }
+
+        // Edits at indices applied_count..edits.len()-1 are newly complete
+        // (a subsequent edit exists, proving the LLM moved on).
+        // The last edit (edits.len()-1) is potentially still in progress.
+        let completed_count = edits.len().saturating_sub(1);
+
+        // Apply newly-complete edits
+        while edit_state.applied_count < completed_count {
+            let edit_index = edit_state.applied_count;
+            let partial_edit = &edits[edit_index];
+
+            let old_text = match &partial_edit.old_text {
+                Some(t) => t.clone(),
+                None => {
+                    edit_state.applied_count += 1;
+                    continue;
+                }
+            };
+            let new_text = partial_edit.new_text.clone().unwrap_or_default();
+
+            edit_state.in_progress_matcher = None;
+            edit_state.last_old_text_len = 0;
+
+            let edit_op = EditOperation {
+                old_text: old_text.clone(),
+                new_text: new_text.clone(),
+            };
+
+            let action_log = tool
+                .thread
+                .read_with(cx, |thread, _cx| thread.action_log().clone())
+                .ok();
+
+            // On the first edit, mark the buffer as read
+            if edit_state.applied_count == 0 {
+                if let Some(action_log) = &action_log {
+                    action_log.update(cx, |log, cx| {
+                        log.buffer_read(buffer.clone(), cx);
+                    });
+                }
+            }
+
+            resolve_reveal_and_apply_edit(
+                buffer,
+                diff,
+                &edit_op,
+                edit_index,
+                abs_path,
+                action_log.as_ref(),
+                event_stream,
+                cx,
+            )
+            .map_err(|e| StreamingEditFileToolOutput::Error {
+                error: e.to_string(),
+            })?;
+
+            edit_state.applied_count += 1;
+        }
+
+        // Feed the in-progress last edit's old_text to the matcher for live preview
+        if let Some(partial_edit) = edits.last() {
+            if let Some(old_text) = &partial_edit.old_text {
+                let old_text_len = old_text.len();
+                if old_text_len > edit_state.last_old_text_len {
+                    let new_chunk = &old_text[edit_state.last_old_text_len..];
+
+                    let matcher = edit_state.in_progress_matcher.get_or_insert_with(|| {
+                        let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.text_snapshot());
+                        StreamingFuzzyMatcher::new(snapshot)
+                    });
+
+                    if let Some(match_range) = matcher.push(new_chunk, None) {
+                        let anchor_range = buffer.read_with(cx, |buffer, _cx| {
+                            buffer.anchor_range_between(match_range.clone())
+                        });
+                        diff.update(cx, |card, cx| card.reveal_range(anchor_range, cx));
+                    }
+
+                    edit_state.last_old_text_len = old_text_len;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+fn ensure_buffer_saved(
+    buffer: &Entity<Buffer>,
+    abs_path: &PathBuf,
+    tool: &StreamingEditFileTool,
+    cx: &mut AsyncApp,
+) -> Result<(), StreamingEditFileToolOutput> {
+    let check_result = tool.thread.update(cx, |thread, cx| {
+        let last_read = thread.file_read_times.get(abs_path).copied();
+        let current = buffer
+            .read(cx)
+            .file()
+            .and_then(|file| file.disk_state().mtime());
+        let dirty = buffer.read(cx).is_dirty();
+        let has_save = thread.has_tool(SaveFileTool::NAME);
+        let has_restore = thread.has_tool(RestoreFileFromDiskTool::NAME);
+        (last_read, current, dirty, has_save, has_restore)
+    });
+
+    let Ok((last_read_mtime, current_mtime, is_dirty, has_save_tool, has_restore_tool)) =
+        check_result
+    else {
+        return Ok(());
+    };
+
+    if is_dirty {
+        let message = match (has_save_tool, has_restore_tool) {
+            (true, true) => {
+                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes. \
+                         If they want to keep them, ask for confirmation then use the save_file tool to save the file, then retry this edit. \
+                         If they want to discard them, ask for confirmation then use the restore_file_from_disk tool to restore the on-disk contents, then retry this edit."
+            }
+            (true, false) => {
+                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes. \
+                         If they want to keep them, ask for confirmation then use the save_file tool to save the file, then retry this edit. \
+                         If they want to discard them, ask the user to manually revert the file, then inform you when it's ok to proceed."
+            }
+            (false, true) => {
+                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes. \
+                         If they want to keep them, ask the user to manually save the file, then inform you when it's ok to proceed. \
+                         If they want to discard them, ask for confirmation then use the restore_file_from_disk tool to restore the on-disk contents, then retry this edit."
+            }
+            (false, false) => {
+                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes, \
+                         then ask them to save or revert the file manually and inform you when it's ok to proceed."
+            }
+        };
+        return Err(StreamingEditFileToolOutput::Error {
+            error: message.to_string(),
+        });
+    }
+
+    if let (Some(last_read), Some(current)) = (last_read_mtime, current_mtime) {
+        if current != last_read {
+            return Err(StreamingEditFileToolOutput::Error {
+                error: "The file has been modified since you last read it. \
+                             Please read the file again to get the current state before editing it."
+                    .to_string(),
+            });
+        }
+    }
+
+    Ok(())
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -179,24 +655,17 @@ impl StreamingEditFileTool {
         }
     }
 
-    pub fn with_thread(&self, new_thread: WeakEntity<Thread>) -> Self {
-        Self {
-            project: self.project.clone(),
-            thread: new_thread,
-            language_registry: self.language_registry.clone(),
-        }
-    }
-
     fn authorize(
         &self,
-        input: &StreamingEditFileToolInput,
+        path: &PathBuf,
+        description: &str,
         event_stream: &ToolCallEventStream,
         cx: &mut App,
     ) -> Task<Result<()>> {
         super::tool_permissions::authorize_file_edit(
             EditFileTool::NAME,
-            &input.path,
-            &input.display_description,
+            path,
+            description,
             &self.thread,
             event_stream,
             cx,
@@ -210,6 +679,10 @@ impl AgentTool for StreamingEditFileTool {
 
     const NAME: &'static str = "streaming_edit_file";
 
+    fn supports_input_streaming() -> bool {
+        true
+    }
+
     fn kind() -> acp::ToolKind {
         acp::ToolKind::Edit
     }
@@ -229,28 +702,30 @@ impl AgentTool for StreamingEditFileTool {
                         .read(cx)
                         .short_full_path_for_project_path(&project_path, cx)
                 })
-                .unwrap_or(input.path.to_string_lossy().into_owned())
+                .unwrap_or(input.path)
                 .into(),
             Err(raw_input) => {
                 if let Some(input) =
                     serde_json::from_value::<StreamingEditFileToolPartialInput>(raw_input).ok()
                 {
-                    let path = input.path.trim();
+                    let path = input.path.unwrap_or_default();
+                    let path = path.trim();
                     if !path.is_empty() {
                         return self
                             .project
                             .read(cx)
-                            .find_project_path(&input.path, cx)
+                            .find_project_path(&path, cx)
                             .and_then(|project_path| {
                                 self.project
                                     .read(cx)
                                     .short_full_path_for_project_path(&project_path, cx)
                             })
-                            .unwrap_or(input.path)
+                            .unwrap_or_else(|| path.to_string())
                             .into();
                     }
 
-                    let description = input.display_description.trim();
+                    let description = input.display_description.unwrap_or_default();
+                    let description = description.trim();
                     if !description.is_empty() {
                         return description.to_string().into();
                     }
@@ -263,230 +738,36 @@ impl AgentTool for StreamingEditFileTool {
 
     fn run(
         self: Arc<Self>,
-        input: ToolInput<Self::Input>,
+        mut input: ToolInput<Self::Input>,
         event_stream: ToolCallEventStream,
         cx: &mut App,
     ) -> Task<Result<Self::Output, Self::Output>> {
         cx.spawn(async move |cx: &mut AsyncApp| {
-            let input = input.recv().await.map_err(|e| {
-                StreamingEditFileToolOutput::Error {
-                    error: format!("Failed to receive tool input: {e}"),
-                }
-            })?;
-
-            let project = self
-                .thread
-                .read_with(cx, |thread, _cx| thread.project().clone())
-                .map_err(|_| StreamingEditFileToolOutput::Error {
-                    error: "thread was dropped".to_string(),
-                })?;
-
-            let (project_path, abs_path, authorize) = cx.update(|cx| {
-                let project_path =
-                    resolve_path(&input, project.clone(), cx).map_err(|err| {
-                        StreamingEditFileToolOutput::Error {
-                            error: err.to_string(),
-                        }
-                    })?;
-                let abs_path = project.read(cx).absolute_path(&project_path, cx);
-                if let Some(abs_path) = abs_path.clone() {
-                    event_stream.update_fields(
-                        ToolCallUpdateFields::new()
-                            .locations(vec![acp::ToolCallLocation::new(abs_path)]),
-                    );
-                }
-                let authorize = self.authorize(&input, &event_stream, cx);
-                Ok::<_, StreamingEditFileToolOutput>((project_path, abs_path, authorize))
-            })?;
-            let result: anyhow::Result<StreamingEditFileToolOutput> = async {
-                authorize.await?;
-
-                let buffer = project
-                    .update(cx, |project, cx| {
-                        project.open_buffer(project_path.clone(), cx)
-                    })
-                    .await?;
-
-                if let Some(abs_path) = abs_path.as_ref() {
-                    let (last_read_mtime, current_mtime, is_dirty, has_save_tool, has_restore_tool) =
-                        self.thread.update(cx, |thread, cx| {
-                            let last_read = thread.file_read_times.get(abs_path).copied();
-                            let current = buffer
-                                .read(cx)
-                                .file()
-                                .and_then(|file| file.disk_state().mtime());
-                            let dirty = buffer.read(cx).is_dirty();
-                            let has_save = thread.has_tool(SaveFileTool::NAME);
-                            let has_restore = thread.has_tool(RestoreFileFromDiskTool::NAME);
-                            (last_read, current, dirty, has_save, has_restore)
-                        })?;
-
-                    if is_dirty {
-                        let message = match (has_save_tool, has_restore_tool) {
-                            (true, true) => {
-                                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes. \
-                                 If they want to keep them, ask for confirmation then use the save_file tool to save the file, then retry this edit. \
-                                 If they want to discard them, ask for confirmation then use the restore_file_from_disk tool to restore the on-disk contents, then retry this edit."
-                            }
-                            (true, false) => {
-                                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes. \
-                                 If they want to keep them, ask for confirmation then use the save_file tool to save the file, then retry this edit. \
-                                 If they want to discard them, ask the user to manually revert the file, then inform you when it's ok to proceed."
-                            }
-                            (false, true) => {
-                                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes. \
-                                 If they want to keep them, ask the user to manually save the file, then inform you when it's ok to proceed. \
-                                 If they want to discard them, ask for confirmation then use the restore_file_from_disk tool to restore the on-disk contents, then retry this edit."
-                            }
-                            (false, false) => {
-                                "This file has unsaved changes. Ask the user whether they want to keep or discard those changes, \
-                                 then ask them to save or revert the file manually and inform you when it's ok to proceed."
-                            }
-                        };
-                        anyhow::bail!("{}", message);
-                    }
-
-                    if let (Some(last_read), Some(current)) = (last_read_mtime, current_mtime) {
-                        if current != last_read {
-                            anyhow::bail!(
-                                "The file {} has been modified since you last read it. \
-                                 Please read the file again to get the current state before editing it.",
-                                input.path.display()
-                            );
+            let mut state = StreamingEditState::Idle;
+            loop {
+                futures::select! {
+                    partial = input.recv_partial().fuse() => {
+                        let Some(partial_value) = partial else { break };
+                        if let Ok(parsed) = serde_json::from_value::<StreamingEditFileToolPartialInput>(partial_value) {
+                            state.process(parsed, &self, &event_stream, cx).await?;
                         }
                     }
-                }
-
-                let diff = cx.new(|cx| Diff::new(buffer.clone(), cx));
-                event_stream.update_diff(diff.clone());
-                let _finalize_diff = util::defer({
-                    let diff = diff.downgrade();
-                    let mut cx = cx.clone();
-                    move || {
-                        diff.update(&mut cx, |diff, cx| diff.finalize(cx)).ok();
-                    }
-                });
-
-                let old_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
-                let old_text = cx
-                    .background_spawn({
-                        let old_snapshot = old_snapshot.clone();
-                        async move { Arc::new(old_snapshot.text()) }
-                    })
-                    .await;
-
-                let action_log = self.thread.read_with(cx, |thread, _cx| thread.action_log().clone())?;
-
-                // Edit the buffer and report edits to the action log as part of the
-                // same effect cycle, otherwise the edit will be reported as if the
-                // user made it (due to the buffer subscription in action_log).
-                match input.mode {
-                    StreamingEditFileMode::Create | StreamingEditFileMode::Overwrite => {
-                        action_log.update(cx, |log, cx| {
-                            log.buffer_created(buffer.clone(), cx);
-                        });
-                        let content = input.content.ok_or_else(|| {
-                            anyhow!("'content' field is required for create and overwrite modes")
-                        })?;
-                        cx.update(|cx| {
-                            buffer.update(cx, |buffer, cx| {
-                                buffer.edit([(0..buffer.len(), content.as_str())], None, cx);
-                            });
-                            action_log.update(cx, |log, cx| {
-                                log.buffer_edited(buffer.clone(), cx);
-                            });
-                        });
-                    }
-                    StreamingEditFileMode::Edit => {
-                        action_log.update(cx, |log, cx| {
-                            log.buffer_read(buffer.clone(), cx);
-                        });
-                        let edits = input.edits.ok_or_else(|| {
-                            anyhow!("'edits' field is required for edit mode")
-                        })?;
-                        // apply_edits now handles buffer_edited internally in the same effect cycle
-                        apply_edits(&buffer, &action_log, &edits, &diff, &event_stream, &abs_path, cx)?;
-                    }
-                }
-
-                let format_on_save_enabled = buffer.read_with(cx, |buffer, cx| {
-                    let settings = language_settings::language_settings(
-                        buffer.language().map(|l| l.name()),
-                        buffer.file(),
-                        cx,
-                    );
-                    settings.format_on_save != FormatOnSave::Off
-                });
-
-                if format_on_save_enabled {
-                    action_log.update(cx, |log, cx| {
-                        log.buffer_edited(buffer.clone(), cx);
-                    });
-
-                    let format_task = project.update(cx, |project, cx| {
-                        project.format(
-                            HashSet::from_iter([buffer.clone()]),
-                            LspFormatTarget::Buffers,
-                            false,
-                            FormatTrigger::Save,
-                            cx,
-                        )
-                    });
-                    futures::select! {
-                        result = format_task.fuse() => { result.log_err(); },
-                        _ = event_stream.cancelled_by_user().fuse() => {
-                            anyhow::bail!("Edit cancelled by user");
-                        }
-                    };
-                }
-
-                let save_task = project
-                    .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx));
-                futures::select! {
-                    result = save_task.fuse() => { result?; },
                     _ = event_stream.cancelled_by_user().fuse() => {
-                        anyhow::bail!("Edit cancelled by user");
-                    }
-                };
-
-                action_log.update(cx, |log, cx| {
-                    log.buffer_edited(buffer.clone(), cx);
-                });
-
-                if let Some(abs_path) = abs_path.as_ref() {
-                    if let Some(new_mtime) = buffer.read_with(cx, |buffer, _| {
-                        buffer.file().and_then(|file| file.disk_state().mtime())
-                    }) {
-                        self.thread.update(cx, |thread, _| {
-                            thread.file_read_times.insert(abs_path.to_path_buf(), new_mtime);
-                        })?;
+                        return Err(StreamingEditFileToolOutput::Error {
+                            error: "Edit cancelled by user".to_string(),
+                        });
                     }
                 }
+            }
+            let full_input =
+                input
+                    .recv()
+                    .await
+                    .map_err(|e| StreamingEditFileToolOutput::Error {
+                        error: format!("Failed to receive tool input: {e}"),
+                    })?;
 
-                let new_snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
-                let (new_text, unified_diff) = cx
-                    .background_spawn({
-                        let new_snapshot = new_snapshot.clone();
-                        let old_text = old_text.clone();
-                        async move {
-                            let new_text = new_snapshot.text();
-                            let diff = language::unified_diff(&old_text, &new_text);
-                            (new_text, diff)
-                        }
-                    })
-                    .await;
-
-                let output = StreamingEditFileToolOutput::Success {
-                    input_path: input.path,
-                    new_text,
-                    old_text,
-                    diff: unified_diff,
-                };
-
-                Ok(output)
-            }.await;
-            result
-                .map_err(|e| StreamingEditFileToolOutput::Error { error: e.to_string() })
+            state.finalize(full_input, &self, &event_stream, cx).await
         })
     }
 
@@ -526,42 +807,28 @@ fn apply_edits(
     edits: &[EditOperation],
     diff: &Entity<Diff>,
     event_stream: &ToolCallEventStream,
-    abs_path: &Option<PathBuf>,
+    abs_path: &PathBuf,
     cx: &mut AsyncApp,
 ) -> Result<()> {
     let mut failed_edits = Vec::new();
     let mut ambiguous_edits = Vec::new();
     let mut resolved_edits: Vec<(Range<usize>, String)> = Vec::new();
 
-    // First pass: resolve all edits without applying them
     let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
     for (index, edit) in edits.iter().enumerate() {
-        let result = resolve_edit(&snapshot, edit);
-
-        match result {
-            Ok(Some((range, new_text))) => {
-                // Reveal the range in the diff view
-                let (start_anchor, end_anchor) = buffer.read_with(cx, |buffer, _cx| {
-                    (
-                        buffer.anchor_before(range.start),
-                        buffer.anchor_after(range.end),
-                    )
-                });
-                diff.update(cx, |card, cx| {
-                    card.reveal_range(start_anchor..end_anchor, cx)
-                });
+        match resolve_and_reveal_edit(buffer, diff, &snapshot, edit, cx) {
+            Ok((range, new_text)) => {
                 resolved_edits.push((range, new_text));
             }
-            Ok(None) => {
+            Err(EditResolveError::NotFound) => {
                 failed_edits.push(index);
             }
-            Err(ranges) => {
+            Err(EditResolveError::Ambiguous(ranges)) => {
                 ambiguous_edits.push((index, ranges));
             }
         }
     }
 
-    // Check for errors before applying any edits
     if !failed_edits.is_empty() {
         let indices = failed_edits
             .iter()
@@ -595,22 +862,17 @@ fn apply_edits(
         );
     }
 
-    // Sort edits by position so buffer.edit() can handle offset translation
     let mut edits_sorted = resolved_edits;
     edits_sorted.sort_by(|a, b| a.0.start.cmp(&b.0.start));
 
-    // Emit location for the earliest edit in the file
     if let Some((first_range, _)) = edits_sorted.first() {
-        if let Some(abs_path) = abs_path.clone() {
-            let line = snapshot.offset_to_point(first_range.start).row;
-            event_stream.update_fields(
-                ToolCallUpdateFields::new()
-                    .locations(vec![ToolCallLocation::new(abs_path).line(Some(line))]),
-            );
-        }
+        let line = snapshot.offset_to_point(first_range.start).row;
+        event_stream.update_fields(
+            ToolCallUpdateFields::new()
+                .locations(vec![ToolCallLocation::new(abs_path).line(Some(line))]),
+        );
     }
 
-    // Validate no overlaps (sorted ascending by start)
     for window in edits_sorted.windows(2) {
         if let [(earlier_range, _), (later_range, _)] = window
             && (earlier_range.end > later_range.start || earlier_range.start == later_range.start)
@@ -630,9 +892,6 @@ fn apply_edits(
         }
     }
 
-    // Apply all edits in a single batch and report to action_log in the same
-    // effect cycle. This prevents the buffer subscription from treating these
-    // as user edits.
     if !edits_sorted.is_empty() {
         cx.update(|cx| {
             buffer.update(cx, |buffer, cx| {
@@ -653,40 +912,111 @@ fn apply_edits(
     Ok(())
 }
 
-/// Resolves an edit operation by finding the matching text in the buffer.
-/// Returns Ok(Some((range, new_text))) if a unique match is found,
-/// Ok(None) if no match is found, or Err(ranges) if multiple matches are found.
-fn resolve_edit(
-    snapshot: &BufferSnapshot,
+/// Resolves, reveals, and applies a single edit to the buffer. Emits
+/// a location update and reports the change to the action log.
+fn resolve_reveal_and_apply_edit(
+    buffer: &Entity<Buffer>,
+    diff: &Entity<Diff>,
     edit: &EditOperation,
-) -> std::result::Result<Option<(Range<usize>, String)>, Vec<Range<usize>>> {
-    let mut matcher = StreamingFuzzyMatcher::new(snapshot.clone());
-    matcher.push(&edit.old_text, None);
-    let matches = matcher.finish();
-
-    if matches.is_empty() {
-        return Ok(None);
-    }
+    edit_index: usize,
+    abs_path: &PathBuf,
+    action_log: Option<&Entity<action_log::ActionLog>>,
+    event_stream: &ToolCallEventStream,
+    cx: &mut AsyncApp,
+) -> Result<()> {
+    let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
 
-    if matches.len() > 1 {
-        return Err(matches);
-    }
+    match resolve_and_reveal_edit(buffer, diff, &snapshot, edit, cx) {
+        Ok((range, new_text)) => {
+            let line = snapshot.offset_to_point(range.start).row;
+            event_stream.update_fields(
+                ToolCallUpdateFields::new()
+                    .locations(vec![ToolCallLocation::new(abs_path).line(Some(line))]),
+            );
 
-    let match_range = matches.into_iter().next().expect("checked len above");
-    Ok(Some((match_range, edit.new_text.clone())))
+            if let Some(action_log) = action_log {
+                cx.update(|cx| {
+                    buffer.update(cx, |buffer, cx| {
+                        buffer.edit([(range, new_text.as_str())], None, cx);
+                    });
+                    action_log.update(cx, |log, cx| {
+                        log.buffer_edited(buffer.clone(), cx);
+                    });
+                });
+            }
+
+            Ok(())
+        }
+        Err(EditResolveError::NotFound) => {
+            anyhow::bail!(
+                "Could not find matching text for edit at index {}. \
+                 The old_text did not match any content in the file. \
+                 Please read the file again to get the current content.",
+                edit_index
+            );
+        }
+        Err(EditResolveError::Ambiguous(ranges)) => {
+            let lines = ranges
+                .iter()
+                .map(|r| (snapshot.offset_to_point(r.start).row + 1).to_string())
+                .collect::<Vec<_>>()
+                .join(", ");
+            anyhow::bail!(
+                "Edit {} matched multiple locations in the file at lines: {}. \
+                 Please provide more context in old_text to uniquely identify the location.",
+                edit_index,
+                lines
+            );
+        }
+    }
+}
+
+enum EditResolveError {
+    NotFound,
+    Ambiguous(Vec<Range<usize>>),
+}
+
+/// Resolves an edit operation by finding matching text in the buffer,
+/// reveals the matched range in the diff view, and returns the resolved
+/// range and replacement text.
+fn resolve_and_reveal_edit(
+    buffer: &Entity<Buffer>,
+    diff: &Entity<Diff>,
+    snapshot: &BufferSnapshot,
+    edit: &EditOperation,
+    cx: &mut AsyncApp,
+) -> std::result::Result<(Range<usize>, String), EditResolveError> {
+    let mut matcher = StreamingFuzzyMatcher::new(snapshot.clone());
+    matcher.push(&edit.old_text, None);
+    let matches = matcher.finish();
+    if matches.is_empty() {
+        return Err(EditResolveError::NotFound);
+    }
+    if matches.len() > 1 {
+        return Err(EditResolveError::Ambiguous(matches));
+    }
+
+    let range = matches.into_iter().next().expect("checked len above");
+
+    let anchor_range =
+        buffer.read_with(cx, |buffer, _cx| buffer.anchor_range_between(range.clone()));
+    diff.update(cx, |card, cx| card.reveal_range(anchor_range, cx));
+
+    Ok((range, edit.new_text.clone()))
 }
 
 fn resolve_path(
-    input: &StreamingEditFileToolInput,
-    project: Entity<Project>,
+    mode: StreamingEditFileMode,
+    path: &PathBuf,
+    project: &Entity<Project>,
     cx: &mut App,
 ) -> Result<ProjectPath> {
     let project = project.read(cx);
 
-    match input.mode {
+    match mode {
         StreamingEditFileMode::Edit | StreamingEditFileMode::Overwrite => {
             let path = project
-                .find_project_path(&input.path, cx)
+                .find_project_path(&path, cx)
                 .context("Can't edit file: path not found")?;
 
             let entry = project

crates/agent_ui/src/buffer_codegen.rs 🔗

@@ -526,11 +526,13 @@ impl CodegenAlternative {
                     name: REWRITE_SECTION_TOOL_NAME.to_string(),
                     description: "Replaces text in <rewrite_this></rewrite_this> tags with your replacement_text.".to_string(),
                     input_schema: language_model::tool_schema::root_schema_for::<RewriteSectionInput>(tool_input_format).to_value(),
+                    use_input_streaming: false,
                 },
                 LanguageModelRequestTool {
                     name: FAILURE_MESSAGE_TOOL_NAME.to_string(),
                     description: "Use this tool to provide a message to the user when you're unable to complete a task.".to_string(),
                     input_schema: language_model::tool_schema::root_schema_for::<FailureMessageInput>(tool_input_format).to_value(),
+                    use_input_streaming: false,
                 },
             ];
 

crates/anthropic/src/anthropic.rs 🔗

@@ -906,11 +906,17 @@ pub struct ImageSource {
     pub data: String,
 }
 
+fn is_false(value: &bool) -> bool {
+    !value
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Tool {
     pub name: String,
     pub description: String,
     pub input_schema: serde_json::Value,
+    #[serde(default, skip_serializing_if = "is_false")]
+    pub eager_input_streaming: bool,
 }
 
 #[derive(Debug, Serialize, Deserialize)]

crates/language_model/src/request.rs 🔗

@@ -431,6 +431,7 @@ pub struct LanguageModelRequestTool {
     pub name: String,
     pub description: String,
     pub input_schema: serde_json::Value,
+    pub use_input_streaming: bool,
 }
 
 #[derive(Debug, PartialEq, Hash, Clone, Serialize, Deserialize)]

crates/language_models/src/provider/anthropic.rs 🔗

@@ -370,6 +370,7 @@ pub fn into_anthropic_count_tokens_request(
                 name: tool.name,
                 description: tool.description,
                 input_schema: tool.input_schema,
+                eager_input_streaming: tool.use_input_streaming,
             })
             .collect(),
         tool_choice: request.tool_choice.map(|choice| match choice {
@@ -713,6 +714,7 @@ pub fn into_anthropic(
                 name: tool.name,
                 description: tool.description,
                 input_schema: tool.input_schema,
+                eager_input_streaming: tool.use_input_streaming,
             })
             .collect(),
         tool_choice: request.tool_choice.map(|choice| match choice {

crates/language_models/src/provider/open_ai.rs 🔗

@@ -1566,6 +1566,7 @@ mod tests {
                 name: "get_weather".into(),
                 description: "Fetches the weather".into(),
                 input_schema: json!({ "type": "object" }),
+                use_input_streaming: false,
             }],
             tool_choice: Some(LanguageModelToolChoice::Any),
             stop: vec!["<STOP>".into()],