Start implementing send

Ben Brandt , Antonio Scandurra , and Agus Zubiaga created

Co-authored-by: Antonio Scandurra <me@as-cii.com>
Co-authored-by: Agus Zubiaga <agus@zed.dev>

Change summary

Cargo.lock                  |   1 
crates/agent2/Cargo.toml    |   1 
crates/agent2/src/acp.rs    | 109 +++++++++++++++++++++++++++++++++++---
crates/agent2/src/agent2.rs |   8 +-
4 files changed, 105 insertions(+), 14 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -118,6 +118,7 @@ dependencies = [
  "futures 0.3.31",
  "gpui",
  "language",
+ "parking_lot",
  "project",
  "serde_json",
  "settings",

crates/agent2/Cargo.toml 🔗

@@ -26,6 +26,7 @@ chrono.workspace = true
 futures.workspace = true
 language.workspace = true
 gpui.workspace = true
+parking_lot.workspace = true
 project.workspace = true
 smol.workspace = true
 uuid.workspace = true

crates/agent2/src/acp.rs 🔗

@@ -1,19 +1,26 @@
-use std::path::Path;
+use std::{
+    path::Path,
+    sync::{Arc, Weak},
+};
 
 use crate::{
-    Agent, AgentThread, AgentThreadEntryContent, AgentThreadSummary, ResponseEvent, ThreadId,
+    Agent, AgentThread, AgentThreadEntryContent, AgentThreadSummary, Message, MessageChunk,
+    ResponseEvent, Role, ThreadId,
 };
 use agentic_coding_protocol::{self as acp};
 use anyhow::{Context as _, Result};
 use async_trait::async_trait;
+use collections::HashMap;
 use futures::channel::mpsc::UnboundedReceiver;
 use gpui::{AppContext, AsyncApp, Entity, Task};
+use parking_lot::Mutex;
 use project::Project;
 use smol::process::Child;
 use util::ResultExt;
 
 pub struct AcpAgent {
-    connection: acp::AgentConnection,
+    connection: Arc<acp::AgentConnection>,
+    threads: Mutex<HashMap<acp::ThreadId, Weak<AcpAgentThread>>>,
     _handler_task: Task<()>,
     _io_task: Task<()>,
 }
@@ -26,6 +33,13 @@ struct AcpClientDelegate {
 
 #[async_trait(?Send)]
 impl acp::Client for AcpClientDelegate {
+    async fn stream_message_chunk(
+        &self,
+        request: acp::StreamMessageChunkParams,
+    ) -> Result<acp::StreamMessageChunkResponse> {
+        Ok(acp::StreamMessageChunkResponse)
+    }
+
     async fn read_file(&self, request: acp::ReadFileParams) -> Result<acp::ReadFileResponse> {
         let cx = &mut self.cx.clone();
         let buffer = self
@@ -73,7 +87,8 @@ impl AcpAgent {
         });
 
         Self {
-            connection,
+            connection: Arc::new(connection),
+            threads: Mutex::default(),
             _handler_task: cx.foreground_executor().spawn(handler_fut),
             _io_task: io_task,
         }
@@ -98,31 +113,105 @@ impl Agent for AcpAgent {
             .collect()
     }
 
-    async fn create_thread(&self) -> Result<Self::Thread> {
+    async fn create_thread(&self) -> Result<Arc<Self::Thread>> {
         let response = self.connection.request(acp::CreateThreadParams).await?;
-        Ok(AcpAgentThread {
-            id: response.thread_id,
-        })
+        let thread = Arc::new(AcpAgentThread {
+            id: response.thread_id.clone(),
+            connection: self.connection.clone(),
+            state: Mutex::new(AcpAgentThreadState { turn: None }),
+        });
+        self.threads
+            .lock()
+            .insert(response.thread_id, Arc::downgrade(&thread));
+        Ok(thread)
     }
 
-    async fn open_thread(&self, id: ThreadId) -> Result<Self::Thread> {
+    async fn open_thread(&self, id: ThreadId) -> Result<Arc<Self::Thread>> {
         todo!()
     }
 }
 
 pub struct AcpAgentThread {
     id: acp::ThreadId,
+    connection: Arc<acp::AgentConnection>,
+    state: Mutex<AcpAgentThreadState>,
+}
+
+struct AcpAgentThreadState {
+    turn: Option<AcpAgentThreadTurn>,
 }
 
+struct AcpAgentThreadTurn {}
+
 impl AgentThread for AcpAgentThread {
     async fn entries(&self) -> Result<Vec<AgentThreadEntryContent>> {
-        todo!()
+        let response = self
+            .connection
+            .request(acp::GetThreadEntriesParams {
+                thread_id: self.id.clone(),
+            })
+            .await?;
+
+        Ok(response
+            .entries
+            .into_iter()
+            .map(|entry| match entry {
+                acp::ThreadEntry::Message { message } => {
+                    AgentThreadEntryContent::Message(Message {
+                        role: match message.role {
+                            acp::Role::User => Role::User,
+                            acp::Role::Assistant => Role::Assistant,
+                        },
+                        chunks: message
+                            .chunks
+                            .into_iter()
+                            .map(|chunk| match chunk {
+                                acp::MessageChunk::Text { chunk } => MessageChunk::Text { chunk },
+                            })
+                            .collect(),
+                    })
+                }
+                acp::ThreadEntry::ReadFile { path, content } => {
+                    AgentThreadEntryContent::ReadFile { path, content }
+                }
+            })
+            .collect())
     }
 
     async fn send(
         &self,
         message: crate::Message,
     ) -> Result<UnboundedReceiver<Result<ResponseEvent>>> {
+        let response = self
+            .connection
+            .request(acp::SendMessageParams {
+                thread_id: self.id.clone(),
+                message: acp::Message {
+                    role: match message.role {
+                        Role::User => acp::Role::User,
+                        Role::Assistant => acp::Role::Assistant,
+                    },
+                    chunks: message
+                        .chunks
+                        .into_iter()
+                        .map(|chunk| match chunk {
+                            MessageChunk::Text { chunk } => acp::MessageChunk::Text { chunk },
+                            MessageChunk::File { content } => todo!(),
+                            MessageChunk::Directory { path, contents } => todo!(),
+                            MessageChunk::Symbol {
+                                path,
+                                range,
+                                version,
+                                name,
+                                content,
+                            } => todo!(),
+                            MessageChunk::Thread { title, content } => todo!(),
+                            MessageChunk::Fetch { url, content } => todo!(),
+                        })
+                        .collect(),
+                },
+            })
+            .await?;
         todo!()
     }
 }

crates/agent2/src/agent2.rs 🔗

@@ -16,8 +16,8 @@ pub trait Agent: 'static {
     type Thread: AgentThread;
 
     fn threads(&self) -> impl Future<Output = Result<Vec<AgentThreadSummary>>>;
-    fn create_thread(&self) -> impl Future<Output = Result<Self::Thread>>;
-    fn open_thread(&self, id: ThreadId) -> impl Future<Output = Result<Self::Thread>>;
+    fn create_thread(&self) -> impl Future<Output = Result<Arc<Self::Thread>>>;
+    fn open_thread(&self, id: ThreadId) -> impl Future<Output = Result<Arc<Self::Thread>>>;
 }
 
 pub trait AgentThread: 'static {
@@ -182,7 +182,7 @@ impl<T: Agent> ThreadStore<T> {
         let project = self.project.clone();
         cx.spawn(async move |_, cx| {
             let agent_thread = agent.open_thread(id).await?;
-            Thread::load(Arc::new(agent_thread), project, cx).await
+            Thread::load(agent_thread, project, cx).await
         })
     }
 
@@ -192,7 +192,7 @@ impl<T: Agent> ThreadStore<T> {
         let project = self.project.clone();
         cx.spawn(async move |_, cx| {
             let agent_thread = agent.create_thread().await?;
-            Thread::load(Arc::new(agent_thread), project, cx).await
+            Thread::load(agent_thread, project, cx).await
         })
     }
 }