From f1bd531a3287a6beb103b0980550e6c96affa9ed Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Tue, 24 Jun 2025 16:30:29 -0300 Subject: [PATCH] Handle pending requests Co-authored-by: Ben Brandt --- Cargo.lock | 1 + crates/agent2/Cargo.toml | 1 + crates/agent2/src/agent2.rs | 63 ++++++++++++++++++++++++++++--------- 3 files changed, 50 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ba2b4d862bff0f53c1c762844812a30fe7b067b..d089d533dcc3d2f632ce459767dba03d35833e65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,6 +117,7 @@ dependencies = [ "serde_json", "util", "uuid", + "workspace-hack", ] [[package]] diff --git a/crates/agent2/Cargo.toml b/crates/agent2/Cargo.toml index 7bad05021614f0a30265e0993692a02d158203c4..b61acffb7817af658f50e31460bf17690011a1ec 100644 --- a/crates/agent2/Cargo.toml +++ b/crates/agent2/Cargo.toml @@ -24,6 +24,7 @@ futures.workspace = true gpui.workspace = true project.workspace = true uuid.workspace = true +workspace-hack.workspace = true [dev-dependencies] gpui = { workspace = true, "features" = ["test-support"] } diff --git a/crates/agent2/src/agent2.rs b/crates/agent2/src/agent2.rs index 30377f2cfba72fff67a5e8efe0ccd1652ad657c8..64d717209e1ccc855b4ef7c3e555ced1a3bf4675 100644 --- a/crates/agent2/src/agent2.rs +++ b/crates/agent2/src/agent2.rs @@ -1,9 +1,14 @@ use anyhow::{Result, anyhow}; use chrono::{DateTime, Utc}; -use futures::{StreamExt, channel::oneshot, stream::BoxStream}; +use futures::{ + FutureExt, StreamExt, + channel::{mpsc, oneshot}, + select_biased, + stream::{BoxStream, FuturesUnordered}, +}; use gpui::{AppContext, AsyncApp, Context, Entity, Task, WeakEntity}; use project::Project; -use std::{ops::Range, path::PathBuf, sync::Arc}; +use std::{future, ops::Range, path::PathBuf, pin::pin, sync::Arc}; use uuid::Uuid; pub trait Agent: 'static { @@ -19,7 +24,7 @@ pub trait AgentThread: 'static { fn send( &self, message: Message, - ) -> impl Future>>>; + ) -> impl Future>>>; } pub enum ResponseEvent { @@ -111,7 +116,7 @@ pub struct ThreadEntryId(usize); impl ThreadEntryId { pub fn post_inc(&mut self) -> Self { - let id = *self; + let ed = *self; self.0 += 1; id } @@ -260,19 +265,47 @@ impl Thread { let agent_thread = self.agent_thread.clone(); cx.spawn(async move |this, cx| { let mut events = agent_thread.send(message).await?; - while let Some(event) = events.next().await { - match event { - Ok(ResponseEvent::MessageResponse(message)) => { - this.update(cx, |this, cx| this.handle_message_response(message, cx))? - .await?; + let mut pending_event_handlers = FuturesUnordered::new(); + + loop { + let mut next_event_handler_result = pin!(async { + if pending_event_handlers.is_empty() { + future::pending::<()>().await; } - Ok(ResponseEvent::ReadFileRequest(request)) => { - this.update(cx, |this, cx| this.handle_read_file_request(request, cx))? - .await?; + + pending_event_handlers.next().await + }.fuse()); + + select_biased! { + event = events.next() => { + let Some(event) = event else { + while let Some(result) = pending_event_handlers.next().await { + result?; + } + + break; + }; + + let task = match event { + Ok(ResponseEvent::MessageResponse(message)) => { + this.update(cx, |this, cx| this.handle_message_response(message, cx))? + } + Ok(ResponseEvent::ReadFileRequest(request)) => { + this.update(cx, |this, cx| this.handle_read_file_request(request, cx))? + } + Err(_) => todo!(), + }; + pending_event_handlers.push(task); + } + result = next_event_handler_result => { + // Event handlers should only return errors that are + // unrecoverable and should therefore stop this turn of + // the agentic loop. + result.unwrap()?; } - Err(_) => todo!(), } } + Ok(()) }) } @@ -388,8 +421,8 @@ mod tests { async fn send( &self, - message: Message, - ) -> Result>> { + _message: Message, + ) -> Result>> { todo!() } }