assistant: Make `/file` emit events as they occur (#19743)

Marshall Bowers created

This PR updates the `/file` command to emit its `SlashCommandEvent`s in
a way that can actually be streamed.

Previously it was buffering up all of the events and then returning them
all at once.

Note that we still don't yet support streaming in the context editor on
`main`, so there won't be any visible changes just yet.

Release Notes:

- N/A

Change summary

crates/assistant/src/slash_command/file_command.rs | 53 ++++++++-------
1 file changed, 29 insertions(+), 24 deletions(-)

Detailed changes

crates/assistant/src/slash_command/file_command.rs 🔗

@@ -4,6 +4,7 @@ use assistant_slash_command::{
     SlashCommandOutput, SlashCommandOutputSection, SlashCommandResult,
 };
 use futures::channel::mpsc;
+use futures::Stream;
 use fuzzy::PathMatch;
 use gpui::{AppContext, Model, Task, View, WeakView};
 use language::{BufferSnapshot, CodeLabel, HighlightId, LineEnding, LspAdapterDelegate};
@@ -196,7 +197,12 @@ impl SlashCommand for FileSlashCommand {
             return Task::ready(Err(anyhow!("missing path")));
         };
 
-        collect_files(workspace.read(cx).project().clone(), arguments, cx)
+        Task::ready(Ok(collect_files(
+            workspace.read(cx).project().clone(),
+            arguments,
+            cx,
+        )
+        .boxed()))
     }
 }
 
@@ -204,7 +210,7 @@ fn collect_files(
     project: Model<Project>,
     glob_inputs: &[String],
     cx: &mut AppContext,
-) -> Task<SlashCommandResult> {
+) -> impl Stream<Item = Result<SlashCommandEvent>> {
     let Ok(matchers) = glob_inputs
         .into_iter()
         .map(|glob_input| {
@@ -213,7 +219,7 @@ fn collect_files(
         })
         .collect::<anyhow::Result<Vec<custom_path_matcher::PathMatcher>>>()
     else {
-        return Task::ready(Err(anyhow!("invalid path")));
+        return futures::stream::once(async { Err(anyhow!("invalid path")) }).boxed();
     };
 
     let project_handle = project.downgrade();
@@ -357,8 +363,12 @@ fn collect_files(
                 events_tx.unbounded_send(Ok(SlashCommandEvent::EndSection { metadata: None }))?;
             }
         }
-        Ok(events_rx.boxed())
+
+        anyhow::Ok(())
     })
+    .detach_and_log_err(cx);
+
+    events_rx.boxed()
 }
 
 pub fn codeblock_fence_for_path(
@@ -550,6 +560,7 @@ mod test {
     use project::Project;
     use serde_json::json;
     use settings::SettingsStore;
+    use smol::stream::StreamExt;
 
     use crate::slash_command::file_command::collect_files;
 
@@ -590,11 +601,9 @@ mod test {
 
         let project = Project::test(fs, ["/root".as_ref()], cx).await;
 
-        let result_1 = cx
-            .update(|cx| collect_files(project.clone(), &["root/dir".to_string()], cx))
-            .await
-            .unwrap();
-        let result_1 = SlashCommandOutput::from_event_stream(result_1)
+        let result_1 =
+            cx.update(|cx| collect_files(project.clone(), &["root/dir".to_string()], cx));
+        let result_1 = SlashCommandOutput::from_event_stream(result_1.boxed())
             .await
             .unwrap();
 
@@ -602,20 +611,16 @@ mod test {
         // 4 files + 2 directories
         assert_eq!(result_1.sections.len(), 6);
 
-        let result_2 = cx
-            .update(|cx| collect_files(project.clone(), &["root/dir/".to_string()], cx))
-            .await
-            .unwrap();
-        let result_2 = SlashCommandOutput::from_event_stream(result_2)
+        let result_2 =
+            cx.update(|cx| collect_files(project.clone(), &["root/dir/".to_string()], cx));
+        let result_2 = SlashCommandOutput::from_event_stream(result_2.boxed())
             .await
             .unwrap();
 
         assert_eq!(result_1, result_2);
 
-        let result = cx
-            .update(|cx| collect_files(project.clone(), &["root/dir*".to_string()], cx))
-            .await
-            .unwrap();
+        let result =
+            cx.update(|cx| collect_files(project.clone(), &["root/dir*".to_string()], cx).boxed());
         let result = SlashCommandOutput::from_event_stream(result).await.unwrap();
 
         assert!(result.text.starts_with("root/dir"));
@@ -659,11 +664,11 @@ mod test {
 
         let project = Project::test(fs, ["/zed".as_ref()], cx).await;
 
-        let result = cx
-            .update(|cx| collect_files(project.clone(), &["zed/assets/themes".to_string()], cx))
+        let result =
+            cx.update(|cx| collect_files(project.clone(), &["zed/assets/themes".to_string()], cx));
+        let result = SlashCommandOutput::from_event_stream(result.boxed())
             .await
             .unwrap();
-        let result = SlashCommandOutput::from_event_stream(result).await.unwrap();
 
         // Sanity check
         assert!(result.text.starts_with("zed/assets/themes\n"));
@@ -721,11 +726,11 @@ mod test {
 
         let project = Project::test(fs, ["/zed".as_ref()], cx).await;
 
-        let result = cx
-            .update(|cx| collect_files(project.clone(), &["zed/assets/themes".to_string()], cx))
+        let result =
+            cx.update(|cx| collect_files(project.clone(), &["zed/assets/themes".to_string()], cx));
+        let result = SlashCommandOutput::from_event_stream(result.boxed())
             .await
             .unwrap();
-        let result = SlashCommandOutput::from_event_stream(result).await.unwrap();
 
         assert!(result.text.starts_with("zed/assets/themes\n"));
         assert_eq!(result.sections[0].label, "zed/assets/themes/LICENSE");