1use std::{
2 path::Path,
3 sync::{Arc, Weak},
4};
5
6use crate::{
7 Agent, AgentThread, AgentThreadEntryContent, AgentThreadSummary, Message, MessageChunk,
8 ResponseEvent, Role, ThreadId,
9};
10use agentic_coding_protocol::{self as acp};
11use anyhow::{Context as _, Result};
12use async_trait::async_trait;
13use collections::HashMap;
14use futures::channel::mpsc::UnboundedReceiver;
15use gpui::{AppContext, AsyncApp, Entity, Task};
16use parking_lot::Mutex;
17use project::Project;
18use smol::process::Child;
19use util::ResultExt;
20
21pub struct AcpAgent {
22 connection: Arc<acp::AgentConnection>,
23 threads: Mutex<HashMap<acp::ThreadId, Weak<AcpAgentThread>>>,
24 _handler_task: Task<()>,
25 _io_task: Task<()>,
26}
27
28struct AcpClientDelegate {
29 project: Entity<Project>,
30 cx: AsyncApp,
31 // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
32}
33
34#[async_trait(?Send)]
35impl acp::Client for AcpClientDelegate {
36 async fn stream_message_chunk(
37 &self,
38 request: acp::StreamMessageChunkParams,
39 ) -> Result<acp::StreamMessageChunkResponse> {
40 Ok(acp::StreamMessageChunkResponse)
41 }
42
43 async fn read_file(&self, request: acp::ReadFileParams) -> Result<acp::ReadFileResponse> {
44 let cx = &mut self.cx.clone();
45 let buffer = self
46 .project
47 .update(cx, |project, cx| {
48 let path = project
49 .project_path_for_absolute_path(Path::new(&request.path), cx)
50 .context("Failed to get project path")?;
51 anyhow::Ok(project.open_buffer(path, cx))
52 })??
53 .await?;
54
55 buffer.update(cx, |buffer, _| acp::ReadFileResponse {
56 content: buffer.text(),
57 version: acp::FileVersion(0),
58 })
59 }
60
61 async fn glob_search(&self, request: acp::GlobSearchParams) -> Result<acp::GlobSearchResponse> {
62 todo!()
63 }
64
65 async fn end_turn(&self, request: acp::EndTurnParams) -> Result<acp::EndTurnResponse> {
66 todo!()
67 }
68}
69
70impl AcpAgent {
71 pub fn stdio(mut process: Child, project: Entity<Project>, cx: AsyncApp) -> Self {
72 let stdin = process.stdin.take().expect("process didn't have stdin");
73 let stdout = process.stdout.take().expect("process didn't have stdout");
74
75 let (connection, handler_fut, io_fut) = acp::AgentConnection::connect_to_agent(
76 AcpClientDelegate {
77 project,
78 cx: cx.clone(),
79 },
80 stdin,
81 stdout,
82 );
83
84 let io_task = cx.background_spawn(async move {
85 io_fut.await.log_err();
86 process.status().await.log_err();
87 });
88
89 Self {
90 connection: Arc::new(connection),
91 threads: Mutex::default(),
92 _handler_task: cx.foreground_executor().spawn(handler_fut),
93 _io_task: io_task,
94 }
95 }
96}
97
98impl Agent for AcpAgent {
99 type Thread = AcpAgentThread;
100
101 async fn threads(&self) -> Result<Vec<AgentThreadSummary>> {
102 let response = self.connection.request(acp::GetThreadsParams).await?;
103 response
104 .threads
105 .into_iter()
106 .map(|thread| {
107 Ok(AgentThreadSummary {
108 id: thread.id.into(),
109 title: thread.title,
110 created_at: thread.modified_at,
111 })
112 })
113 .collect()
114 }
115
116 async fn create_thread(&self) -> Result<Arc<Self::Thread>> {
117 let response = self.connection.request(acp::CreateThreadParams).await?;
118 let thread = Arc::new(AcpAgentThread {
119 id: response.thread_id.clone(),
120 connection: self.connection.clone(),
121 state: Mutex::new(AcpAgentThreadState { turn: None }),
122 });
123 self.threads
124 .lock()
125 .insert(response.thread_id, Arc::downgrade(&thread));
126 Ok(thread)
127 }
128
129 async fn open_thread(&self, id: ThreadId) -> Result<Arc<Self::Thread>> {
130 todo!()
131 }
132}
133
134pub struct AcpAgentThread {
135 id: acp::ThreadId,
136 connection: Arc<acp::AgentConnection>,
137 state: Mutex<AcpAgentThreadState>,
138}
139
140struct AcpAgentThreadState {
141 turn: Option<AcpAgentThreadTurn>,
142}
143
/// Per-turn data for a thread; it has no fields yet.
struct AcpAgentThreadTurn {}
145
146impl AgentThread for AcpAgentThread {
147 async fn entries(&self) -> Result<Vec<AgentThreadEntryContent>> {
148 let response = self
149 .connection
150 .request(acp::GetThreadEntriesParams {
151 thread_id: self.id.clone(),
152 })
153 .await?;
154
155 Ok(response
156 .entries
157 .into_iter()
158 .map(|entry| match entry {
159 acp::ThreadEntry::Message { message } => {
160 AgentThreadEntryContent::Message(Message {
161 role: match message.role {
162 acp::Role::User => Role::User,
163 acp::Role::Assistant => Role::Assistant,
164 },
165 chunks: message
166 .chunks
167 .into_iter()
168 .map(|chunk| match chunk {
169 acp::MessageChunk::Text { chunk } => MessageChunk::Text { chunk },
170 })
171 .collect(),
172 })
173 }
174 acp::ThreadEntry::ReadFile { path, content } => {
175 AgentThreadEntryContent::ReadFile { path, content }
176 }
177 })
178 .collect())
179 }
180
181 async fn send(
182 &self,
183 message: crate::Message,
184 ) -> Result<UnboundedReceiver<Result<ResponseEvent>>> {
185 let response = self
186 .connection
187 .request(acp::SendMessageParams {
188 thread_id: self.id.clone(),
189 message: acp::Message {
190 role: match message.role {
191 Role::User => acp::Role::User,
192 Role::Assistant => acp::Role::Assistant,
193 },
194 chunks: message
195 .chunks
196 .into_iter()
197 .map(|chunk| match chunk {
198 MessageChunk::Text { chunk } => acp::MessageChunk::Text { chunk },
199 MessageChunk::File { content } => todo!(),
200 MessageChunk::Directory { path, contents } => todo!(),
201 MessageChunk::Symbol {
202 path,
203 range,
204 version,
205 name,
206 content,
207 } => todo!(),
208 MessageChunk::Thread { title, content } => todo!(),
209 MessageChunk::Fetch { url, content } => todo!(),
210 })
211 .collect(),
212 },
213 })
214 .await?;
215 todo!()
216 }
217}
218
219impl From<acp::ThreadId> for ThreadId {
220 fn from(thread_id: acp::ThreadId) -> Self {
221 Self(thread_id.0)
222 }
223}
224
225impl From<ThreadId> for acp::ThreadId {
226 fn from(thread_id: ThreadId) -> Self {
227 acp::ThreadId(thread_id.0)
228 }
229}