From 5b73b40df89fd4caf7596fb15b5b1ee54bd210df Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 1 Sep 2025 15:57:15 -0300 Subject: [PATCH] ACP Terminal support (#37129) Exposes terminal support via ACP and migrates our agent to use it. - N/A --------- Co-authored-by: Bennet Bo Fenner --- Cargo.lock | 9 +- Cargo.toml | 2 +- crates/acp_thread/Cargo.toml | 3 + crates/acp_thread/src/acp_thread.rs | 270 +++++++++++++--- crates/acp_thread/src/terminal.rs | 115 +++++-- crates/agent2/Cargo.toml | 2 - crates/agent2/src/agent.rs | 94 +++++- crates/agent2/src/thread.rs | 48 ++- crates/agent2/src/tools/terminal_tool.rs | 339 +++----------------- crates/agent_servers/src/acp.rs | 119 +++++-- crates/agent_ui/src/acp/entry_view_state.rs | 44 ++- crates/agent_ui/src/acp/thread_view.rs | 14 +- 12 files changed, 619 insertions(+), 440 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fed7077281333f53f4a9ce7b746227e3369d663b..a2ba36a91c445514ac9c8e1932b9b0135ca36b0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,7 @@ dependencies = [ "language_model", "markdown", "parking_lot", + "portable-pty", "project", "prompt_store", "rand 0.8.5", @@ -30,6 +31,7 @@ dependencies = [ "serde_json", "settings", "smol", + "task", "tempfile", "terminal", "ui", @@ -37,6 +39,7 @@ dependencies = [ "util", "uuid", "watch", + "which 6.0.3", "workspace-hack", ] @@ -192,9 +195,9 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.1.1" +version = "0.2.0-alpha.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b91e5ec3ce05e8effb2a7a3b7b1a587daa6699b9f98bbde6a35e44b8c6c773a" +checksum = "4ec42b8b612665799c7667890df4b5f5cb441b18a68619fd770f1e054480ee3f" dependencies = [ "anyhow", "async-broadcast", @@ -248,7 +251,6 @@ dependencies = [ "open", "parking_lot", "paths", - "portable-pty", "pretty_assertions", "project", "prompt_store", @@ -274,7 +276,6 @@ dependencies = [ "uuid", "watch", "web_search", - "which 6.0.3", "workspace-hack", "worktree", "zlog", diff --git a/Cargo.toml b/Cargo.toml index b20b37edb9ea08f49c757cc2d8764ce62494d688..6cf3d8858b01eb41afdeb860ca4284be0b9280a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -430,7 +430,7 @@ zlog_settings = { path = "crates/zlog_settings" } # External crates # -agent-client-protocol = "0.1" +agent-client-protocol = { version = "0.2.0-alpha.3", features = ["unstable"] } aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/Cargo.toml b/crates/acp_thread/Cargo.toml index 196614f731c6e330328e46eb75ba58cf928cf6cc..8d7bea8659c3f22d053e47d4b050bc4072e521ba 100644 --- a/crates/acp_thread/Cargo.toml +++ b/crates/acp_thread/Cargo.toml @@ -31,18 +31,21 @@ language.workspace = true language_model.workspace = true markdown.workspace = true parking_lot = { workspace = true, optional = true } +portable-pty.workspace = true project.workspace = true prompt_store.workspace = true serde.workspace = true serde_json.workspace = true settings.workspace = true smol.workspace = true +task.workspace = true terminal.workspace = true ui.workspace = true url.workspace = true util.workspace = true uuid.workspace = true watch.workspace = true +which.workspace = true workspace-hack.workspace = true [dev-dependencies] diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 394619732a72c205b6c5c940cc8b2b7d3a6d3d38..ab6aa98e99d4977256512b7c178dff26b71fc7e7 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -7,6 +7,7 @@ use agent_settings::AgentSettings; use collections::HashSet; pub use connection::*; pub use diff::*; +use futures::future::Shared; use language::language_settings::FormatOnSave; pub use mention::*; use project::lsp_store::{FormatTrigger, LspFormatTarget}; @@ -15,7 +16,7 @@ use settings::Settings as _; pub use terminal::*; use action_log::ActionLog; -use agent_client_protocol as acp; +use agent_client_protocol::{self as acp}; use anyhow::{Context as _, Result, anyhow}; use editor::Bias; use futures::{FutureExt, channel::oneshot, future::BoxFuture}; @@ -33,7 +34,8 @@ use std::rc::Rc; use std::time::{Duration, Instant}; use std::{fmt::Display, mem, path::PathBuf, sync::Arc}; use ui::App; -use util::ResultExt; +use util::{ResultExt, get_system_shell}; +use uuid::Uuid; #[derive(Debug)] pub struct UserMessage { @@ -183,37 +185,46 @@ impl ToolCall { tool_call: acp::ToolCall, status: ToolCallStatus, language_registry: Arc, + terminals: &HashMap>, cx: &mut App, - ) -> Self { + ) -> Result { let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") { first_line.to_owned() + "…" } else { tool_call.title }; - Self { + let mut content = Vec::with_capacity(tool_call.content.len()); + for item in tool_call.content { + content.push(ToolCallContent::from_acp( + item, + language_registry.clone(), + terminals, + cx, + )?); + } + + let result = Self { id: tool_call.id, label: cx .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)), kind: tool_call.kind, - content: tool_call - .content - .into_iter() - .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx)) - .collect(), + content, locations: tool_call.locations, resolved_locations: Vec::default(), status, raw_input: tool_call.raw_input, raw_output: tool_call.raw_output, - } + }; + Ok(result) } fn update_fields( &mut self, fields: acp::ToolCallUpdateFields, language_registry: Arc, + terminals: &HashMap>, cx: &mut App, - ) { + ) -> Result<()> { let acp::ToolCallUpdateFields { kind, status, @@ -248,14 +259,15 @@ impl ToolCall { // Reuse existing content if we can for (old, new) in self.content.iter_mut().zip(content.by_ref()) { - old.update_from_acp(new, language_registry.clone(), cx); + old.update_from_acp(new, language_registry.clone(), terminals, cx)?; } for new in content { self.content.push(ToolCallContent::from_acp( new, language_registry.clone(), + terminals, cx, - )) + )?) } self.content.truncate(new_content_len); } @@ -279,6 +291,7 @@ impl ToolCall { } self.raw_output = Some(raw_output); } + Ok(()) } pub fn diffs(&self) -> impl Iterator> { @@ -549,13 +562,16 @@ impl ToolCallContent { pub fn from_acp( content: acp::ToolCallContent, language_registry: Arc, + terminals: &HashMap>, cx: &mut App, - ) -> Self { + ) -> Result { match content { - acp::ToolCallContent::Content { content } => { - Self::ContentBlock(ContentBlock::new(content, &language_registry, cx)) - } - acp::ToolCallContent::Diff { diff } => Self::Diff(cx.new(|cx| { + acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new( + content, + &language_registry, + cx, + ))), + acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| { Diff::finalized( diff.path, diff.old_text, @@ -563,7 +579,12 @@ impl ToolCallContent { language_registry, cx, ) - })), + }))), + acp::ToolCallContent::Terminal { terminal_id } => terminals + .get(&terminal_id) + .cloned() + .map(Self::Terminal) + .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)), } } @@ -571,8 +592,9 @@ impl ToolCallContent { &mut self, new: acp::ToolCallContent, language_registry: Arc, + terminals: &HashMap>, cx: &mut App, - ) { + ) -> Result<()> { let needs_update = match (&self, &new) { (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => { old_diff.read(cx).needs_update( @@ -585,8 +607,9 @@ impl ToolCallContent { }; if needs_update { - *self = Self::from_acp(new, language_registry, cx); + *self = Self::from_acp(new, language_registry, terminals, cx)?; } + Ok(()) } pub fn to_markdown(&self, cx: &App) -> String { @@ -763,6 +786,8 @@ pub struct AcpThread { token_usage: Option, prompt_capabilities: acp::PromptCapabilities, _observe_prompt_capabilities: Task>, + determine_shell: Shared>, + terminals: HashMap>, } #[derive(Debug)] @@ -846,6 +871,20 @@ impl AcpThread { } }); + let determine_shell = cx + .background_spawn(async move { + if cfg!(windows) { + return get_system_shell(); + } + + if which::which("bash").is_ok() { + "bash".into() + } else { + get_system_shell() + } + }) + .shared(); + Self { action_log, shared_buffers: Default::default(), @@ -859,6 +898,8 @@ impl AcpThread { token_usage: None, prompt_capabilities, _observe_prompt_capabilities: task, + terminals: HashMap::default(), + determine_shell, } } @@ -1082,27 +1123,28 @@ impl AcpThread { let update = update.into(); let languages = self.project.read(cx).languages().clone(); - let (ix, current_call) = self - .tool_call_mut(update.id()) + let ix = self + .index_for_tool_call(update.id()) .context("Tool call not found")?; + let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else { + unreachable!() + }; + match update { ToolCallUpdate::UpdateFields(update) => { let location_updated = update.fields.locations.is_some(); - current_call.update_fields(update.fields, languages, cx); + call.update_fields(update.fields, languages, &self.terminals, cx)?; if location_updated { self.resolve_locations(update.id, cx); } } ToolCallUpdate::UpdateDiff(update) => { - current_call.content.clear(); - current_call - .content - .push(ToolCallContent::Diff(update.diff)); + call.content.clear(); + call.content.push(ToolCallContent::Diff(update.diff)); } ToolCallUpdate::UpdateTerminal(update) => { - current_call.content.clear(); - current_call - .content + call.content.clear(); + call.content .push(ToolCallContent::Terminal(update.terminal)); } } @@ -1125,21 +1167,30 @@ impl AcpThread { /// Fails if id does not match an existing entry. pub fn upsert_tool_call_inner( &mut self, - tool_call_update: acp::ToolCallUpdate, + update: acp::ToolCallUpdate, status: ToolCallStatus, cx: &mut Context, ) -> Result<(), acp::Error> { let language_registry = self.project.read(cx).languages().clone(); - let id = tool_call_update.id.clone(); + let id = update.id.clone(); - if let Some((ix, current_call)) = self.tool_call_mut(&id) { - current_call.update_fields(tool_call_update.fields, language_registry, cx); - current_call.status = status; + if let Some(ix) = self.index_for_tool_call(&id) { + let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else { + unreachable!() + }; + + call.update_fields(update.fields, language_registry, &self.terminals, cx)?; + call.status = status; cx.emit(AcpThreadEvent::EntryUpdated(ix)); } else { - let call = - ToolCall::from_acp(tool_call_update.try_into()?, status, language_registry, cx); + let call = ToolCall::from_acp( + update.try_into()?, + status, + language_registry, + &self.terminals, + cx, + )?; self.push_entry(AgentThreadEntry::ToolCall(call), cx); }; @@ -1147,6 +1198,22 @@ impl AcpThread { Ok(()) } + fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option { + self.entries + .iter() + .enumerate() + .rev() + .find_map(|(index, entry)| { + if let AgentThreadEntry::ToolCall(tool_call) = entry + && &tool_call.id == id + { + Some(index) + } else { + None + } + }) + } + fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> { // The tool call we are looking for is typically the last one, or very close to the end. // At the moment, it doesn't seem like a hashmap would be a good fit for this use case. @@ -1829,6 +1896,133 @@ impl AcpThread { }) } + pub fn create_terminal( + &self, + mut command: String, + args: Vec, + extra_env: Vec, + cwd: Option, + output_byte_limit: Option, + cx: &mut Context, + ) -> Task>> { + for arg in args { + command.push(' '); + command.push_str(&arg); + } + + let shell_command = if cfg!(windows) { + format!("$null | & {{{}}}", command.replace("\"", "'")) + } else if let Some(cwd) = cwd.as_ref().and_then(|cwd| cwd.as_os_str().to_str()) { + // Make sure once we're *inside* the shell, we cd into `cwd` + format!("(cd {cwd}; {}) self.project.update(cx, |project, cx| { + project.directory_environment(dir.as_path().into(), cx) + }), + None => Task::ready(None).shared(), + }; + + let env = cx.spawn(async move |_, _| { + let mut env = env.await.unwrap_or_default(); + if cfg!(unix) { + env.insert("PAGER".into(), "cat".into()); + } + for var in extra_env { + env.insert(var.name, var.value); + } + env + }); + + let project = self.project.clone(); + let language_registry = project.read(cx).languages().clone(); + let determine_shell = self.determine_shell.clone(); + + let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into()); + let terminal_task = cx.spawn({ + let terminal_id = terminal_id.clone(); + async move |_this, cx| { + let program = determine_shell.await; + let env = env.await; + let terminal = project + .update(cx, |project, cx| { + project.create_terminal_task( + task::SpawnInTerminal { + command: Some(program), + args, + cwd: cwd.clone(), + env, + ..Default::default() + }, + cx, + ) + })? + .await?; + + cx.new(|cx| { + Terminal::new( + terminal_id, + command, + cwd, + output_byte_limit.map(|l| l as usize), + terminal, + language_registry, + cx, + ) + }) + } + }); + + cx.spawn(async move |this, cx| { + let terminal = terminal_task.await?; + this.update(cx, |this, _cx| { + this.terminals.insert(terminal_id, terminal.clone()); + terminal + }) + }) + } + + pub fn kill_terminal( + &mut self, + terminal_id: acp::TerminalId, + cx: &mut Context, + ) -> Result<()> { + self.terminals + .get(&terminal_id) + .context("Terminal not found")? + .update(cx, |terminal, cx| { + terminal.kill(cx); + }); + + Ok(()) + } + + pub fn release_terminal( + &mut self, + terminal_id: acp::TerminalId, + cx: &mut Context, + ) -> Result<()> { + self.terminals + .remove(&terminal_id) + .context("Terminal not found")? + .update(cx, |terminal, cx| { + terminal.kill(cx); + }); + + Ok(()) + } + + pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result> { + self.terminals + .get(&terminal_id) + .context("Terminal not found") + .cloned() + } + pub fn to_markdown(&self, cx: &App) -> String { self.entries.iter().map(|e| e.to_markdown(cx)).collect() } diff --git a/crates/acp_thread/src/terminal.rs b/crates/acp_thread/src/terminal.rs index 41d7fb89bb2eb59207bf0a6557129a088b435f3a..6b4cdb73469d9dd7d1a1759bf3aa28d005d1f13e 100644 --- a/crates/acp_thread/src/terminal.rs +++ b/crates/acp_thread/src/terminal.rs @@ -1,34 +1,43 @@ -use gpui::{App, AppContext, Context, Entity}; +use agent_client_protocol as acp; + +use futures::{FutureExt as _, future::Shared}; +use gpui::{App, AppContext, Context, Entity, Task}; use language::LanguageRegistry; use markdown::Markdown; use std::{path::PathBuf, process::ExitStatus, sync::Arc, time::Instant}; pub struct Terminal { + id: acp::TerminalId, command: Entity, working_dir: Option, terminal: Entity, started_at: Instant, output: Option, + output_byte_limit: Option, + _output_task: Shared>, } pub struct TerminalOutput { pub ended_at: Instant, pub exit_status: Option, - pub was_content_truncated: bool, + pub content: String, pub original_content_len: usize, pub content_line_count: usize, - pub finished_with_empty_output: bool, } impl Terminal { pub fn new( + id: acp::TerminalId, command: String, working_dir: Option, + output_byte_limit: Option, terminal: Entity, language_registry: Arc, cx: &mut Context, ) -> Self { + let command_task = terminal.read(cx).wait_for_completed_task(cx); Self { + id, command: cx.new(|cx| { Markdown::new( format!("```\n{}\n```", command).into(), @@ -41,27 +50,93 @@ impl Terminal { terminal, started_at: Instant::now(), output: None, + output_byte_limit, + _output_task: cx + .spawn(async move |this, cx| { + let exit_status = command_task.await; + + this.update(cx, |this, cx| { + let (content, original_content_len) = this.truncated_output(cx); + let content_line_count = this.terminal.read(cx).total_lines(); + + this.output = Some(TerminalOutput { + ended_at: Instant::now(), + exit_status, + content, + original_content_len, + content_line_count, + }); + cx.notify(); + }) + .ok(); + + let exit_status = exit_status.map(portable_pty::ExitStatus::from); + + acp::TerminalExitStatus { + exit_code: exit_status.as_ref().map(|e| e.exit_code()), + signal: exit_status.and_then(|e| e.signal().map(Into::into)), + } + }) + .shared(), } } - pub fn finish( - &mut self, - exit_status: Option, - original_content_len: usize, - truncated_content_len: usize, - content_line_count: usize, - finished_with_empty_output: bool, - cx: &mut Context, - ) { - self.output = Some(TerminalOutput { - ended_at: Instant::now(), - exit_status, - was_content_truncated: truncated_content_len < original_content_len, - original_content_len, - content_line_count, - finished_with_empty_output, + pub fn id(&self) -> &acp::TerminalId { + &self.id + } + + pub fn wait_for_exit(&self) -> Shared> { + self._output_task.clone() + } + + pub fn kill(&mut self, cx: &mut App) { + self.terminal.update(cx, |terminal, _cx| { + terminal.kill_active_task(); }); - cx.notify(); + } + + pub fn current_output(&self, cx: &App) -> acp::TerminalOutputResponse { + if let Some(output) = self.output.as_ref() { + let exit_status = output.exit_status.map(portable_pty::ExitStatus::from); + + acp::TerminalOutputResponse { + output: output.content.clone(), + truncated: output.original_content_len > output.content.len(), + exit_status: Some(acp::TerminalExitStatus { + exit_code: exit_status.as_ref().map(|e| e.exit_code()), + signal: exit_status.and_then(|e| e.signal().map(Into::into)), + }), + } + } else { + let (current_content, original_len) = self.truncated_output(cx); + + acp::TerminalOutputResponse { + truncated: current_content.len() < original_len, + output: current_content, + exit_status: None, + } + } + } + + fn truncated_output(&self, cx: &App) -> (String, usize) { + let terminal = self.terminal.read(cx); + let mut content = terminal.get_content(); + + let original_content_len = content.len(); + + if let Some(limit) = self.output_byte_limit + && content.len() > limit + { + let mut end_ix = limit.min(content.len()); + while !content.is_char_boundary(end_ix) { + end_ix -= 1; + } + // Don't truncate mid-line, clear the remainder of the last line + end_ix = content[..end_ix].rfind('\n').unwrap_or(end_ix); + content.truncate(end_ix); + } + + (content, original_content_len) } pub fn command(&self) -> &Entity { diff --git a/crates/agent2/Cargo.toml b/crates/agent2/Cargo.toml index 68246a96b0288cb9091a4073a33712c0b69df67d..0e9c8fcf7237627d2cb7b17b977b68322160e6d5 100644 --- a/crates/agent2/Cargo.toml +++ b/crates/agent2/Cargo.toml @@ -48,7 +48,6 @@ log.workspace = true open.workspace = true parking_lot.workspace = true paths.workspace = true -portable-pty.workspace = true project.workspace = true prompt_store.workspace = true rust-embed.workspace = true @@ -68,7 +67,6 @@ util.workspace = true uuid.workspace = true watch.workspace = true web_search.workspace = true -which.workspace = true workspace-hack.workspace = true zstd.workspace = true diff --git a/crates/agent2/src/agent.rs b/crates/agent2/src/agent.rs index bb6a3c097ca27d6103c1072986f6d3255bc6c69f..e96b4c0cfa32be910a7a77e58a1911deb7e5357a 100644 --- a/crates/agent2/src/agent.rs +++ b/crates/agent2/src/agent.rs @@ -2,7 +2,7 @@ use crate::{ ContextServerRegistry, Thread, ThreadEvent, ThreadsDatabase, ToolCallAuthorization, UserMessageContent, templates::Templates, }; -use crate::{HistoryStore, TitleUpdated, TokenUsageUpdated}; +use crate::{HistoryStore, TerminalHandle, ThreadEnvironment, TitleUpdated, TokenUsageUpdated}; use acp_thread::{AcpThread, AgentModelSelector}; use action_log::ActionLog; use agent_client_protocol as acp; @@ -10,7 +10,8 @@ use agent_settings::AgentSettings; use anyhow::{Context as _, Result, anyhow}; use collections::{HashSet, IndexMap}; use fs::Fs; -use futures::channel::mpsc; +use futures::channel::{mpsc, oneshot}; +use futures::future::Shared; use futures::{StreamExt, future}; use gpui::{ App, AppContext, AsyncApp, Context, Entity, SharedString, Subscription, Task, WeakEntity, @@ -23,7 +24,7 @@ use prompt_store::{ use settings::update_settings_file; use std::any::Any; use std::collections::HashMap; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::rc::Rc; use std::sync::Arc; use util::ResultExt; @@ -276,13 +277,6 @@ impl NativeAgent { cx: &mut Context, ) -> Entity { let connection = Rc::new(NativeAgentConnection(cx.entity())); - let registry = LanguageModelRegistry::read_global(cx); - let summarization_model = registry.thread_summary_model().map(|c| c.model); - - thread_handle.update(cx, |thread, cx| { - thread.set_summarization_model(summarization_model, cx); - thread.add_default_tools(cx) - }); let thread = thread_handle.read(cx); let session_id = thread.id().clone(); @@ -301,6 +295,20 @@ impl NativeAgent { cx, ) }); + + let registry = LanguageModelRegistry::read_global(cx); + let summarization_model = registry.thread_summary_model().map(|c| c.model); + + thread_handle.update(cx, |thread, cx| { + thread.set_summarization_model(summarization_model, cx); + thread.add_default_tools( + Rc::new(AcpThreadEnvironment { + acp_thread: acp_thread.downgrade(), + }) as _, + cx, + ) + }); + let subscriptions = vec![ cx.observe_release(&acp_thread, |this, acp_thread, _cx| { this.sessions.remove(acp_thread.session_id()); @@ -1001,7 +1009,7 @@ impl acp_thread::AgentConnection for NativeAgentConnection { ) -> Option> { self.0.read_with(cx, |agent, _cx| { agent.sessions.get(session_id).map(|session| { - Rc::new(NativeAgentSessionEditor { + Rc::new(NativeAgentSessionTruncate { thread: session.thread.clone(), acp_thread: session.acp_thread.clone(), }) as _ @@ -1050,12 +1058,12 @@ impl acp_thread::AgentTelemetry for NativeAgentConnection { } } -struct NativeAgentSessionEditor { +struct NativeAgentSessionTruncate { thread: Entity, acp_thread: WeakEntity, } -impl acp_thread::AgentSessionTruncate for NativeAgentSessionEditor { +impl acp_thread::AgentSessionTruncate for NativeAgentSessionTruncate { fn run(&self, message_id: acp_thread::UserMessageId, cx: &mut App) -> Task> { match self.thread.update(cx, |thread, cx| { thread.truncate(message_id.clone(), cx)?; @@ -1104,6 +1112,66 @@ impl acp_thread::AgentSessionSetTitle for NativeAgentSessionSetTitle { } } +pub struct AcpThreadEnvironment { + acp_thread: WeakEntity, +} + +impl ThreadEnvironment for AcpThreadEnvironment { + fn create_terminal( + &self, + command: String, + cwd: Option, + output_byte_limit: Option, + cx: &mut AsyncApp, + ) -> Task>> { + let task = self.acp_thread.update(cx, |thread, cx| { + thread.create_terminal(command, vec![], vec![], cwd, output_byte_limit, cx) + }); + + let acp_thread = self.acp_thread.clone(); + cx.spawn(async move |cx| { + let terminal = task?.await?; + + let (drop_tx, drop_rx) = oneshot::channel(); + let terminal_id = terminal.read_with(cx, |terminal, _cx| terminal.id().clone())?; + + cx.spawn(async move |cx| { + drop_rx.await.ok(); + acp_thread.update(cx, |thread, cx| thread.release_terminal(terminal_id, cx)) + }) + .detach(); + + let handle = AcpTerminalHandle { + terminal, + _drop_tx: Some(drop_tx), + }; + + Ok(Rc::new(handle) as _) + }) + } +} + +pub struct AcpTerminalHandle { + terminal: Entity, + _drop_tx: Option>, +} + +impl TerminalHandle for AcpTerminalHandle { + fn id(&self, cx: &AsyncApp) -> Result { + self.terminal.read_with(cx, |term, _cx| term.id().clone()) + } + + fn wait_for_exit(&self, cx: &AsyncApp) -> Result>> { + self.terminal + .read_with(cx, |term, _cx| term.wait_for_exit()) + } + + fn current_output(&self, cx: &AsyncApp) -> Result { + self.terminal + .read_with(cx, |term, cx| term.current_output(cx)) + } +} + #[cfg(test)] mod tests { use crate::HistoryEntryId; diff --git a/crates/agent2/src/thread.rs b/crates/agent2/src/thread.rs index 8ff5b845066c8af90eb713aef2a0c87e6d114a85..6421e4982e9fef67af3c61f54f3374d59172f807 100644 --- a/crates/agent2/src/thread.rs +++ b/crates/agent2/src/thread.rs @@ -45,14 +45,15 @@ use schemars::{JsonSchema, Schema}; use serde::{Deserialize, Serialize}; use settings::{Settings, update_settings_file}; use smol::stream::StreamExt; -use std::fmt::Write; use std::{ collections::BTreeMap, ops::RangeInclusive, path::Path, + rc::Rc, sync::Arc, time::{Duration, Instant}, }; +use std::{fmt::Write, path::PathBuf}; use util::{ResultExt, debug_panic, markdown::MarkdownCodeBlock}; use uuid::Uuid; @@ -523,6 +524,22 @@ pub enum AgentMessageContent { ToolUse(LanguageModelToolUse), } +pub trait TerminalHandle { + fn id(&self, cx: &AsyncApp) -> Result; + fn current_output(&self, cx: &AsyncApp) -> Result; + fn wait_for_exit(&self, cx: &AsyncApp) -> Result>>; +} + +pub trait ThreadEnvironment { + fn create_terminal( + &self, + command: String, + cwd: Option, + output_byte_limit: Option, + cx: &mut AsyncApp, + ) -> Task>>; +} + #[derive(Debug)] pub enum ThreadEvent { UserMessage(UserMessage), @@ -535,6 +552,14 @@ pub enum ThreadEvent { Stop(acp::StopReason), } +#[derive(Debug)] +pub struct NewTerminal { + pub command: String, + pub output_byte_limit: Option, + pub cwd: Option, + pub response: oneshot::Sender>>, +} + #[derive(Debug)] pub struct ToolCallAuthorization { pub tool_call: acp::ToolCallUpdate, @@ -1024,7 +1049,11 @@ impl Thread { } } - pub fn add_default_tools(&mut self, cx: &mut Context) { + pub fn add_default_tools( + &mut self, + environment: Rc, + cx: &mut Context, + ) { let language_registry = self.project.read(cx).languages().clone(); self.add_tool(CopyPathTool::new(self.project.clone())); self.add_tool(CreateDirectoryTool::new(self.project.clone())); @@ -1045,7 +1074,7 @@ impl Thread { self.project.clone(), self.action_log.clone(), )); - self.add_tool(TerminalTool::new(self.project.clone(), cx)); + self.add_tool(TerminalTool::new(self.project.clone(), environment)); self.add_tool(ThinkingTool); self.add_tool(WebSearchTool); } @@ -2389,19 +2418,6 @@ impl ToolCallEventStream { .ok(); } - pub fn update_terminal(&self, terminal: Entity) { - self.stream - .0 - .unbounded_send(Ok(ThreadEvent::ToolCallUpdate( - acp_thread::ToolCallUpdateTerminal { - id: acp::ToolCallId(self.tool_use_id.to_string().into()), - terminal, - } - .into(), - ))) - .ok(); - } - pub fn authorize(&self, title: impl Into, cx: &mut App) -> Task> { if agent_settings::AgentSettings::get_global(cx).always_allow_tool_actions { return Task::ready(Ok(())); diff --git a/crates/agent2/src/tools/terminal_tool.rs b/crates/agent2/src/tools/terminal_tool.rs index 2270a7c32f076bee774c7c8177c4276985adc0b6..9ed585b1386e4958fe8d458a0376a70e0ef70862 100644 --- a/crates/agent2/src/tools/terminal_tool.rs +++ b/crates/agent2/src/tools/terminal_tool.rs @@ -1,19 +1,19 @@ use agent_client_protocol as acp; use anyhow::Result; -use futures::{FutureExt as _, future::Shared}; -use gpui::{App, AppContext, Entity, SharedString, Task}; +use gpui::{App, Entity, SharedString, Task}; use project::Project; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::{ path::{Path, PathBuf}, + rc::Rc, sync::Arc, }; -use util::{ResultExt, get_system_shell, markdown::MarkdownInlineCode}; +use util::markdown::MarkdownInlineCode; -use crate::{AgentTool, ToolCallEventStream}; +use crate::{AgentTool, ThreadEnvironment, ToolCallEventStream}; -const COMMAND_OUTPUT_LIMIT: usize = 16 * 1024; +const COMMAND_OUTPUT_LIMIT: u64 = 16 * 1024; /// Executes a shell one-liner and returns the combined output. /// @@ -36,25 +36,14 @@ pub struct TerminalToolInput { pub struct TerminalTool { project: Entity, - determine_shell: Shared>, + environment: Rc, } impl TerminalTool { - pub fn new(project: Entity, cx: &mut App) -> Self { - let determine_shell = cx.background_spawn(async move { - if cfg!(windows) { - return get_system_shell(); - } - - if which::which("bash").is_ok() { - "bash".into() - } else { - get_system_shell() - } - }); + pub fn new(project: Entity, environment: Rc) -> Self { Self { project, - determine_shell: determine_shell.shared(), + environment, } } } @@ -99,128 +88,49 @@ impl AgentTool for TerminalTool { event_stream: ToolCallEventStream, cx: &mut App, ) -> Task> { - let language_registry = self.project.read(cx).languages().clone(); let working_dir = match working_dir(&input, &self.project, cx) { Ok(dir) => dir, Err(err) => return Task::ready(Err(err)), }; - let program = self.determine_shell.clone(); - let command = if cfg!(windows) { - format!("$null | & {{{}}}", input.command.replace("\"", "'")) - } else if let Some(cwd) = working_dir - .as_ref() - .and_then(|cwd| cwd.as_os_str().to_str()) - { - // Make sure once we're *inside* the shell, we cd into `cwd` - format!("(cd {cwd}; {}) self.project.update(cx, |project, cx| { - project.directory_environment(dir.as_path().into(), cx) - }), - None => Task::ready(None).shared(), - }; - - let env = cx.spawn(async move |_| { - let mut env = env.await.unwrap_or_default(); - if cfg!(unix) { - env.insert("PAGER".into(), "cat".into()); - } - env - }); let authorize = event_stream.authorize(self.initial_title(Ok(input.clone())), cx); + cx.spawn(async move |cx| { + authorize.await?; + + let terminal = self + .environment + .create_terminal( + input.command.clone(), + working_dir, + Some(COMMAND_OUTPUT_LIMIT), + cx, + ) + .await?; - cx.spawn({ - async move |cx| { - authorize.await?; - - let program = program.await; - let env = env.await; - let terminal = self - .project - .update(cx, |project, cx| { - project.create_terminal_task( - task::SpawnInTerminal { - command: Some(program), - args, - cwd: working_dir.clone(), - env, - ..Default::default() - }, - cx, - ) - })? - .await?; - let acp_terminal = cx.new(|cx| { - acp_thread::Terminal::new( - input.command.clone(), - working_dir.clone(), - terminal.clone(), - language_registry, - cx, - ) - })?; - event_stream.update_terminal(acp_terminal.clone()); - - let exit_status = terminal - .update(cx, |terminal, cx| terminal.wait_for_completed_task(cx))? - .await; - let (content, content_line_count) = terminal.read_with(cx, |terminal, _| { - (terminal.get_content(), terminal.total_lines()) - })?; - - let (processed_content, finished_with_empty_output) = process_content( - &content, - &input.command, - exit_status.map(portable_pty::ExitStatus::from), - ); + let terminal_id = terminal.id(cx)?; + event_stream.update_fields(acp::ToolCallUpdateFields { + content: Some(vec![acp::ToolCallContent::Terminal { terminal_id }]), + ..Default::default() + }); - acp_terminal - .update(cx, |terminal, cx| { - terminal.finish( - exit_status, - content.len(), - processed_content.len(), - content_line_count, - finished_with_empty_output, - cx, - ); - }) - .log_err(); + let exit_status = terminal.wait_for_exit(cx)?.await; + let output = terminal.current_output(cx)?; - Ok(processed_content) - } + Ok(process_content(output, &input.command, exit_status)) }) } } fn process_content( - content: &str, + output: acp::TerminalOutputResponse, command: &str, - exit_status: Option, -) -> (String, bool) { - let should_truncate = content.len() > COMMAND_OUTPUT_LIMIT; - - let content = if should_truncate { - let mut end_ix = COMMAND_OUTPUT_LIMIT.min(content.len()); - while !content.is_char_boundary(end_ix) { - end_ix -= 1; - } - // Don't truncate mid-line, clear the remainder of the last line - end_ix = content[..end_ix].rfind('\n').unwrap_or(end_ix); - &content[..end_ix] - } else { - content - }; - let content = content.trim(); + exit_status: acp::TerminalExitStatus, +) -> String { + let content = output.output.trim(); let is_empty = content.is_empty(); + let content = format!("```\n{content}\n```"); - let content = if should_truncate { + let content = if output.truncated { format!( "Command output too long. The first {} bytes:\n\n{content}", content.len(), @@ -229,24 +139,21 @@ fn process_content( content }; - let content = match exit_status { - Some(exit_status) if exit_status.success() => { + let content = match exit_status.exit_code { + Some(0) => { if is_empty { "Command executed successfully.".to_string() } else { content } } - Some(exit_status) => { + Some(exit_code) => { if is_empty { - format!( - "Command \"{command}\" failed with exit code {}.", - exit_status.exit_code() - ) + format!("Command \"{command}\" failed with exit code {}.", exit_code) } else { format!( "Command \"{command}\" failed with exit code {}.\n\n{content}", - exit_status.exit_code() + exit_code ) } } @@ -257,7 +164,7 @@ fn process_content( ) } }; - (content, is_empty) + content } fn working_dir( @@ -300,169 +207,3 @@ fn working_dir( anyhow::bail!("`cd` directory {cd:?} was not in any of the project's worktrees."); } } - -#[cfg(test)] -mod tests { - use agent_settings::AgentSettings; - use editor::EditorSettings; - use fs::RealFs; - use gpui::{BackgroundExecutor, TestAppContext}; - use pretty_assertions::assert_eq; - use serde_json::json; - use settings::{Settings, SettingsStore}; - use terminal::terminal_settings::TerminalSettings; - use theme::ThemeSettings; - use util::test::TempTree; - - use crate::ThreadEvent; - - use super::*; - - fn init_test(executor: &BackgroundExecutor, cx: &mut TestAppContext) { - zlog::init_test(); - - executor.allow_parking(); - cx.update(|cx| { - let settings_store = SettingsStore::test(cx); - cx.set_global(settings_store); - language::init(cx); - Project::init_settings(cx); - ThemeSettings::register(cx); - TerminalSettings::register(cx); - EditorSettings::register(cx); - AgentSettings::register(cx); - }); - } - - #[gpui::test] - async fn test_interactive_command(executor: BackgroundExecutor, cx: &mut TestAppContext) { - if cfg!(windows) { - return; - } - - init_test(&executor, cx); - - let fs = Arc::new(RealFs::new(None, executor)); - let tree = TempTree::new(json!({ - "project": {}, - })); - let project: Entity = - Project::test(fs, [tree.path().join("project").as_path()], cx).await; - - let input = TerminalToolInput { - command: "cat".to_owned(), - cd: tree - .path() - .join("project") - .as_path() - .to_string_lossy() - .to_string(), - }; - let (event_stream_tx, mut event_stream_rx) = ToolCallEventStream::test(); - let result = cx - .update(|cx| Arc::new(TerminalTool::new(project, cx)).run(input, event_stream_tx, cx)); - - let auth = event_stream_rx.expect_authorization().await; - auth.response.send(auth.options[0].id.clone()).unwrap(); - event_stream_rx.expect_terminal().await; - assert_eq!(result.await.unwrap(), "Command executed successfully."); - } - - #[gpui::test] - async fn test_working_directory(executor: BackgroundExecutor, cx: &mut TestAppContext) { - if cfg!(windows) { - return; - } - - init_test(&executor, cx); - - let fs = Arc::new(RealFs::new(None, executor)); - let tree = TempTree::new(json!({ - "project": {}, - "other-project": {}, - })); - let project: Entity = - Project::test(fs, [tree.path().join("project").as_path()], cx).await; - - let check = |input, expected, cx: &mut TestAppContext| { - let (stream_tx, mut stream_rx) = ToolCallEventStream::test(); - let result = cx.update(|cx| { - Arc::new(TerminalTool::new(project.clone(), cx)).run(input, stream_tx, cx) - }); - cx.run_until_parked(); - let event = stream_rx.try_next(); - if let Ok(Some(Ok(ThreadEvent::ToolCallAuthorization(auth)))) = event { - auth.response.send(auth.options[0].id.clone()).unwrap(); - } - - cx.spawn(async move |_| { - let output = result.await; - assert_eq!(output.ok(), expected); - }) - }; - - check( - TerminalToolInput { - command: "pwd".into(), - cd: ".".into(), - }, - Some(format!( - "```\n{}\n```", - tree.path().join("project").display() - )), - cx, - ) - .await; - - check( - TerminalToolInput { - command: "pwd".into(), - cd: "other-project".into(), - }, - None, // other-project is a dir, but *not* a worktree (yet) - cx, - ) - .await; - - // Absolute path above the worktree root - check( - TerminalToolInput { - command: "pwd".into(), - cd: tree.path().to_string_lossy().into(), - }, - None, - cx, - ) - .await; - - project - .update(cx, |project, cx| { - project.create_worktree(tree.path().join("other-project"), true, cx) - }) - .await - .unwrap(); - - check( - TerminalToolInput { - command: "pwd".into(), - cd: "other-project".into(), - }, - Some(format!( - "```\n{}\n```", - tree.path().join("other-project").display() - )), - cx, - ) - .await; - - check( - TerminalToolInput { - command: "pwd".into(), - cd: ".".into(), - }, - None, - cx, - ) - .await; - } -} diff --git a/crates/agent_servers/src/acp.rs b/crates/agent_servers/src/acp.rs index b1d4bea5c35c113277847690906dd2f21e12050c..b29bfd5d8919f87594ccd26cbf9d7fdc3520ad30 100644 --- a/crates/agent_servers/src/acp.rs +++ b/crates/agent_servers/src/acp.rs @@ -134,6 +134,7 @@ impl AcpConnection { read_text_file: true, write_text_file: true, }, + terminal: true, }, }) .await?; @@ -344,11 +345,7 @@ impl acp::Client for ClientDelegate { let cx = &mut self.cx.clone(); let task = self - .sessions - .borrow() - .get(&arguments.session_id) - .context("Failed to get session")? - .thread + .session_thread(&arguments.session_id)? .update(cx, |thread, cx| { thread.request_tool_call_authorization(arguments.tool_call, arguments.options, cx) })??; @@ -364,11 +361,7 @@ impl acp::Client for ClientDelegate { ) -> Result<(), acp::Error> { let cx = &mut self.cx.clone(); let task = self - .sessions - .borrow() - .get(&arguments.session_id) - .context("Failed to get session")? - .thread + .session_thread(&arguments.session_id)? .update(cx, |thread, cx| { thread.write_text_file(arguments.path, arguments.content, cx) })?; @@ -382,16 +375,12 @@ impl acp::Client for ClientDelegate { &self, arguments: acp::ReadTextFileRequest, ) -> Result { - let cx = &mut self.cx.clone(); - let task = self - .sessions - .borrow() - .get(&arguments.session_id) - .context("Failed to get session")? - .thread - .update(cx, |thread, cx| { + let task = self.session_thread(&arguments.session_id)?.update( + &mut self.cx.clone(), + |thread, cx| { thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx) - })?; + }, + )?; let content = task.await?; @@ -402,16 +391,92 @@ impl acp::Client for ClientDelegate { &self, notification: acp::SessionNotification, ) -> Result<(), acp::Error> { - let cx = &mut self.cx.clone(); - let sessions = self.sessions.borrow(); - let session = sessions - .get(¬ification.session_id) - .context("Failed to get session")?; + self.session_thread(¬ification.session_id)? + .update(&mut self.cx.clone(), |thread, cx| { + thread.handle_session_update(notification.update, cx) + })??; + + Ok(()) + } + + async fn create_terminal( + &self, + args: acp::CreateTerminalRequest, + ) -> Result { + let terminal = self + .session_thread(&args.session_id)? + .update(&mut self.cx.clone(), |thread, cx| { + thread.create_terminal( + args.command, + args.args, + args.env, + args.cwd, + args.output_byte_limit, + cx, + ) + })? + .await?; + Ok( + terminal.read_with(&self.cx, |terminal, _| acp::CreateTerminalResponse { + terminal_id: terminal.id().clone(), + })?, + ) + } - session.thread.update(cx, |thread, cx| { - thread.handle_session_update(notification.update, cx) - })??; + async fn kill_terminal(&self, args: acp::KillTerminalRequest) -> Result<(), acp::Error> { + self.session_thread(&args.session_id)? + .update(&mut self.cx.clone(), |thread, cx| { + thread.kill_terminal(args.terminal_id, cx) + })??; + + Ok(()) + } + + async fn release_terminal(&self, args: acp::ReleaseTerminalRequest) -> Result<(), acp::Error> { + self.session_thread(&args.session_id)? + .update(&mut self.cx.clone(), |thread, cx| { + thread.release_terminal(args.terminal_id, cx) + })??; Ok(()) } + + async fn terminal_output( + &self, + args: acp::TerminalOutputRequest, + ) -> Result { + self.session_thread(&args.session_id)? + .read_with(&mut self.cx.clone(), |thread, cx| { + let out = thread + .terminal(args.terminal_id)? + .read(cx) + .current_output(cx); + + Ok(out) + })? + } + + async fn wait_for_terminal_exit( + &self, + args: acp::WaitForTerminalExitRequest, + ) -> Result { + let exit_status = self + .session_thread(&args.session_id)? + .update(&mut self.cx.clone(), |thread, cx| { + anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit()) + })?? + .await; + + Ok(acp::WaitForTerminalExitResponse { exit_status }) + } +} + +impl ClientDelegate { + fn session_thread(&self, session_id: &acp::SessionId) -> Result> { + let sessions = self.sessions.borrow(); + sessions + .get(session_id) + .context("Failed to get session") + .map(|session| session.thread.clone()) + } } diff --git a/crates/agent_ui/src/acp/entry_view_state.rs b/crates/agent_ui/src/acp/entry_view_state.rs index 76b3709325a0c84a72bc71db8a67a3d4bd72dd06..0103219e31e8210440e66637cce8101d283210ea 100644 --- a/crates/agent_ui/src/acp/entry_view_state.rs +++ b/crates/agent_ui/src/acp/entry_view_state.rs @@ -125,22 +125,35 @@ impl EntryViewState { views }; + let is_tool_call_completed = + matches!(tool_call.status, acp_thread::ToolCallStatus::Completed); + for terminal in terminals { - views.entry(terminal.entity_id()).or_insert_with(|| { - let element = create_terminal( - self.workspace.clone(), - self.project.clone(), - terminal.clone(), - window, - cx, - ) - .into_any(); - cx.emit(EntryViewEvent { - entry_index: index, - view_event: ViewEvent::NewTerminal(id.clone()), - }); - element - }); + match views.entry(terminal.entity_id()) { + collections::hash_map::Entry::Vacant(entry) => { + let element = create_terminal( + self.workspace.clone(), + self.project.clone(), + terminal.clone(), + window, + cx, + ) + .into_any(); + cx.emit(EntryViewEvent { + entry_index: index, + view_event: ViewEvent::NewTerminal(id.clone()), + }); + entry.insert(element); + } + collections::hash_map::Entry::Occupied(_entry) => { + if is_tool_call_completed && terminal.read(cx).output().is_none() { + cx.emit(EntryViewEvent { + entry_index: index, + view_event: ViewEvent::TerminalMovedToBackground(id.clone()), + }); + } + } + } } for diff in diffs { @@ -217,6 +230,7 @@ pub struct EntryViewEvent { pub enum ViewEvent { NewDiff(ToolCallId), NewTerminal(ToolCallId), + TerminalMovedToBackground(ToolCallId), MessageEditorEvent(Entity, MessageEditorEvent), } diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index eff9ceedd433ea8beb833108fb9fea1eb3f706da..5e842d713d3d2e20f71e5a0b5e5c1fce773bed8d 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -827,6 +827,9 @@ impl AcpThreadView { self.expanded_tool_calls.insert(tool_call_id.clone()); } } + ViewEvent::TerminalMovedToBackground(tool_call_id) => { + self.expanded_tool_calls.remove(tool_call_id); + } ViewEvent::MessageEditorEvent(_editor, MessageEditorEvent::Focus) => { if let Some(thread) = self.thread() && let Some(AgentThreadEntry::UserMessage(user_message)) = @@ -2418,7 +2421,8 @@ impl AcpThreadView { let output = terminal_data.output(); let command_finished = output.is_some(); - let truncated_output = output.is_some_and(|output| output.was_content_truncated); + let truncated_output = + output.is_some_and(|output| output.original_content_len > output.content.len()); let output_line_count = output.map(|output| output.content_line_count).unwrap_or(0); let command_failed = command_finished @@ -2540,14 +2544,14 @@ impl AcpThreadView { .when(truncated_output, |header| { let tooltip = if let Some(output) = output { if output_line_count + 10 > terminal::MAX_SCROLL_HISTORY_LINES { - "Output exceeded terminal max lines and was \ - truncated, the model received the first 16 KB." - .to_string() + format!("Output exceeded terminal max lines and was \ + truncated, the model received the first {}.", format_file_size(output.content.len() as u64, true)) } else { format!( "Output is {} long, and to avoid unexpected token usage, \ - only 16 KB was sent back to the model.", + only {} was sent back to the agent.", format_file_size(output.original_content_len as u64, true), + format_file_size(output.content.len() as u64, true) ) } } else {