Detailed changes
@@ -1,345 +1,289 @@
-use std::{io::Write as _, path::Path, sync::Arc};
-
-use crate::{
- Agent, AgentThreadEntryContent, AgentThreadSummary, Message, MessageChunk, Role, Thread,
- ThreadEntryId, ThreadId,
-};
-use agentic_coding_protocol as acp;
-use anyhow::{Context as _, Result};
-use async_trait::async_trait;
-use collections::HashMap;
-use gpui::{App, AppContext, AsyncApp, Context, Entity, Task, WeakEntity};
-use parking_lot::Mutex;
+mod server;
+mod thread_view;
+
+use anyhow::Result;
+use chrono::{DateTime, Utc};
+use gpui::{Context, Entity, SharedString, Task};
use project::Project;
-use smol::process::Child;
-use util::ResultExt;
+use std::{ops::Range, path::PathBuf, sync::Arc};
-pub struct AcpAgent {
- connection: Arc<acp::AgentConnection>,
- threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<Thread>>>>,
- project: Entity<Project>,
- _handler_task: Task<()>,
- _io_task: Task<()>,
+pub use server::AcpServer;
+pub use thread_view::AcpThreadView;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ThreadId(SharedString);
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub struct FileVersion(u64);
+
+#[derive(Debug)]
+pub struct AgentThreadSummary {
+ pub id: ThreadId,
+ pub title: String,
+ pub created_at: DateTime<Utc>,
}
-struct AcpClientDelegate {
- project: Entity<Project>,
- threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<Thread>>>>,
- cx: AsyncApp,
- // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct FileContent {
+ pub path: PathBuf,
+ pub version: FileVersion,
+ pub content: SharedString,
}
-impl AcpClientDelegate {
- fn new(
- project: Entity<Project>,
- threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<Thread>>>>,
- cx: AsyncApp,
- ) -> Self {
- Self {
- project,
- threads,
- cx: cx,
- }
- }
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+pub enum Role {
+ User,
+ Assistant,
+}
- fn update_thread<R>(
- &self,
- thread_id: &ThreadId,
- cx: &mut App,
- callback: impl FnMut(&mut Thread, &mut Context<Thread>) -> R,
- ) -> Option<R> {
- let thread = self.threads.lock().get(&thread_id)?.clone();
- let Some(thread) = thread.upgrade() else {
- self.threads.lock().remove(&thread_id);
- return None;
- };
- Some(thread.update(cx, callback))
- }
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct Message {
+ pub role: Role,
+ pub chunks: Vec<MessageChunk>,
}
-#[async_trait(?Send)]
-impl acp::Client for AcpClientDelegate {
- async fn stat(&self, params: acp::StatParams) -> Result<acp::StatResponse> {
- let cx = &mut self.cx.clone();
- self.project.update(cx, |project, cx| {
- let path = project
- .project_path_for_absolute_path(Path::new(¶ms.path), cx)
- .context("Failed to get project path")?;
-
- match project.entry_for_path(&path, cx) {
- // todo! refresh entry?
- None => Ok(acp::StatResponse {
- exists: false,
- is_directory: false,
- }),
- Some(entry) => Ok(acp::StatResponse {
- exists: entry.is_created(),
- is_directory: entry.is_dir(),
- }),
- }
- })?
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum MessageChunk {
+ Text {
+ // todo! should it be shared string? what about streaming?
+ chunk: String,
+ },
+ File {
+ content: FileContent,
+ },
+ Directory {
+ path: PathBuf,
+ contents: Vec<FileContent>,
+ },
+ Symbol {
+ path: PathBuf,
+ range: Range<u64>,
+ version: FileVersion,
+ name: SharedString,
+ content: SharedString,
+ },
+ Fetch {
+ url: SharedString,
+ content: SharedString,
+ },
+}
+
+impl From<&str> for MessageChunk {
+ fn from(chunk: &str) -> Self {
+ MessageChunk::Text {
+ chunk: chunk.to_string().into(),
+ }
}
+}
- async fn stream_message_chunk(
- &self,
- params: acp::StreamMessageChunkParams,
- ) -> Result<acp::StreamMessageChunkResponse> {
- let cx = &mut self.cx.clone();
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum AgentThreadEntryContent {
+ Message(Message),
+ ReadFile { path: PathBuf, content: String },
+}
- cx.update(|cx| {
- self.update_thread(¶ms.thread_id.into(), cx, |thread, cx| {
- let acp::MessageChunk::Text { chunk } = ¶ms.chunk;
- thread.push_assistant_chunk(
- MessageChunk::Text {
- chunk: chunk.into(),
- },
- cx,
- )
- });
- })?;
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
+pub struct ThreadEntryId(usize);
- Ok(acp::StreamMessageChunkResponse)
+impl ThreadEntryId {
+ pub fn post_inc(&mut self) -> Self {
+ let id = *self;
+ self.0 += 1;
+ id
}
+}
- async fn read_text_file(
- &self,
- request: acp::ReadTextFileParams,
- ) -> Result<acp::ReadTextFileResponse> {
- let cx = &mut self.cx.clone();
- let buffer = self
- .project
- .update(cx, |project, cx| {
- let path = project
- .project_path_for_absolute_path(Path::new(&request.path), cx)
- .context("Failed to get project path")?;
- anyhow::Ok(project.open_buffer(path, cx))
- })??
- .await?;
-
- buffer.update(cx, |buffer, cx| {
- let start = language::Point::new(request.line_offset.unwrap_or(0), 0);
- let end = match request.line_limit {
- None => buffer.max_point(),
- Some(limit) => start + language::Point::new(limit + 1, 0),
- };
-
- let content: String = buffer.text_for_range(start..end).collect();
- self.update_thread(&request.thread_id.into(), cx, |thread, cx| {
- thread.push_entry(
- AgentThreadEntryContent::ReadFile {
- path: request.path.clone(),
- content: content.clone(),
- },
- cx,
- );
- });
+#[derive(Debug)]
+pub struct ThreadEntry {
+ pub id: ThreadEntryId,
+ pub content: AgentThreadEntryContent,
+}
- acp::ReadTextFileResponse {
- content,
- version: acp::FileVersion(0),
- }
- })
- }
+pub struct AcpThread {
+ id: ThreadId,
+ next_entry_id: ThreadEntryId,
+ entries: Vec<ThreadEntry>,
+ server: Arc<AcpServer>,
+ title: SharedString,
+ project: Entity<Project>,
+}
- async fn read_binary_file(
- &self,
- request: acp::ReadBinaryFileParams,
- ) -> Result<acp::ReadBinaryFileResponse> {
- let cx = &mut self.cx.clone();
- let file = self
- .project
- .update(cx, |project, cx| {
- let (worktree, path) = project
- .find_worktree(Path::new(&request.path), cx)
- .context("Failed to get project path")?;
-
- let task = worktree.update(cx, |worktree, cx| worktree.load_binary_file(&path, cx));
- anyhow::Ok(task)
- })??
- .await?;
-
- // todo! test
- let content = cx
- .background_spawn(async move {
- let start = request.byte_offset.unwrap_or(0) as usize;
- let end = request
- .byte_limit
- .map(|limit| (start + limit as usize).min(file.content.len()))
- .unwrap_or(file.content.len());
-
- let range_content = &file.content[start..end];
-
- let mut base64_content = Vec::new();
- let mut base64_encoder = base64::write::EncoderWriter::new(
- std::io::Cursor::new(&mut base64_content),
- &base64::engine::general_purpose::STANDARD,
- );
- base64_encoder.write_all(range_content)?;
- drop(base64_encoder);
-
- // SAFETY: The base64 encoder should not produce non-UTF8.
- unsafe { anyhow::Ok(String::from_utf8_unchecked(base64_content)) }
- })
- .await?;
+impl AcpThread {
+ pub fn new(
+ server: Arc<AcpServer>,
+ thread_id: ThreadId,
+ entries: Vec<AgentThreadEntryContent>,
+ project: Entity<Project>,
+ _: &mut Context<Self>,
+ ) -> Self {
+ let mut next_entry_id = ThreadEntryId(0);
+ Self {
+ title: "A new agent2 thread".into(),
+ entries: entries
+ .into_iter()
+ .map(|entry| ThreadEntry {
+ id: next_entry_id.post_inc(),
+ content: entry,
+ })
+ .collect(),
+ server,
+ id: thread_id,
+ next_entry_id,
+ project,
+ }
+ }
- Ok(acp::ReadBinaryFileResponse {
- content,
- // todo!
- version: acp::FileVersion(0),
- })
+ pub fn title(&self) -> SharedString {
+ self.title.clone()
}
- async fn glob_search(&self, request: acp::GlobSearchParams) -> Result<acp::GlobSearchResponse> {
- todo!()
+ pub fn entries(&self) -> &[ThreadEntry] {
+ &self.entries
}
-}
-impl AcpAgent {
- pub fn stdio(mut process: Child, project: Entity<Project>, cx: &mut AsyncApp) -> Arc<Self> {
- let stdin = process.stdin.take().expect("process didn't have stdin");
- let stdout = process.stdout.take().expect("process didn't have stdout");
-
- let threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<Thread>>>> = Default::default();
- let (connection, handler_fut, io_fut) = acp::AgentConnection::connect_to_agent(
- AcpClientDelegate::new(project.clone(), threads.clone(), cx.clone()),
- stdin,
- stdout,
- );
-
- let io_task = cx.background_spawn(async move {
- io_fut.await.log_err();
- process.status().await.log_err();
+ pub fn push_entry(&mut self, entry: AgentThreadEntryContent, cx: &mut Context<Self>) {
+ self.entries.push(ThreadEntry {
+ id: self.next_entry_id.post_inc(),
+ content: entry,
});
-
- Arc::new(Self {
- project,
- connection: Arc::new(connection),
- threads,
- _handler_task: cx.foreground_executor().spawn(handler_fut),
- _io_task: io_task,
- })
+ cx.notify();
}
-}
-#[async_trait(?Send)]
-impl Agent for AcpAgent {
- async fn threads(&self, cx: &mut AsyncApp) -> Result<Vec<AgentThreadSummary>> {
- let response = self.connection.request(acp::GetThreadsParams).await?;
- response
- .threads
- .into_iter()
- .map(|thread| {
- Ok(AgentThreadSummary {
- id: thread.id.into(),
- title: thread.title,
- created_at: thread.modified_at,
- })
- })
- .collect()
- }
+ pub fn push_assistant_chunk(&mut self, chunk: MessageChunk, cx: &mut Context<Self>) {
+ if let Some(last_entry) = self.entries.last_mut() {
+ if let AgentThreadEntryContent::Message(Message {
+ ref mut chunks,
+ role: Role::Assistant,
+ }) = last_entry.content
+ {
+ if let (
+ Some(MessageChunk::Text { chunk: old_chunk }),
+ MessageChunk::Text { chunk: new_chunk },
+ ) = (chunks.last_mut(), &chunk)
+ {
+ old_chunk.push_str(&new_chunk);
+ return cx.notify();
+ }
- async fn create_thread(self: Arc<Self>, cx: &mut AsyncApp) -> Result<Entity<Thread>> {
- let response = self.connection.request(acp::CreateThreadParams).await?;
- let thread_id: ThreadId = response.thread_id.into();
- let agent = self.clone();
- let thread = cx.new(|_| Thread {
- title: "The agent2 thread".into(),
- id: thread_id.clone(),
- next_entry_id: ThreadEntryId(0),
- entries: Vec::default(),
- project: self.project.clone(),
- agent,
- })?;
- self.threads.lock().insert(thread_id, thread.downgrade());
- Ok(thread)
+ chunks.push(chunk);
+ return cx.notify();
+ }
+ }
+
+ self.entries.push(ThreadEntry {
+ id: self.next_entry_id.post_inc(),
+ content: AgentThreadEntryContent::Message(Message {
+ role: Role::Assistant,
+ chunks: vec![chunk],
+ }),
+ });
+ cx.notify();
}
- async fn open_thread(&self, id: ThreadId, cx: &mut AsyncApp) -> Result<Entity<Thread>> {
- todo!()
+ pub fn send(&mut self, message: Message, cx: &mut Context<Self>) -> Task<Result<()>> {
+ let agent = self.server.clone();
+ let id = self.id.clone();
+ self.push_entry(AgentThreadEntryContent::Message(message.clone()), cx);
+ cx.spawn(async move |_, cx| {
+ agent.send_message(id, message, cx).await?;
+ Ok(())
+ })
}
+}
- async fn thread_entries(
- &self,
- thread_id: ThreadId,
- cx: &mut AsyncApp,
- ) -> Result<Vec<AgentThreadEntryContent>> {
- let response = self
- .connection
- .request(acp::GetThreadEntriesParams {
- thread_id: thread_id.clone().into(),
- })
- .await?;
-
- Ok(response
- .entries
- .into_iter()
- .map(|entry| match entry {
- acp::ThreadEntry::Message { message } => {
- AgentThreadEntryContent::Message(Message {
- role: match message.role {
- acp::Role::User => Role::User,
- acp::Role::Assistant => Role::Assistant,
- },
- chunks: message
- .chunks
- .into_iter()
- .map(|chunk| match chunk {
- acp::MessageChunk::Text { chunk } => MessageChunk::Text {
- chunk: chunk.into(),
- },
- })
- .collect(),
- })
- }
- acp::ThreadEntry::ReadFile { path, content } => {
- AgentThreadEntryContent::ReadFile { path, content }
- }
- })
- .collect())
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use gpui::{AsyncApp, TestAppContext};
+ use project::FakeFs;
+ use serde_json::json;
+ use settings::SettingsStore;
+ use std::{env, path::Path, process::Stdio};
+ use util::path;
+
+ fn init_test(cx: &mut TestAppContext) {
+ env_logger::init();
+ cx.update(|cx| {
+ let settings_store = SettingsStore::test(cx);
+ cx.set_global(settings_store);
+ Project::init_settings(cx);
+ language::init(cx);
+ });
}
- async fn send_thread_message(
- &self,
- thread_id: ThreadId,
- message: crate::Message,
- cx: &mut AsyncApp,
- ) -> Result<()> {
- self.connection
- .request(acp::SendMessageParams {
- thread_id: thread_id.clone().into(),
- message: acp::Message {
- role: match message.role {
- Role::User => acp::Role::User,
- Role::Assistant => acp::Role::Assistant,
+ #[gpui::test]
+ async fn test_gemini(cx: &mut TestAppContext) {
+ init_test(cx);
+
+ cx.executor().allow_parking();
+
+ let fs = FakeFs::new(cx.executor());
+ fs.insert_tree(
+ path!("/private/tmp"),
+ json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}),
+ )
+ .await;
+ let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
+ let server = gemini_acp_server(project.clone(), cx.to_async()).unwrap();
+ let thread = server.create_thread(&mut cx.to_async()).await.unwrap();
+ thread
+ .update(cx, |thread, cx| {
+ thread.send(
+ Message {
+ role: Role::User,
+ chunks: vec![
+ "Read the '/private/tmp/foo' file and output all of its contents."
+ .into(),
+ ],
},
- chunks: message
- .chunks
- .into_iter()
- .map(|chunk| match chunk {
- MessageChunk::Text { chunk } => acp::MessageChunk::Text {
- chunk: chunk.into(),
- },
- MessageChunk::File { .. } => todo!(),
- MessageChunk::Directory { .. } => todo!(),
- MessageChunk::Symbol { .. } => todo!(),
- MessageChunk::Fetch { .. } => todo!(),
- })
- .collect(),
- },
+ cx,
+ )
})
- .await?;
- Ok(())
+ .await
+ .unwrap();
+
+ thread.read_with(cx, |thread, _| {
+ assert!(matches!(
+ thread.entries[0].content,
+ AgentThreadEntryContent::Message(Message {
+ role: Role::User,
+ ..
+ })
+ ));
+ assert!(
+ thread.entries().iter().any(|entry| {
+ entry.content
+ == AgentThreadEntryContent::ReadFile {
+ path: "/private/tmp/foo".into(),
+ content: "Lorem ipsum dolor".into(),
+ }
+ }),
+ "Thread does not contain entry. Actual: {:?}",
+ thread.entries()
+ );
+ });
}
-}
-impl From<acp::ThreadId> for ThreadId {
- fn from(thread_id: acp::ThreadId) -> Self {
- Self(thread_id.0.into())
- }
-}
+ pub fn gemini_acp_server(project: Entity<Project>, mut cx: AsyncApp) -> Result<Arc<AcpServer>> {
+ let cli_path =
+ Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini-cli/packages/cli");
+ let mut command = util::command::new_smol_command("node");
+ command
+ .arg(cli_path)
+ .arg("--acp")
+ .args(["--model", "gemini-2.5-flash"])
+ .current_dir("/private/tmp")
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::inherit())
+ .kill_on_drop(true);
+
+ if let Ok(gemini_key) = std::env::var("GEMINI_API_KEY") {
+ command.env("GEMINI_API_KEY", gemini_key);
+ }
+
+ let child = command.spawn().unwrap();
-impl From<ThreadId> for acp::ThreadId {
- fn from(thread_id: ThreadId) -> Self {
- acp::ThreadId(thread_id.0.to_string())
+ Ok(AcpServer::stdio(child, project, &mut cx))
}
}
@@ -1,352 +0,0 @@
-mod acp;
-mod thread_element;
-
-use anyhow::Result;
-use async_trait::async_trait;
-use chrono::{DateTime, Utc};
-use gpui::{AppContext, AsyncApp, Context, Entity, SharedString, Task};
-use project::Project;
-use std::{ops::Range, path::PathBuf, sync::Arc};
-
-pub use acp::AcpAgent;
-pub use thread_element::ThreadElement;
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct ThreadId(SharedString);
-
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub struct FileVersion(u64);
-
-#[derive(Debug)]
-pub struct AgentThreadSummary {
- pub id: ThreadId,
- pub title: String,
- pub created_at: DateTime<Utc>,
-}
-
-#[derive(Clone, Debug, PartialEq, Eq)]
-pub struct FileContent {
- pub path: PathBuf,
- pub version: FileVersion,
- pub content: SharedString,
-}
-
-#[derive(Copy, Clone, Debug, Eq, PartialEq)]
-pub enum Role {
- User,
- Assistant,
-}
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct Message {
- pub role: Role,
- pub chunks: Vec<MessageChunk>,
-}
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub enum MessageChunk {
- Text {
- // todo! should it be shared string? what about streaming?
- chunk: String,
- },
- File {
- content: FileContent,
- },
- Directory {
- path: PathBuf,
- contents: Vec<FileContent>,
- },
- Symbol {
- path: PathBuf,
- range: Range<u64>,
- version: FileVersion,
- name: SharedString,
- content: SharedString,
- },
- Fetch {
- url: SharedString,
- content: SharedString,
- },
-}
-
-impl From<&str> for MessageChunk {
- fn from(chunk: &str) -> Self {
- MessageChunk::Text {
- chunk: chunk.to_string().into(),
- }
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub enum AgentThreadEntryContent {
- Message(Message),
- ReadFile { path: PathBuf, content: String },
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub struct ThreadEntryId(usize);
-
-impl ThreadEntryId {
- pub fn post_inc(&mut self) -> Self {
- let id = *self;
- self.0 += 1;
- id
- }
-}
-
-#[derive(Debug)]
-pub struct ThreadEntry {
- pub id: ThreadEntryId,
- pub content: AgentThreadEntryContent,
-}
-
-pub struct ThreadStore {
- threads: Vec<AgentThreadSummary>,
- agent: Arc<dyn Agent>,
- project: Entity<Project>,
-}
-
-impl ThreadStore {
- pub async fn load(
- agent: Arc<dyn Agent>,
- project: Entity<Project>,
- cx: &mut AsyncApp,
- ) -> Result<Entity<Self>> {
- let threads = agent.threads(cx).await?;
- cx.new(|_cx| Self {
- threads,
- agent,
- project,
- })
- }
-
- /// Returns the threads in reverse chronological order.
- pub fn threads(&self) -> &[AgentThreadSummary] {
- &self.threads
- }
-
- /// Opens a thread with the given ID.
- pub fn open_thread(
- &self,
- id: ThreadId,
- cx: &mut Context<Self>,
- ) -> Task<Result<Entity<Thread>>> {
- let agent = self.agent.clone();
- cx.spawn(async move |_, cx| agent.open_thread(id, cx).await)
- }
-
- /// Creates a new thread.
- pub fn create_thread(&self, cx: &mut Context<Self>) -> Task<Result<Entity<Thread>>> {
- let agent = self.agent.clone();
- cx.spawn(async move |_, cx| agent.create_thread(cx).await)
- }
-}
-
-pub struct Thread {
- id: ThreadId,
- next_entry_id: ThreadEntryId,
- entries: Vec<ThreadEntry>,
- agent: Arc<dyn Agent>,
- title: SharedString,
- project: Entity<Project>,
-}
-
-impl Thread {
- pub async fn load(
- agent: Arc<dyn Agent>,
- thread_id: ThreadId,
- project: Entity<Project>,
- cx: &mut AsyncApp,
- ) -> Result<Entity<Self>> {
- let entries = agent.thread_entries(thread_id.clone(), cx).await?;
- cx.new(|cx| Self::new(agent, thread_id, entries, project, cx))
- }
-
- pub fn new(
- agent: Arc<dyn Agent>,
- thread_id: ThreadId,
- entries: Vec<AgentThreadEntryContent>,
- project: Entity<Project>,
- _: &mut Context<Self>,
- ) -> Self {
- let mut next_entry_id = ThreadEntryId(0);
- Self {
- title: "A new agent2 thread".into(),
- entries: entries
- .into_iter()
- .map(|entry| ThreadEntry {
- id: next_entry_id.post_inc(),
- content: entry,
- })
- .collect(),
- agent,
- id: thread_id,
- next_entry_id,
- project,
- }
- }
-
- pub fn title(&self) -> SharedString {
- self.title.clone()
- }
-
- pub fn entries(&self) -> &[ThreadEntry] {
- &self.entries
- }
-
- pub fn push_entry(&mut self, entry: AgentThreadEntryContent, cx: &mut Context<Self>) {
- self.entries.push(ThreadEntry {
- id: self.next_entry_id.post_inc(),
- content: entry,
- });
- cx.notify();
- }
-
- pub fn push_assistant_chunk(&mut self, chunk: MessageChunk, cx: &mut Context<Self>) {
- if let Some(last_entry) = self.entries.last_mut() {
- if let AgentThreadEntryContent::Message(Message {
- ref mut chunks,
- role: Role::Assistant,
- }) = last_entry.content
- {
- if let (
- Some(MessageChunk::Text { chunk: old_chunk }),
- MessageChunk::Text { chunk: new_chunk },
- ) = (chunks.last_mut(), &chunk)
- {
- old_chunk.push_str(&new_chunk);
- return cx.notify();
- }
-
- chunks.push(chunk);
- return cx.notify();
- }
- }
-
- self.entries.push(ThreadEntry {
- id: self.next_entry_id.post_inc(),
- content: AgentThreadEntryContent::Message(Message {
- role: Role::Assistant,
- chunks: vec![chunk],
- }),
- });
- cx.notify();
- }
-
- pub fn send(&mut self, message: Message, cx: &mut Context<Self>) -> Task<Result<()>> {
- let agent = self.agent.clone();
- let id = self.id.clone();
- self.push_entry(AgentThreadEntryContent::Message(message.clone()), cx);
- cx.spawn(async move |_, cx| {
- agent.send_thread_message(id, message, cx).await?;
- Ok(())
- })
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::acp::AcpAgent;
- use gpui::TestAppContext;
- use project::FakeFs;
- use serde_json::json;
- use settings::SettingsStore;
- use std::{env, path::Path, process::Stdio};
- use util::path;
-
- fn init_test(cx: &mut TestAppContext) {
- env_logger::init();
- cx.update(|cx| {
- let settings_store = SettingsStore::test(cx);
- cx.set_global(settings_store);
- Project::init_settings(cx);
- language::init(cx);
- });
- }
-
- #[gpui::test]
- async fn test_gemini(cx: &mut TestAppContext) {
- init_test(cx);
-
- cx.executor().allow_parking();
-
- let fs = FakeFs::new(cx.executor());
- fs.insert_tree(
- path!("/private/tmp"),
- json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}),
- )
- .await;
- let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
- let agent = gemini_agent(project.clone(), cx.to_async()).unwrap();
- let thread_store = ThreadStore::load(agent, project, &mut cx.to_async())
- .await
- .unwrap();
- let thread = thread_store
- .update(cx, |thread_store, cx| {
- assert_eq!(thread_store.threads().len(), 0);
- thread_store.create_thread(cx)
- })
- .await
- .unwrap();
- thread
- .update(cx, |thread, cx| {
- thread.send(
- Message {
- role: Role::User,
- chunks: vec![
- "Read the '/private/tmp/foo' file and output all of its contents."
- .into(),
- ],
- },
- cx,
- )
- })
- .await
- .unwrap();
-
- thread.read_with(cx, |thread, _| {
- assert!(matches!(
- thread.entries[0].content,
- AgentThreadEntryContent::Message(Message {
- role: Role::User,
- ..
- })
- ));
- assert!(
- thread.entries().iter().any(|entry| {
- entry.content
- == AgentThreadEntryContent::ReadFile {
- path: "/private/tmp/foo".into(),
- content: "Lorem ipsum dolor".into(),
- }
- }),
- "Thread does not contain entry. Actual: {:?}",
- thread.entries()
- );
- });
- }
-
- pub fn gemini_agent(project: Entity<Project>, mut cx: AsyncApp) -> Result<Arc<AcpAgent>> {
- let cli_path =
- Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini-cli/packages/cli");
- let mut command = util::command::new_smol_command("node");
- command
- .arg(cli_path)
- .arg("--acp")
- .args(["--model", "gemini-2.5-flash"])
- .current_dir("/private/tmp")
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .stderr(Stdio::inherit())
- .kill_on_drop(true);
-
- if let Ok(gemini_key) = std::env::var("GEMINI_API_KEY") {
- command.env("GEMINI_API_KEY", gemini_key);
- }
-
- let child = command.spawn().unwrap();
-
- Ok(AcpAgent::stdio(child, project, &mut cx))
- }
-}
@@ -0,0 +1,282 @@
+use crate::{AcpThread, AgentThreadEntryContent, MessageChunk, Role, ThreadEntryId, ThreadId};
+use agentic_coding_protocol as acp;
+use anyhow::{Context as _, Result};
+use async_trait::async_trait;
+use collections::HashMap;
+use gpui::{App, AppContext, AsyncApp, Context, Entity, Task, WeakEntity};
+use parking_lot::Mutex;
+use project::Project;
+use smol::process::Child;
+use std::{io::Write as _, path::Path, sync::Arc};
+use util::ResultExt;
+
+pub struct AcpServer {
+ connection: Arc<acp::AgentConnection>,
+ threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<AcpThread>>>>,
+ project: Entity<Project>,
+ _handler_task: Task<()>,
+ _io_task: Task<()>,
+}
+
+struct AcpClientDelegate {
+ project: Entity<Project>,
+ threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<AcpThread>>>>,
+ cx: AsyncApp,
+ // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
+}
+
+impl AcpClientDelegate {
+ fn new(
+ project: Entity<Project>,
+ threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<AcpThread>>>>,
+ cx: AsyncApp,
+ ) -> Self {
+ Self {
+ project,
+ threads,
+ cx: cx,
+ }
+ }
+
+ fn update_thread<R>(
+ &self,
+ thread_id: &ThreadId,
+ cx: &mut App,
+ callback: impl FnMut(&mut AcpThread, &mut Context<AcpThread>) -> R,
+ ) -> Option<R> {
+ let thread = self.threads.lock().get(&thread_id)?.clone();
+ let Some(thread) = thread.upgrade() else {
+ self.threads.lock().remove(&thread_id);
+ return None;
+ };
+ Some(thread.update(cx, callback))
+ }
+}
+
+#[async_trait(?Send)]
+impl acp::Client for AcpClientDelegate {
+ async fn stat(&self, params: acp::StatParams) -> Result<acp::StatResponse> {
+ let cx = &mut self.cx.clone();
+ self.project.update(cx, |project, cx| {
+ let path = project
+ .project_path_for_absolute_path(Path::new(¶ms.path), cx)
+ .context("Failed to get project path")?;
+
+ match project.entry_for_path(&path, cx) {
+ // todo! refresh entry?
+ None => Ok(acp::StatResponse {
+ exists: false,
+ is_directory: false,
+ }),
+ Some(entry) => Ok(acp::StatResponse {
+ exists: entry.is_created(),
+ is_directory: entry.is_dir(),
+ }),
+ }
+ })?
+ }
+
+ async fn stream_message_chunk(
+ &self,
+ params: acp::StreamMessageChunkParams,
+ ) -> Result<acp::StreamMessageChunkResponse> {
+ dbg!();
+ let cx = &mut self.cx.clone();
+
+ cx.update(|cx| {
+ self.update_thread(¶ms.thread_id.into(), cx, |thread, cx| {
+ let acp::MessageChunk::Text { chunk } = ¶ms.chunk;
+ thread.push_assistant_chunk(
+ MessageChunk::Text {
+ chunk: chunk.into(),
+ },
+ cx,
+ )
+ });
+ })?;
+
+ Ok(acp::StreamMessageChunkResponse)
+ }
+
+ async fn read_text_file(
+ &self,
+ request: acp::ReadTextFileParams,
+ ) -> Result<acp::ReadTextFileResponse> {
+ let cx = &mut self.cx.clone();
+ let buffer = self
+ .project
+ .update(cx, |project, cx| {
+ let path = project
+ .project_path_for_absolute_path(Path::new(&request.path), cx)
+ .context("Failed to get project path")?;
+ anyhow::Ok(project.open_buffer(path, cx))
+ })??
+ .await?;
+
+ buffer.update(cx, |buffer, cx| {
+ let start = language::Point::new(request.line_offset.unwrap_or(0), 0);
+ let end = match request.line_limit {
+ None => buffer.max_point(),
+ Some(limit) => start + language::Point::new(limit + 1, 0),
+ };
+
+ let content: String = buffer.text_for_range(start..end).collect();
+ self.update_thread(&request.thread_id.into(), cx, |thread, cx| {
+ thread.push_entry(
+ AgentThreadEntryContent::ReadFile {
+ path: request.path.clone(),
+ content: content.clone(),
+ },
+ cx,
+ );
+ });
+
+ acp::ReadTextFileResponse {
+ content,
+ version: acp::FileVersion(0),
+ }
+ })
+ }
+
+ async fn read_binary_file(
+ &self,
+ request: acp::ReadBinaryFileParams,
+ ) -> Result<acp::ReadBinaryFileResponse> {
+ let cx = &mut self.cx.clone();
+ let file = self
+ .project
+ .update(cx, |project, cx| {
+ let (worktree, path) = project
+ .find_worktree(Path::new(&request.path), cx)
+ .context("Failed to get project path")?;
+
+ let task = worktree.update(cx, |worktree, cx| worktree.load_binary_file(&path, cx));
+ anyhow::Ok(task)
+ })??
+ .await?;
+
+ // todo! test
+ let content = cx
+ .background_spawn(async move {
+ let start = request.byte_offset.unwrap_or(0) as usize;
+ let end = request
+ .byte_limit
+ .map(|limit| (start + limit as usize).min(file.content.len()))
+ .unwrap_or(file.content.len());
+
+ let range_content = &file.content[start..end];
+
+ let mut base64_content = Vec::new();
+ let mut base64_encoder = base64::write::EncoderWriter::new(
+ std::io::Cursor::new(&mut base64_content),
+ &base64::engine::general_purpose::STANDARD,
+ );
+ base64_encoder.write_all(range_content)?;
+ drop(base64_encoder);
+
+ // SAFETY: The base64 encoder should not produce non-UTF8.
+ unsafe { anyhow::Ok(String::from_utf8_unchecked(base64_content)) }
+ })
+ .await?;
+
+ Ok(acp::ReadBinaryFileResponse {
+ content,
+ // todo!
+ version: acp::FileVersion(0),
+ })
+ }
+
+ async fn glob_search(&self, request: acp::GlobSearchParams) -> Result<acp::GlobSearchResponse> {
+ todo!()
+ }
+}
+
+impl AcpServer {
+ pub fn stdio(mut process: Child, project: Entity<Project>, cx: &mut AsyncApp) -> Arc<Self> {
+ let stdin = process.stdin.take().expect("process didn't have stdin");
+ let stdout = process.stdout.take().expect("process didn't have stdout");
+
+ let threads: Arc<Mutex<HashMap<ThreadId, WeakEntity<AcpThread>>>> = Default::default();
+ let (connection, handler_fut, io_fut) = acp::AgentConnection::connect_to_agent(
+ AcpClientDelegate::new(project.clone(), threads.clone(), cx.clone()),
+ stdin,
+ stdout,
+ );
+
+ let io_task = cx.background_spawn(async move {
+ io_fut.await.log_err();
+ process.status().await.log_err();
+ });
+
+ Arc::new(Self {
+ project,
+ connection: Arc::new(connection),
+ threads,
+ _handler_task: cx.foreground_executor().spawn(handler_fut),
+ _io_task: io_task,
+ })
+ }
+}
+
+impl AcpServer {
+ pub async fn create_thread(self: Arc<Self>, cx: &mut AsyncApp) -> Result<Entity<AcpThread>> {
+ let response = self.connection.request(acp::CreateThreadParams).await?;
+ let thread_id: ThreadId = response.thread_id.into();
+ let server = self.clone();
+ let thread = cx.new(|_| AcpThread {
+ title: "The agent2 thread".into(),
+ id: thread_id.clone(),
+ next_entry_id: ThreadEntryId(0),
+ entries: Vec::default(),
+ project: self.project.clone(),
+ server,
+ })?;
+ self.threads.lock().insert(thread_id, thread.downgrade());
+ Ok(thread)
+ }
+
+ pub async fn send_message(
+ &self,
+ thread_id: ThreadId,
+ message: crate::Message,
+ cx: &mut AsyncApp,
+ ) -> Result<()> {
+ self.connection
+ .request(acp::SendMessageParams {
+ thread_id: thread_id.clone().into(),
+ message: acp::Message {
+ role: match message.role {
+ Role::User => acp::Role::User,
+ Role::Assistant => acp::Role::Assistant,
+ },
+ chunks: message
+ .chunks
+ .into_iter()
+ .map(|chunk| match chunk {
+ MessageChunk::Text { chunk } => acp::MessageChunk::Text {
+ chunk: chunk.into(),
+ },
+ MessageChunk::File { .. } => todo!(),
+ MessageChunk::Directory { .. } => todo!(),
+ MessageChunk::Symbol { .. } => todo!(),
+ MessageChunk::Fetch { .. } => todo!(),
+ })
+ .collect(),
+ },
+ })
+ .await?;
+ Ok(())
+ }
+}
+
+impl From<acp::ThreadId> for ThreadId {
+ fn from(thread_id: acp::ThreadId) -> Self {
+ Self(thread_id.0.into())
+ }
+}
+
+impl From<ThreadId> for acp::ThreadId {
+ fn from(thread_id: ThreadId) -> Self {
+ acp::ThreadId(thread_id.0.to_string())
+ }
+}
@@ -7,18 +7,18 @@ use ui::Tooltip;
use ui::prelude::*;
use zed_actions::agent::Chat;
-use crate::{AgentThreadEntryContent, Message, MessageChunk, Role, Thread, ThreadEntry};
+use crate::{AcpThread, AgentThreadEntryContent, Message, MessageChunk, Role, ThreadEntry};
-pub struct ThreadElement {
- thread: Entity<Thread>,
+pub struct AcpThreadView {
+ thread: Entity<AcpThread>,
// todo! use full message editor from agent2
message_editor: Entity<Editor>,
send_task: Option<Task<Result<()>>>,
_subscription: Subscription,
}
-impl ThreadElement {
- pub fn new(thread: Entity<Thread>, window: &mut Window, cx: &mut Context<Self>) -> Self {
+impl AcpThreadView {
+ pub fn new(thread: Entity<AcpThread>, window: &mut Window, cx: &mut Context<Self>) -> Self {
let message_editor = cx.new(|cx| {
let buffer = cx.new(|cx| Buffer::local("", cx));
let buffer = cx.new(|cx| MultiBuffer::singleton(buffer, cx));
@@ -127,13 +127,13 @@ impl ThreadElement {
}
}
-impl Focusable for ThreadElement {
+impl Focusable for AcpThreadView {
fn focus_handle(&self, cx: &App) -> FocusHandle {
self.message_editor.focus_handle(cx)
}
}
-impl Render for ThreadElement {
+impl Render for AcpThreadView {
fn render(&mut self, window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
let text = self.message_editor.read(cx).text(cx);
let is_editor_empty = text.is_empty();
@@ -4,7 +4,7 @@ use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
-use agent2::{AcpAgent, Agent as _};
+use acp::AcpServer;
use db::kvp::{Dismissable, KEY_VALUE_STORE};
use serde::{Deserialize, Serialize};
@@ -198,7 +198,7 @@ enum ActiveView {
_subscriptions: Vec<gpui::Subscription>,
},
AcpThread {
- thread_element: Entity<agent2::ThreadElement>,
+ thread_view: Entity<acp::AcpThreadView>,
},
TextThread {
context_editor: Entity<TextThreadEditor>,
@@ -753,8 +753,8 @@ impl AgentPanel {
ActiveView::Thread { thread, .. } => {
thread.update(cx, |thread, cx| thread.cancel_last_completion(window, cx));
}
- ActiveView::AcpThread { thread_element, .. } => {
- thread_element.update(cx, |thread_element, _cx| thread_element.cancel());
+ ActiveView::AcpThread { thread_view, .. } => {
+ thread_view.update(cx, |thread_element, _cx| thread_element.cancel());
}
ActiveView::TextThread { .. } | ActiveView::History | ActiveView::Configuration => {}
}
@@ -916,12 +916,12 @@ impl AgentPanel {
let project = self.project.clone();
cx.spawn_in(window, async move |this, cx| {
- let agent = AcpAgent::stdio(child, project, cx);
+ let agent = AcpServer::stdio(child, project, cx);
let thread = agent.create_thread(cx).await?;
- let thread_element =
- cx.new_window_entity(|window, cx| agent2::ThreadElement::new(thread, window, cx))?;
+ let thread_view =
+ cx.new_window_entity(|window, cx| acp::AcpThreadView::new(thread, window, cx))?;
this.update_in(cx, |this, window, cx| {
- this.set_active_view(ActiveView::AcpThread { thread_element }, window, cx);
+ this.set_active_view(ActiveView::AcpThread { thread_view }, window, cx);
})
})
.detach();
@@ -1521,7 +1521,7 @@ impl Focusable for AgentPanel {
fn focus_handle(&self, cx: &App) -> FocusHandle {
match &self.active_view {
ActiveView::Thread { message_editor, .. } => message_editor.focus_handle(cx),
- ActiveView::AcpThread { thread_element, .. } => thread_element.focus_handle(cx),
+ ActiveView::AcpThread { thread_view, .. } => thread_view.focus_handle(cx),
ActiveView::History => self.history.focus_handle(cx),
ActiveView::TextThread { context_editor, .. } => context_editor.focus_handle(cx),
ActiveView::Configuration => {
@@ -1678,11 +1678,9 @@ impl AgentPanel {
.into_any_element(),
}
}
- ActiveView::AcpThread { thread_element } => {
- Label::new(thread_element.read(cx).title(cx))
- .truncate()
- .into_any_element()
- }
+ ActiveView::AcpThread { thread_view } => Label::new(thread_view.read(cx).title(cx))
+ .truncate()
+ .into_any_element(),
ActiveView::TextThread {
title_editor,
context_editor,
@@ -3188,9 +3186,9 @@ impl Render for AgentPanel {
})
.child(h_flex().child(message_editor.clone()))
.child(self.render_drag_target(cx)),
- ActiveView::AcpThread { thread_element, .. } => parent
+ ActiveView::AcpThread { thread_view, .. } => parent
.relative()
- .child(thread_element.clone())
+ .child(thread_view.clone())
// todo!
// .child(h_flex().child(self.message_editor.clone()))
.child(self.render_drag_target(cx)),