From 92adcb6e6379d49a84a229bd543a11ccb40c33dc Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 1 Jul 2025 19:01:02 +0200 Subject: [PATCH] WIP --- crates/acp/src/acp.rs | 556 ++++++++---------- crates/acp/src/agent2.rs | 352 ----------- crates/acp/src/server.rs | 282 +++++++++ .../src/{thread_element.rs => thread_view.rs} | 14 +- crates/agent_ui/src/agent_panel.rs | 30 +- 5 files changed, 553 insertions(+), 681 deletions(-) delete mode 100644 crates/acp/src/agent2.rs create mode 100644 crates/acp/src/server.rs rename crates/acp/src/{thread_element.rs => thread_view.rs} (95%) diff --git a/crates/acp/src/acp.rs b/crates/acp/src/acp.rs index d3327c82b3d9e3ab6cc42c90d4cee1900cc36ba9..f6b58fea3a97348b2e47fe0efc516e62a81e5c7a 100644 --- a/crates/acp/src/acp.rs +++ b/crates/acp/src/acp.rs @@ -1,345 +1,289 @@ -use std::{io::Write as _, path::Path, sync::Arc}; - -use crate::{ - Agent, AgentThreadEntryContent, AgentThreadSummary, Message, MessageChunk, Role, Thread, - ThreadEntryId, ThreadId, -}; -use agentic_coding_protocol as acp; -use anyhow::{Context as _, Result}; -use async_trait::async_trait; -use collections::HashMap; -use gpui::{App, AppContext, AsyncApp, Context, Entity, Task, WeakEntity}; -use parking_lot::Mutex; +mod server; +mod thread_view; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use gpui::{Context, Entity, SharedString, Task}; use project::Project; -use smol::process::Child; -use util::ResultExt; +use std::{ops::Range, path::PathBuf, sync::Arc}; -pub struct AcpAgent { - connection: Arc, - threads: Arc>>>, - project: Entity, - _handler_task: Task<()>, - _io_task: Task<()>, +pub use server::AcpServer; +pub use thread_view::AcpThreadView; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ThreadId(SharedString); + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct FileVersion(u64); + +#[derive(Debug)] +pub struct AgentThreadSummary { + pub id: ThreadId, + pub title: String, + pub created_at: DateTime, } -struct AcpClientDelegate { - project: Entity, - threads: Arc>>>, - cx: AsyncApp, - // sent_buffer_versions: HashMap, HashMap>, +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FileContent { + pub path: PathBuf, + pub version: FileVersion, + pub content: SharedString, } -impl AcpClientDelegate { - fn new( - project: Entity, - threads: Arc>>>, - cx: AsyncApp, - ) -> Self { - Self { - project, - threads, - cx: cx, - } - } +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum Role { + User, + Assistant, +} - fn update_thread( - &self, - thread_id: &ThreadId, - cx: &mut App, - callback: impl FnMut(&mut Thread, &mut Context) -> R, - ) -> Option { - let thread = self.threads.lock().get(&thread_id)?.clone(); - let Some(thread) = thread.upgrade() else { - self.threads.lock().remove(&thread_id); - return None; - }; - Some(thread.update(cx, callback)) - } +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Message { + pub role: Role, + pub chunks: Vec, } -#[async_trait(?Send)] -impl acp::Client for AcpClientDelegate { - async fn stat(&self, params: acp::StatParams) -> Result { - let cx = &mut self.cx.clone(); - self.project.update(cx, |project, cx| { - let path = project - .project_path_for_absolute_path(Path::new(¶ms.path), cx) - .context("Failed to get project path")?; - - match project.entry_for_path(&path, cx) { - // todo! refresh entry? - None => Ok(acp::StatResponse { - exists: false, - is_directory: false, - }), - Some(entry) => Ok(acp::StatResponse { - exists: entry.is_created(), - is_directory: entry.is_dir(), - }), - } - })? +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum MessageChunk { + Text { + // todo! should it be shared string? what about streaming? + chunk: String, + }, + File { + content: FileContent, + }, + Directory { + path: PathBuf, + contents: Vec, + }, + Symbol { + path: PathBuf, + range: Range, + version: FileVersion, + name: SharedString, + content: SharedString, + }, + Fetch { + url: SharedString, + content: SharedString, + }, +} + +impl From<&str> for MessageChunk { + fn from(chunk: &str) -> Self { + MessageChunk::Text { + chunk: chunk.to_string().into(), + } } +} - async fn stream_message_chunk( - &self, - params: acp::StreamMessageChunkParams, - ) -> Result { - let cx = &mut self.cx.clone(); +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum AgentThreadEntryContent { + Message(Message), + ReadFile { path: PathBuf, content: String }, +} - cx.update(|cx| { - self.update_thread(¶ms.thread_id.into(), cx, |thread, cx| { - let acp::MessageChunk::Text { chunk } = ¶ms.chunk; - thread.push_assistant_chunk( - MessageChunk::Text { - chunk: chunk.into(), - }, - cx, - ) - }); - })?; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct ThreadEntryId(usize); - Ok(acp::StreamMessageChunkResponse) +impl ThreadEntryId { + pub fn post_inc(&mut self) -> Self { + let id = *self; + self.0 += 1; + id } +} - async fn read_text_file( - &self, - request: acp::ReadTextFileParams, - ) -> Result { - let cx = &mut self.cx.clone(); - let buffer = self - .project - .update(cx, |project, cx| { - let path = project - .project_path_for_absolute_path(Path::new(&request.path), cx) - .context("Failed to get project path")?; - anyhow::Ok(project.open_buffer(path, cx)) - })?? - .await?; - - buffer.update(cx, |buffer, cx| { - let start = language::Point::new(request.line_offset.unwrap_or(0), 0); - let end = match request.line_limit { - None => buffer.max_point(), - Some(limit) => start + language::Point::new(limit + 1, 0), - }; - - let content: String = buffer.text_for_range(start..end).collect(); - self.update_thread(&request.thread_id.into(), cx, |thread, cx| { - thread.push_entry( - AgentThreadEntryContent::ReadFile { - path: request.path.clone(), - content: content.clone(), - }, - cx, - ); - }); +#[derive(Debug)] +pub struct ThreadEntry { + pub id: ThreadEntryId, + pub content: AgentThreadEntryContent, +} - acp::ReadTextFileResponse { - content, - version: acp::FileVersion(0), - } - }) - } +pub struct AcpThread { + id: ThreadId, + next_entry_id: ThreadEntryId, + entries: Vec, + server: Arc, + title: SharedString, + project: Entity, +} - async fn read_binary_file( - &self, - request: acp::ReadBinaryFileParams, - ) -> Result { - let cx = &mut self.cx.clone(); - let file = self - .project - .update(cx, |project, cx| { - let (worktree, path) = project - .find_worktree(Path::new(&request.path), cx) - .context("Failed to get project path")?; - - let task = worktree.update(cx, |worktree, cx| worktree.load_binary_file(&path, cx)); - anyhow::Ok(task) - })?? - .await?; - - // todo! test - let content = cx - .background_spawn(async move { - let start = request.byte_offset.unwrap_or(0) as usize; - let end = request - .byte_limit - .map(|limit| (start + limit as usize).min(file.content.len())) - .unwrap_or(file.content.len()); - - let range_content = &file.content[start..end]; - - let mut base64_content = Vec::new(); - let mut base64_encoder = base64::write::EncoderWriter::new( - std::io::Cursor::new(&mut base64_content), - &base64::engine::general_purpose::STANDARD, - ); - base64_encoder.write_all(range_content)?; - drop(base64_encoder); - - // SAFETY: The base64 encoder should not produce non-UTF8. - unsafe { anyhow::Ok(String::from_utf8_unchecked(base64_content)) } - }) - .await?; +impl AcpThread { + pub fn new( + server: Arc, + thread_id: ThreadId, + entries: Vec, + project: Entity, + _: &mut Context, + ) -> Self { + let mut next_entry_id = ThreadEntryId(0); + Self { + title: "A new agent2 thread".into(), + entries: entries + .into_iter() + .map(|entry| ThreadEntry { + id: next_entry_id.post_inc(), + content: entry, + }) + .collect(), + server, + id: thread_id, + next_entry_id, + project, + } + } - Ok(acp::ReadBinaryFileResponse { - content, - // todo! - version: acp::FileVersion(0), - }) + pub fn title(&self) -> SharedString { + self.title.clone() } - async fn glob_search(&self, request: acp::GlobSearchParams) -> Result { - todo!() + pub fn entries(&self) -> &[ThreadEntry] { + &self.entries } -} -impl AcpAgent { - pub fn stdio(mut process: Child, project: Entity, cx: &mut AsyncApp) -> Arc { - let stdin = process.stdin.take().expect("process didn't have stdin"); - let stdout = process.stdout.take().expect("process didn't have stdout"); - - let threads: Arc>>> = Default::default(); - let (connection, handler_fut, io_fut) = acp::AgentConnection::connect_to_agent( - AcpClientDelegate::new(project.clone(), threads.clone(), cx.clone()), - stdin, - stdout, - ); - - let io_task = cx.background_spawn(async move { - io_fut.await.log_err(); - process.status().await.log_err(); + pub fn push_entry(&mut self, entry: AgentThreadEntryContent, cx: &mut Context) { + self.entries.push(ThreadEntry { + id: self.next_entry_id.post_inc(), + content: entry, }); - - Arc::new(Self { - project, - connection: Arc::new(connection), - threads, - _handler_task: cx.foreground_executor().spawn(handler_fut), - _io_task: io_task, - }) + cx.notify(); } -} -#[async_trait(?Send)] -impl Agent for AcpAgent { - async fn threads(&self, cx: &mut AsyncApp) -> Result> { - let response = self.connection.request(acp::GetThreadsParams).await?; - response - .threads - .into_iter() - .map(|thread| { - Ok(AgentThreadSummary { - id: thread.id.into(), - title: thread.title, - created_at: thread.modified_at, - }) - }) - .collect() - } + pub fn push_assistant_chunk(&mut self, chunk: MessageChunk, cx: &mut Context) { + if let Some(last_entry) = self.entries.last_mut() { + if let AgentThreadEntryContent::Message(Message { + ref mut chunks, + role: Role::Assistant, + }) = last_entry.content + { + if let ( + Some(MessageChunk::Text { chunk: old_chunk }), + MessageChunk::Text { chunk: new_chunk }, + ) = (chunks.last_mut(), &chunk) + { + old_chunk.push_str(&new_chunk); + return cx.notify(); + } - async fn create_thread(self: Arc, cx: &mut AsyncApp) -> Result> { - let response = self.connection.request(acp::CreateThreadParams).await?; - let thread_id: ThreadId = response.thread_id.into(); - let agent = self.clone(); - let thread = cx.new(|_| Thread { - title: "The agent2 thread".into(), - id: thread_id.clone(), - next_entry_id: ThreadEntryId(0), - entries: Vec::default(), - project: self.project.clone(), - agent, - })?; - self.threads.lock().insert(thread_id, thread.downgrade()); - Ok(thread) + chunks.push(chunk); + return cx.notify(); + } + } + + self.entries.push(ThreadEntry { + id: self.next_entry_id.post_inc(), + content: AgentThreadEntryContent::Message(Message { + role: Role::Assistant, + chunks: vec![chunk], + }), + }); + cx.notify(); } - async fn open_thread(&self, id: ThreadId, cx: &mut AsyncApp) -> Result> { - todo!() + pub fn send(&mut self, message: Message, cx: &mut Context) -> Task> { + let agent = self.server.clone(); + let id = self.id.clone(); + self.push_entry(AgentThreadEntryContent::Message(message.clone()), cx); + cx.spawn(async move |_, cx| { + agent.send_message(id, message, cx).await?; + Ok(()) + }) } +} - async fn thread_entries( - &self, - thread_id: ThreadId, - cx: &mut AsyncApp, - ) -> Result> { - let response = self - .connection - .request(acp::GetThreadEntriesParams { - thread_id: thread_id.clone().into(), - }) - .await?; - - Ok(response - .entries - .into_iter() - .map(|entry| match entry { - acp::ThreadEntry::Message { message } => { - AgentThreadEntryContent::Message(Message { - role: match message.role { - acp::Role::User => Role::User, - acp::Role::Assistant => Role::Assistant, - }, - chunks: message - .chunks - .into_iter() - .map(|chunk| match chunk { - acp::MessageChunk::Text { chunk } => MessageChunk::Text { - chunk: chunk.into(), - }, - }) - .collect(), - }) - } - acp::ThreadEntry::ReadFile { path, content } => { - AgentThreadEntryContent::ReadFile { path, content } - } - }) - .collect()) +#[cfg(test)] +mod tests { + use super::*; + use gpui::{AsyncApp, TestAppContext}; + use project::FakeFs; + use serde_json::json; + use settings::SettingsStore; + use std::{env, path::Path, process::Stdio}; + use util::path; + + fn init_test(cx: &mut TestAppContext) { + env_logger::init(); + cx.update(|cx| { + let settings_store = SettingsStore::test(cx); + cx.set_global(settings_store); + Project::init_settings(cx); + language::init(cx); + }); } - async fn send_thread_message( - &self, - thread_id: ThreadId, - message: crate::Message, - cx: &mut AsyncApp, - ) -> Result<()> { - self.connection - .request(acp::SendMessageParams { - thread_id: thread_id.clone().into(), - message: acp::Message { - role: match message.role { - Role::User => acp::Role::User, - Role::Assistant => acp::Role::Assistant, + #[gpui::test] + async fn test_gemini(cx: &mut TestAppContext) { + init_test(cx); + + cx.executor().allow_parking(); + + let fs = FakeFs::new(cx.executor()); + fs.insert_tree( + path!("/private/tmp"), + json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}), + ) + .await; + let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await; + let server = gemini_acp_server(project.clone(), cx.to_async()).unwrap(); + let thread = server.create_thread(&mut cx.to_async()).await.unwrap(); + thread + .update(cx, |thread, cx| { + thread.send( + Message { + role: Role::User, + chunks: vec![ + "Read the '/private/tmp/foo' file and output all of its contents." + .into(), + ], }, - chunks: message - .chunks - .into_iter() - .map(|chunk| match chunk { - MessageChunk::Text { chunk } => acp::MessageChunk::Text { - chunk: chunk.into(), - }, - MessageChunk::File { .. } => todo!(), - MessageChunk::Directory { .. } => todo!(), - MessageChunk::Symbol { .. } => todo!(), - MessageChunk::Fetch { .. } => todo!(), - }) - .collect(), - }, + cx, + ) }) - .await?; - Ok(()) + .await + .unwrap(); + + thread.read_with(cx, |thread, _| { + assert!(matches!( + thread.entries[0].content, + AgentThreadEntryContent::Message(Message { + role: Role::User, + .. + }) + )); + assert!( + thread.entries().iter().any(|entry| { + entry.content + == AgentThreadEntryContent::ReadFile { + path: "/private/tmp/foo".into(), + content: "Lorem ipsum dolor".into(), + } + }), + "Thread does not contain entry. Actual: {:?}", + thread.entries() + ); + }); } -} -impl From for ThreadId { - fn from(thread_id: acp::ThreadId) -> Self { - Self(thread_id.0.into()) - } -} + pub fn gemini_acp_server(project: Entity, mut cx: AsyncApp) -> Result> { + let cli_path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini-cli/packages/cli"); + let mut command = util::command::new_smol_command("node"); + command + .arg(cli_path) + .arg("--acp") + .args(["--model", "gemini-2.5-flash"]) + .current_dir("/private/tmp") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .kill_on_drop(true); + + if let Ok(gemini_key) = std::env::var("GEMINI_API_KEY") { + command.env("GEMINI_API_KEY", gemini_key); + } + + let child = command.spawn().unwrap(); -impl From for acp::ThreadId { - fn from(thread_id: ThreadId) -> Self { - acp::ThreadId(thread_id.0.to_string()) + Ok(AcpServer::stdio(child, project, &mut cx)) } } diff --git a/crates/acp/src/agent2.rs b/crates/acp/src/agent2.rs deleted file mode 100644 index ee6e8c9e559dac64179ddfd0a8b94203f5851495..0000000000000000000000000000000000000000 --- a/crates/acp/src/agent2.rs +++ /dev/null @@ -1,352 +0,0 @@ -mod acp; -mod thread_element; - -use anyhow::Result; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use gpui::{AppContext, AsyncApp, Context, Entity, SharedString, Task}; -use project::Project; -use std::{ops::Range, path::PathBuf, sync::Arc}; - -pub use acp::AcpAgent; -pub use thread_element::ThreadElement; - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ThreadId(SharedString); - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct FileVersion(u64); - -#[derive(Debug)] -pub struct AgentThreadSummary { - pub id: ThreadId, - pub title: String, - pub created_at: DateTime, -} - -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct FileContent { - pub path: PathBuf, - pub version: FileVersion, - pub content: SharedString, -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum Role { - User, - Assistant, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct Message { - pub role: Role, - pub chunks: Vec, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum MessageChunk { - Text { - // todo! should it be shared string? what about streaming? - chunk: String, - }, - File { - content: FileContent, - }, - Directory { - path: PathBuf, - contents: Vec, - }, - Symbol { - path: PathBuf, - range: Range, - version: FileVersion, - name: SharedString, - content: SharedString, - }, - Fetch { - url: SharedString, - content: SharedString, - }, -} - -impl From<&str> for MessageChunk { - fn from(chunk: &str) -> Self { - MessageChunk::Text { - chunk: chunk.to_string().into(), - } - } -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum AgentThreadEntryContent { - Message(Message), - ReadFile { path: PathBuf, content: String }, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct ThreadEntryId(usize); - -impl ThreadEntryId { - pub fn post_inc(&mut self) -> Self { - let id = *self; - self.0 += 1; - id - } -} - -#[derive(Debug)] -pub struct ThreadEntry { - pub id: ThreadEntryId, - pub content: AgentThreadEntryContent, -} - -pub struct ThreadStore { - threads: Vec, - agent: Arc, - project: Entity, -} - -impl ThreadStore { - pub async fn load( - agent: Arc, - project: Entity, - cx: &mut AsyncApp, - ) -> Result> { - let threads = agent.threads(cx).await?; - cx.new(|_cx| Self { - threads, - agent, - project, - }) - } - - /// Returns the threads in reverse chronological order. - pub fn threads(&self) -> &[AgentThreadSummary] { - &self.threads - } - - /// Opens a thread with the given ID. - pub fn open_thread( - &self, - id: ThreadId, - cx: &mut Context, - ) -> Task>> { - let agent = self.agent.clone(); - cx.spawn(async move |_, cx| agent.open_thread(id, cx).await) - } - - /// Creates a new thread. - pub fn create_thread(&self, cx: &mut Context) -> Task>> { - let agent = self.agent.clone(); - cx.spawn(async move |_, cx| agent.create_thread(cx).await) - } -} - -pub struct Thread { - id: ThreadId, - next_entry_id: ThreadEntryId, - entries: Vec, - agent: Arc, - title: SharedString, - project: Entity, -} - -impl Thread { - pub async fn load( - agent: Arc, - thread_id: ThreadId, - project: Entity, - cx: &mut AsyncApp, - ) -> Result> { - let entries = agent.thread_entries(thread_id.clone(), cx).await?; - cx.new(|cx| Self::new(agent, thread_id, entries, project, cx)) - } - - pub fn new( - agent: Arc, - thread_id: ThreadId, - entries: Vec, - project: Entity, - _: &mut Context, - ) -> Self { - let mut next_entry_id = ThreadEntryId(0); - Self { - title: "A new agent2 thread".into(), - entries: entries - .into_iter() - .map(|entry| ThreadEntry { - id: next_entry_id.post_inc(), - content: entry, - }) - .collect(), - agent, - id: thread_id, - next_entry_id, - project, - } - } - - pub fn title(&self) -> SharedString { - self.title.clone() - } - - pub fn entries(&self) -> &[ThreadEntry] { - &self.entries - } - - pub fn push_entry(&mut self, entry: AgentThreadEntryContent, cx: &mut Context) { - self.entries.push(ThreadEntry { - id: self.next_entry_id.post_inc(), - content: entry, - }); - cx.notify(); - } - - pub fn push_assistant_chunk(&mut self, chunk: MessageChunk, cx: &mut Context) { - if let Some(last_entry) = self.entries.last_mut() { - if let AgentThreadEntryContent::Message(Message { - ref mut chunks, - role: Role::Assistant, - }) = last_entry.content - { - if let ( - Some(MessageChunk::Text { chunk: old_chunk }), - MessageChunk::Text { chunk: new_chunk }, - ) = (chunks.last_mut(), &chunk) - { - old_chunk.push_str(&new_chunk); - return cx.notify(); - } - - chunks.push(chunk); - return cx.notify(); - } - } - - self.entries.push(ThreadEntry { - id: self.next_entry_id.post_inc(), - content: AgentThreadEntryContent::Message(Message { - role: Role::Assistant, - chunks: vec![chunk], - }), - }); - cx.notify(); - } - - pub fn send(&mut self, message: Message, cx: &mut Context) -> Task> { - let agent = self.agent.clone(); - let id = self.id.clone(); - self.push_entry(AgentThreadEntryContent::Message(message.clone()), cx); - cx.spawn(async move |_, cx| { - agent.send_thread_message(id, message, cx).await?; - Ok(()) - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::acp::AcpAgent; - use gpui::TestAppContext; - use project::FakeFs; - use serde_json::json; - use settings::SettingsStore; - use std::{env, path::Path, process::Stdio}; - use util::path; - - fn init_test(cx: &mut TestAppContext) { - env_logger::init(); - cx.update(|cx| { - let settings_store = SettingsStore::test(cx); - cx.set_global(settings_store); - Project::init_settings(cx); - language::init(cx); - }); - } - - #[gpui::test] - async fn test_gemini(cx: &mut TestAppContext) { - init_test(cx); - - cx.executor().allow_parking(); - - let fs = FakeFs::new(cx.executor()); - fs.insert_tree( - path!("/private/tmp"), - json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}), - ) - .await; - let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await; - let agent = gemini_agent(project.clone(), cx.to_async()).unwrap(); - let thread_store = ThreadStore::load(agent, project, &mut cx.to_async()) - .await - .unwrap(); - let thread = thread_store - .update(cx, |thread_store, cx| { - assert_eq!(thread_store.threads().len(), 0); - thread_store.create_thread(cx) - }) - .await - .unwrap(); - thread - .update(cx, |thread, cx| { - thread.send( - Message { - role: Role::User, - chunks: vec![ - "Read the '/private/tmp/foo' file and output all of its contents." - .into(), - ], - }, - cx, - ) - }) - .await - .unwrap(); - - thread.read_with(cx, |thread, _| { - assert!(matches!( - thread.entries[0].content, - AgentThreadEntryContent::Message(Message { - role: Role::User, - .. - }) - )); - assert!( - thread.entries().iter().any(|entry| { - entry.content - == AgentThreadEntryContent::ReadFile { - path: "/private/tmp/foo".into(), - content: "Lorem ipsum dolor".into(), - } - }), - "Thread does not contain entry. Actual: {:?}", - thread.entries() - ); - }); - } - - pub fn gemini_agent(project: Entity, mut cx: AsyncApp) -> Result> { - let cli_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini-cli/packages/cli"); - let mut command = util::command::new_smol_command("node"); - command - .arg(cli_path) - .arg("--acp") - .args(["--model", "gemini-2.5-flash"]) - .current_dir("/private/tmp") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .kill_on_drop(true); - - if let Ok(gemini_key) = std::env::var("GEMINI_API_KEY") { - command.env("GEMINI_API_KEY", gemini_key); - } - - let child = command.spawn().unwrap(); - - Ok(AcpAgent::stdio(child, project, &mut cx)) - } -} diff --git a/crates/acp/src/server.rs b/crates/acp/src/server.rs new file mode 100644 index 0000000000000000000000000000000000000000..57f700c3126d52f19601fd99382cdc6f0e9a692c --- /dev/null +++ b/crates/acp/src/server.rs @@ -0,0 +1,282 @@ +use crate::{AcpThread, AgentThreadEntryContent, MessageChunk, Role, ThreadEntryId, ThreadId}; +use agentic_coding_protocol as acp; +use anyhow::{Context as _, Result}; +use async_trait::async_trait; +use collections::HashMap; +use gpui::{App, AppContext, AsyncApp, Context, Entity, Task, WeakEntity}; +use parking_lot::Mutex; +use project::Project; +use smol::process::Child; +use std::{io::Write as _, path::Path, sync::Arc}; +use util::ResultExt; + +pub struct AcpServer { + connection: Arc, + threads: Arc>>>, + project: Entity, + _handler_task: Task<()>, + _io_task: Task<()>, +} + +struct AcpClientDelegate { + project: Entity, + threads: Arc>>>, + cx: AsyncApp, + // sent_buffer_versions: HashMap, HashMap>, +} + +impl AcpClientDelegate { + fn new( + project: Entity, + threads: Arc>>>, + cx: AsyncApp, + ) -> Self { + Self { + project, + threads, + cx: cx, + } + } + + fn update_thread( + &self, + thread_id: &ThreadId, + cx: &mut App, + callback: impl FnMut(&mut AcpThread, &mut Context) -> R, + ) -> Option { + let thread = self.threads.lock().get(&thread_id)?.clone(); + let Some(thread) = thread.upgrade() else { + self.threads.lock().remove(&thread_id); + return None; + }; + Some(thread.update(cx, callback)) + } +} + +#[async_trait(?Send)] +impl acp::Client for AcpClientDelegate { + async fn stat(&self, params: acp::StatParams) -> Result { + let cx = &mut self.cx.clone(); + self.project.update(cx, |project, cx| { + let path = project + .project_path_for_absolute_path(Path::new(¶ms.path), cx) + .context("Failed to get project path")?; + + match project.entry_for_path(&path, cx) { + // todo! refresh entry? + None => Ok(acp::StatResponse { + exists: false, + is_directory: false, + }), + Some(entry) => Ok(acp::StatResponse { + exists: entry.is_created(), + is_directory: entry.is_dir(), + }), + } + })? + } + + async fn stream_message_chunk( + &self, + params: acp::StreamMessageChunkParams, + ) -> Result { + dbg!(); + let cx = &mut self.cx.clone(); + + cx.update(|cx| { + self.update_thread(¶ms.thread_id.into(), cx, |thread, cx| { + let acp::MessageChunk::Text { chunk } = ¶ms.chunk; + thread.push_assistant_chunk( + MessageChunk::Text { + chunk: chunk.into(), + }, + cx, + ) + }); + })?; + + Ok(acp::StreamMessageChunkResponse) + } + + async fn read_text_file( + &self, + request: acp::ReadTextFileParams, + ) -> Result { + let cx = &mut self.cx.clone(); + let buffer = self + .project + .update(cx, |project, cx| { + let path = project + .project_path_for_absolute_path(Path::new(&request.path), cx) + .context("Failed to get project path")?; + anyhow::Ok(project.open_buffer(path, cx)) + })?? + .await?; + + buffer.update(cx, |buffer, cx| { + let start = language::Point::new(request.line_offset.unwrap_or(0), 0); + let end = match request.line_limit { + None => buffer.max_point(), + Some(limit) => start + language::Point::new(limit + 1, 0), + }; + + let content: String = buffer.text_for_range(start..end).collect(); + self.update_thread(&request.thread_id.into(), cx, |thread, cx| { + thread.push_entry( + AgentThreadEntryContent::ReadFile { + path: request.path.clone(), + content: content.clone(), + }, + cx, + ); + }); + + acp::ReadTextFileResponse { + content, + version: acp::FileVersion(0), + } + }) + } + + async fn read_binary_file( + &self, + request: acp::ReadBinaryFileParams, + ) -> Result { + let cx = &mut self.cx.clone(); + let file = self + .project + .update(cx, |project, cx| { + let (worktree, path) = project + .find_worktree(Path::new(&request.path), cx) + .context("Failed to get project path")?; + + let task = worktree.update(cx, |worktree, cx| worktree.load_binary_file(&path, cx)); + anyhow::Ok(task) + })?? + .await?; + + // todo! test + let content = cx + .background_spawn(async move { + let start = request.byte_offset.unwrap_or(0) as usize; + let end = request + .byte_limit + .map(|limit| (start + limit as usize).min(file.content.len())) + .unwrap_or(file.content.len()); + + let range_content = &file.content[start..end]; + + let mut base64_content = Vec::new(); + let mut base64_encoder = base64::write::EncoderWriter::new( + std::io::Cursor::new(&mut base64_content), + &base64::engine::general_purpose::STANDARD, + ); + base64_encoder.write_all(range_content)?; + drop(base64_encoder); + + // SAFETY: The base64 encoder should not produce non-UTF8. + unsafe { anyhow::Ok(String::from_utf8_unchecked(base64_content)) } + }) + .await?; + + Ok(acp::ReadBinaryFileResponse { + content, + // todo! + version: acp::FileVersion(0), + }) + } + + async fn glob_search(&self, request: acp::GlobSearchParams) -> Result { + todo!() + } +} + +impl AcpServer { + pub fn stdio(mut process: Child, project: Entity, cx: &mut AsyncApp) -> Arc { + let stdin = process.stdin.take().expect("process didn't have stdin"); + let stdout = process.stdout.take().expect("process didn't have stdout"); + + let threads: Arc>>> = Default::default(); + let (connection, handler_fut, io_fut) = acp::AgentConnection::connect_to_agent( + AcpClientDelegate::new(project.clone(), threads.clone(), cx.clone()), + stdin, + stdout, + ); + + let io_task = cx.background_spawn(async move { + io_fut.await.log_err(); + process.status().await.log_err(); + }); + + Arc::new(Self { + project, + connection: Arc::new(connection), + threads, + _handler_task: cx.foreground_executor().spawn(handler_fut), + _io_task: io_task, + }) + } +} + +impl AcpServer { + pub async fn create_thread(self: Arc, cx: &mut AsyncApp) -> Result> { + let response = self.connection.request(acp::CreateThreadParams).await?; + let thread_id: ThreadId = response.thread_id.into(); + let server = self.clone(); + let thread = cx.new(|_| AcpThread { + title: "The agent2 thread".into(), + id: thread_id.clone(), + next_entry_id: ThreadEntryId(0), + entries: Vec::default(), + project: self.project.clone(), + server, + })?; + self.threads.lock().insert(thread_id, thread.downgrade()); + Ok(thread) + } + + pub async fn send_message( + &self, + thread_id: ThreadId, + message: crate::Message, + cx: &mut AsyncApp, + ) -> Result<()> { + self.connection + .request(acp::SendMessageParams { + thread_id: thread_id.clone().into(), + message: acp::Message { + role: match message.role { + Role::User => acp::Role::User, + Role::Assistant => acp::Role::Assistant, + }, + chunks: message + .chunks + .into_iter() + .map(|chunk| match chunk { + MessageChunk::Text { chunk } => acp::MessageChunk::Text { + chunk: chunk.into(), + }, + MessageChunk::File { .. } => todo!(), + MessageChunk::Directory { .. } => todo!(), + MessageChunk::Symbol { .. } => todo!(), + MessageChunk::Fetch { .. } => todo!(), + }) + .collect(), + }, + }) + .await?; + Ok(()) + } +} + +impl From for ThreadId { + fn from(thread_id: acp::ThreadId) -> Self { + Self(thread_id.0.into()) + } +} + +impl From for acp::ThreadId { + fn from(thread_id: ThreadId) -> Self { + acp::ThreadId(thread_id.0.to_string()) + } +} diff --git a/crates/acp/src/thread_element.rs b/crates/acp/src/thread_view.rs similarity index 95% rename from crates/acp/src/thread_element.rs rename to crates/acp/src/thread_view.rs index e843edcde3da6ed577fd98fd80851615bcd1ad33..49a4d1fe8b6326a9af6fcaac551cdc7676f157dc 100644 --- a/crates/acp/src/thread_element.rs +++ b/crates/acp/src/thread_view.rs @@ -7,18 +7,18 @@ use ui::Tooltip; use ui::prelude::*; use zed_actions::agent::Chat; -use crate::{AgentThreadEntryContent, Message, MessageChunk, Role, Thread, ThreadEntry}; +use crate::{AcpThread, AgentThreadEntryContent, Message, MessageChunk, Role, ThreadEntry}; -pub struct ThreadElement { - thread: Entity, +pub struct AcpThreadView { + thread: Entity, // todo! use full message editor from agent2 message_editor: Entity, send_task: Option>>, _subscription: Subscription, } -impl ThreadElement { - pub fn new(thread: Entity, window: &mut Window, cx: &mut Context) -> Self { +impl AcpThreadView { + pub fn new(thread: Entity, window: &mut Window, cx: &mut Context) -> Self { let message_editor = cx.new(|cx| { let buffer = cx.new(|cx| Buffer::local("", cx)); let buffer = cx.new(|cx| MultiBuffer::singleton(buffer, cx)); @@ -127,13 +127,13 @@ impl ThreadElement { } } -impl Focusable for ThreadElement { +impl Focusable for AcpThreadView { fn focus_handle(&self, cx: &App) -> FocusHandle { self.message_editor.focus_handle(cx) } } -impl Render for ThreadElement { +impl Render for AcpThreadView { fn render(&mut self, window: &mut Window, cx: &mut Context) -> impl IntoElement { let text = self.message_editor.read(cx).text(cx); let is_editor_empty = text.is_empty(); diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index d07521f3ee0fbe2c5310e4a232b6213039bed6bd..149fcb22595f8a270419a3e4bedf3bc76bfe14a1 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -4,7 +4,7 @@ use std::rc::Rc; use std::sync::Arc; use std::time::Duration; -use agent2::{AcpAgent, Agent as _}; +use acp::AcpServer; use db::kvp::{Dismissable, KEY_VALUE_STORE}; use serde::{Deserialize, Serialize}; @@ -198,7 +198,7 @@ enum ActiveView { _subscriptions: Vec, }, AcpThread { - thread_element: Entity, + thread_view: Entity, }, TextThread { context_editor: Entity, @@ -753,8 +753,8 @@ impl AgentPanel { ActiveView::Thread { thread, .. } => { thread.update(cx, |thread, cx| thread.cancel_last_completion(window, cx)); } - ActiveView::AcpThread { thread_element, .. } => { - thread_element.update(cx, |thread_element, _cx| thread_element.cancel()); + ActiveView::AcpThread { thread_view, .. } => { + thread_view.update(cx, |thread_element, _cx| thread_element.cancel()); } ActiveView::TextThread { .. } | ActiveView::History | ActiveView::Configuration => {} } @@ -916,12 +916,12 @@ impl AgentPanel { let project = self.project.clone(); cx.spawn_in(window, async move |this, cx| { - let agent = AcpAgent::stdio(child, project, cx); + let agent = AcpServer::stdio(child, project, cx); let thread = agent.create_thread(cx).await?; - let thread_element = - cx.new_window_entity(|window, cx| agent2::ThreadElement::new(thread, window, cx))?; + let thread_view = + cx.new_window_entity(|window, cx| acp::AcpThreadView::new(thread, window, cx))?; this.update_in(cx, |this, window, cx| { - this.set_active_view(ActiveView::AcpThread { thread_element }, window, cx); + this.set_active_view(ActiveView::AcpThread { thread_view }, window, cx); }) }) .detach(); @@ -1521,7 +1521,7 @@ impl Focusable for AgentPanel { fn focus_handle(&self, cx: &App) -> FocusHandle { match &self.active_view { ActiveView::Thread { message_editor, .. } => message_editor.focus_handle(cx), - ActiveView::AcpThread { thread_element, .. } => thread_element.focus_handle(cx), + ActiveView::AcpThread { thread_view, .. } => thread_view.focus_handle(cx), ActiveView::History => self.history.focus_handle(cx), ActiveView::TextThread { context_editor, .. } => context_editor.focus_handle(cx), ActiveView::Configuration => { @@ -1678,11 +1678,9 @@ impl AgentPanel { .into_any_element(), } } - ActiveView::AcpThread { thread_element } => { - Label::new(thread_element.read(cx).title(cx)) - .truncate() - .into_any_element() - } + ActiveView::AcpThread { thread_view } => Label::new(thread_view.read(cx).title(cx)) + .truncate() + .into_any_element(), ActiveView::TextThread { title_editor, context_editor, @@ -3188,9 +3186,9 @@ impl Render for AgentPanel { }) .child(h_flex().child(message_editor.clone())) .child(self.render_drag_target(cx)), - ActiveView::AcpThread { thread_element, .. } => parent + ActiveView::AcpThread { thread_view, .. } => parent .relative() - .child(thread_element.clone()) + .child(thread_view.clone()) // todo! // .child(h_flex().child(self.message_editor.clone())) .child(self.render_drag_target(cx)),