acp.rs

  1use std::{
  2    path::Path,
  3    sync::{Arc, Weak},
  4};
  5
  6use crate::{
  7    Agent, AgentThread, AgentThreadEntryContent, AgentThreadSummary, Message, MessageChunk,
  8    ResponseEvent, Role, ThreadId,
  9};
 10use agentic_coding_protocol::{self as acp};
 11use anyhow::{Context as _, Result};
 12use async_trait::async_trait;
 13use collections::HashMap;
 14use futures::channel::mpsc::UnboundedReceiver;
 15use gpui::{AppContext, AsyncApp, Entity, Task};
 16use parking_lot::Mutex;
 17use project::Project;
 18use smol::process::Child;
 19use util::ResultExt;
 20
 21pub struct AcpAgent {
 22    connection: Arc<acp::AgentConnection>,
 23    threads: Mutex<HashMap<acp::ThreadId, Weak<AcpAgentThread>>>,
 24    _handler_task: Task<()>,
 25    _io_task: Task<()>,
 26}
 27
 28struct AcpClientDelegate {
 29    project: Entity<Project>,
 30    cx: AsyncApp,
 31    // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
 32}
 33
 34#[async_trait(?Send)]
 35impl acp::Client for AcpClientDelegate {
 36    async fn stream_message_chunk(
 37        &self,
 38        request: acp::StreamMessageChunkParams,
 39    ) -> Result<acp::StreamMessageChunkResponse> {
 40        Ok(acp::StreamMessageChunkResponse)
 41    }
 42
 43    async fn read_file(&self, request: acp::ReadFileParams) -> Result<acp::ReadFileResponse> {
 44        let cx = &mut self.cx.clone();
 45        let buffer = self
 46            .project
 47            .update(cx, |project, cx| {
 48                let path = project
 49                    .project_path_for_absolute_path(Path::new(&request.path), cx)
 50                    .context("Failed to get project path")?;
 51                anyhow::Ok(project.open_buffer(path, cx))
 52            })??
 53            .await?;
 54
 55        buffer.update(cx, |buffer, _| acp::ReadFileResponse {
 56            content: buffer.text(),
 57            version: acp::FileVersion(0),
 58        })
 59    }
 60
 61    async fn glob_search(&self, request: acp::GlobSearchParams) -> Result<acp::GlobSearchResponse> {
 62        todo!()
 63    }
 64
 65    async fn end_turn(&self, request: acp::EndTurnParams) -> Result<acp::EndTurnResponse> {
 66        todo!()
 67    }
 68}
 69
 70impl AcpAgent {
 71    pub fn stdio(mut process: Child, project: Entity<Project>, cx: AsyncApp) -> Self {
 72        let stdin = process.stdin.take().expect("process didn't have stdin");
 73        let stdout = process.stdout.take().expect("process didn't have stdout");
 74
 75        let (connection, handler_fut, io_fut) = acp::AgentConnection::connect_to_agent(
 76            AcpClientDelegate {
 77                project,
 78                cx: cx.clone(),
 79            },
 80            stdin,
 81            stdout,
 82        );
 83
 84        let io_task = cx.background_spawn(async move {
 85            io_fut.await.log_err();
 86            process.status().await.log_err();
 87        });
 88
 89        Self {
 90            connection: Arc::new(connection),
 91            threads: Mutex::default(),
 92            _handler_task: cx.foreground_executor().spawn(handler_fut),
 93            _io_task: io_task,
 94        }
 95    }
 96}
 97
 98impl Agent for AcpAgent {
 99    type Thread = AcpAgentThread;
100
101    async fn threads(&self) -> Result<Vec<AgentThreadSummary>> {
102        let response = self.connection.request(acp::GetThreadsParams).await?;
103        response
104            .threads
105            .into_iter()
106            .map(|thread| {
107                Ok(AgentThreadSummary {
108                    id: thread.id.into(),
109                    title: thread.title,
110                    created_at: thread.modified_at,
111                })
112            })
113            .collect()
114    }
115
116    async fn create_thread(&self) -> Result<Arc<Self::Thread>> {
117        let response = self.connection.request(acp::CreateThreadParams).await?;
118        let thread = Arc::new(AcpAgentThread {
119            id: response.thread_id.clone(),
120            connection: self.connection.clone(),
121            state: Mutex::new(AcpAgentThreadState { turn: None }),
122        });
123        self.threads
124            .lock()
125            .insert(response.thread_id, Arc::downgrade(&thread));
126        Ok(thread)
127    }
128
129    async fn open_thread(&self, id: ThreadId) -> Result<Arc<Self::Thread>> {
130        todo!()
131    }
132}
133
134pub struct AcpAgentThread {
135    id: acp::ThreadId,
136    connection: Arc<acp::AgentConnection>,
137    state: Mutex<AcpAgentThreadState>,
138}
139
140struct AcpAgentThreadState {
141    turn: Option<AcpAgentThreadTurn>,
142}
143
144struct AcpAgentThreadTurn {}
145
146impl AgentThread for AcpAgentThread {
147    async fn entries(&self) -> Result<Vec<AgentThreadEntryContent>> {
148        let response = self
149            .connection
150            .request(acp::GetThreadEntriesParams {
151                thread_id: self.id.clone(),
152            })
153            .await?;
154
155        Ok(response
156            .entries
157            .into_iter()
158            .map(|entry| match entry {
159                acp::ThreadEntry::Message { message } => {
160                    AgentThreadEntryContent::Message(Message {
161                        role: match message.role {
162                            acp::Role::User => Role::User,
163                            acp::Role::Assistant => Role::Assistant,
164                        },
165                        chunks: message
166                            .chunks
167                            .into_iter()
168                            .map(|chunk| match chunk {
169                                acp::MessageChunk::Text { chunk } => MessageChunk::Text { chunk },
170                            })
171                            .collect(),
172                    })
173                }
174                acp::ThreadEntry::ReadFile { path, content } => {
175                    AgentThreadEntryContent::ReadFile { path, content }
176                }
177            })
178            .collect())
179    }
180
181    async fn send(
182        &self,
183        message: crate::Message,
184    ) -> Result<UnboundedReceiver<Result<ResponseEvent>>> {
185        let response = self
186            .connection
187            .request(acp::SendMessageParams {
188                thread_id: self.id.clone(),
189                message: acp::Message {
190                    role: match message.role {
191                        Role::User => acp::Role::User,
192                        Role::Assistant => acp::Role::Assistant,
193                    },
194                    chunks: message
195                        .chunks
196                        .into_iter()
197                        .map(|chunk| match chunk {
198                            MessageChunk::Text { chunk } => acp::MessageChunk::Text { chunk },
199                            MessageChunk::File { content } => todo!(),
200                            MessageChunk::Directory { path, contents } => todo!(),
201                            MessageChunk::Symbol {
202                                path,
203                                range,
204                                version,
205                                name,
206                                content,
207                            } => todo!(),
208                            MessageChunk::Thread { title, content } => todo!(),
209                            MessageChunk::Fetch { url, content } => todo!(),
210                        })
211                        .collect(),
212                },
213            })
214            .await?;
215        todo!()
216    }
217}
218
219impl From<acp::ThreadId> for ThreadId {
220    fn from(thread_id: acp::ThreadId) -> Self {
221        Self(thread_id.0)
222    }
223}
224
225impl From<ThreadId> for acp::ThreadId {
226    fn from(thread_id: ThreadId) -> Self {
227        acp::ThreadId(thread_id.0)
228    }
229}