Handle pending requests

Agus Zubiaga and Ben Brandt created

Co-authored-by: Ben Brandt <benjamin.j.brandt@gmail.com>

Change summary

Cargo.lock                  |  1 
crates/agent2/Cargo.toml    |  1 
crates/agent2/src/agent2.rs | 63 +++++++++++++++++++++++++++++---------
3 files changed, 50 insertions(+), 15 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -117,6 +117,7 @@ dependencies = [
  "serde_json",
  "util",
  "uuid",
+ "workspace-hack",
 ]
 
 [[package]]

crates/agent2/Cargo.toml 🔗

@@ -24,6 +24,7 @@ futures.workspace = true
 gpui.workspace = true
 project.workspace = true
 uuid.workspace = true
+workspace-hack.workspace = true
 
 [dev-dependencies]
 gpui = { workspace = true, "features" = ["test-support"] }

crates/agent2/src/agent2.rs 🔗

@@ -1,9 +1,14 @@
 use anyhow::{Result, anyhow};
 use chrono::{DateTime, Utc};
-use futures::{StreamExt, channel::oneshot, stream::BoxStream};
+use futures::{
+    FutureExt, StreamExt,
+    channel::{mpsc, oneshot},
+    select_biased,
+    stream::{BoxStream, FuturesUnordered},
+};
 use gpui::{AppContext, AsyncApp, Context, Entity, Task, WeakEntity};
 use project::Project;
-use std::{ops::Range, path::PathBuf, sync::Arc};
+use std::{future, ops::Range, path::PathBuf, pin::pin, sync::Arc};
 use uuid::Uuid;
 
 pub trait Agent: 'static {
@@ -19,7 +24,7 @@ pub trait AgentThread: 'static {
     fn send(
         &self,
         message: Message,
-    ) -> impl Future<Output = Result<BoxStream<'static, Result<ResponseEvent>>>>;
+    ) -> impl Future<Output = Result<mpsc::UnboundedReceiver<Result<ResponseEvent>>>>;
 }
 
 pub enum ResponseEvent {
@@ -111,7 +116,7 @@ pub struct ThreadEntryId(usize);
 
 impl ThreadEntryId {
     pub fn post_inc(&mut self) -> Self {
-        let id = *self;
+        let ed = *self;
         self.0 += 1;
         id
     }
@@ -260,19 +265,47 @@ impl<T: AgentThread> Thread<T> {
         let agent_thread = self.agent_thread.clone();
         cx.spawn(async move |this, cx| {
             let mut events = agent_thread.send(message).await?;
-            while let Some(event) = events.next().await {
-                match event {
-                    Ok(ResponseEvent::MessageResponse(message)) => {
-                        this.update(cx, |this, cx| this.handle_message_response(message, cx))?
-                            .await?;
+            let mut pending_event_handlers = FuturesUnordered::new();
+
+            loop {
+                let mut next_event_handler_result = pin!(async {
+                    if pending_event_handlers.is_empty() {
+                        future::pending::<()>().await;
                     }
-                    Ok(ResponseEvent::ReadFileRequest(request)) => {
-                        this.update(cx, |this, cx| this.handle_read_file_request(request, cx))?
-                            .await?;
+
+                    pending_event_handlers.next().await
+                }.fuse());
+
+                select_biased! {
+                    event = events.next() => {
+                        let Some(event) = event else {
+                            while let Some(result) = pending_event_handlers.next().await {
+                                result?;
+                            }
+
+                            break;
+                        };
+
+                        let task = match event {
+                            Ok(ResponseEvent::MessageResponse(message)) => {
+                                this.update(cx, |this, cx| this.handle_message_response(message, cx))?
+                            }
+                            Ok(ResponseEvent::ReadFileRequest(request)) => {
+                                this.update(cx, |this, cx| this.handle_read_file_request(request, cx))?
+                            }
+                            Err(_) => todo!(),
+                        };
+                        pending_event_handlers.push(task);
+                    }
+                    result = next_event_handler_result => {
+                        // Event handlers should only return errors that are
+                        // unrecoverable and should therefore stop this turn of
+                        // the agentic loop.
+                        result.unwrap()?;
                     }
-                    Err(_) => todo!(),
                 }
             }
+
             Ok(())
         })
     }
@@ -388,8 +421,8 @@ mod tests {
 
         async fn send(
             &self,
-            message: Message,
-        ) -> Result<BoxStream<'static, Result<ResponseEvent>>> {
+            _message: Message,
+        ) -> Result<mpsc::UnboundedReceiver<Result<ResponseEvent>>> {
             todo!()
         }
     }