From dc6fad965994db8ffcf804650a807b3bedadcb1b Mon Sep 17 00:00:00 2001 From: Richard Feldman Date: Thu, 2 Oct 2025 22:50:32 -0400 Subject: [PATCH] Display-only ACP terminals (#39419) Codex needs (and future projects are anticipated to need as well) a concept of display-only terminals. This refactors terminals to decouple the PTY part from the display part, so that we can render terminal changes based on a series of events - regardless of whether they're being driven from a PTY inside Zed or from an outside source (e.g. `codex-acp`). Release Notes: - N/A --- Cargo.lock | 2 + crates/acp_thread/src/acp_thread.rs | 293 ++++++++++++++++++++++++++++ crates/agent_servers/Cargo.toml | 2 + crates/agent_servers/src/acp.rs | 166 ++++++++++++++-- crates/terminal/src/terminal.rs | 47 +++++ 5 files changed, 493 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94e409bc7f39716f33039556da5c205b9945c811..7de383f18105bb13d1978fb7d8aa8a2c5c676398 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -318,9 +318,11 @@ dependencies = [ "smol", "task", "tempfile", + "terminal", "thiserror 2.0.12", "ui", "util", + "uuid", "watch", "workspace-hack", ] diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index e2c414985b99b3a939026103a004c2b10415fa91..c27dc94a7a3235942198647dcb8caccc5cd7dad5 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -788,6 +788,8 @@ pub struct AcpThread { prompt_capabilities: acp::PromptCapabilities, _observe_prompt_capabilities: Task>, terminals: HashMap>, + pending_terminal_output: HashMap>>, + pending_terminal_exit: HashMap, } #[derive(Debug)] @@ -810,6 +812,126 @@ pub enum AcpThreadEvent { impl EventEmitter for AcpThread {} +#[derive(Debug, Clone)] +pub enum TerminalProviderEvent { + Created { + terminal_id: acp::TerminalId, + label: String, + cwd: Option, + output_byte_limit: Option, + terminal: Entity<::terminal::Terminal>, + }, + Output { + terminal_id: acp::TerminalId, + data: Vec, + }, + TitleChanged { + terminal_id: acp::TerminalId, + title: String, + }, + Exit { + terminal_id: acp::TerminalId, + status: acp::TerminalExitStatus, + }, +} + +#[derive(Debug, Clone)] +pub enum TerminalProviderCommand { + WriteInput { + terminal_id: acp::TerminalId, + bytes: Vec, + }, + Resize { + terminal_id: acp::TerminalId, + cols: u16, + rows: u16, + }, + Close { + terminal_id: acp::TerminalId, + }, +} + +impl AcpThread { + pub fn on_terminal_provider_event( + &mut self, + event: TerminalProviderEvent, + cx: &mut Context, + ) { + match event { + TerminalProviderEvent::Created { + terminal_id, + label, + cwd, + output_byte_limit, + terminal, + } => { + let entity = self.register_terminal_created( + terminal_id.clone(), + label, + cwd, + output_byte_limit, + terminal, + cx, + ); + + if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) { + for data in chunks.drain(..) { + entity.update(cx, |term, cx| { + term.inner().update(cx, |inner, cx| { + inner.write_output(&data, cx); + }) + }); + } + } + + if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) { + entity.update(cx, |_term, cx| { + cx.notify(); + }); + } + + cx.notify(); + } + TerminalProviderEvent::Output { terminal_id, data } => { + if let Some(entity) = self.terminals.get(&terminal_id) { + entity.update(cx, |term, cx| { + term.inner().update(cx, |inner, cx| { + inner.write_output(&data, cx); + }) + }); + } else { + self.pending_terminal_output + .entry(terminal_id) + .or_default() + .push(data); + } + } + TerminalProviderEvent::TitleChanged { terminal_id, title } => { + if let Some(entity) = self.terminals.get(&terminal_id) { + entity.update(cx, |term, cx| { + term.inner().update(cx, |inner, cx| { + inner.breadcrumb_text = title; + cx.emit(::terminal::Event::BreadcrumbsChanged); + }) + }); + } + } + TerminalProviderEvent::Exit { + terminal_id, + status, + } => { + if let Some(entity) = self.terminals.get(&terminal_id) { + entity.update(cx, |_term, cx| { + cx.notify(); + }); + } else { + self.pending_terminal_exit.insert(terminal_id, status); + } + } + } + } +} + #[derive(PartialEq, Eq, Debug)] pub enum ThreadStatus { Idle, @@ -887,6 +1009,8 @@ impl AcpThread { prompt_capabilities, _observe_prompt_capabilities: task, terminals: HashMap::default(), + pending_terminal_output: HashMap::default(), + pending_terminal_exit: HashMap::default(), } } @@ -2079,6 +2203,32 @@ impl AcpThread { pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context) { cx.emit(AcpThreadEvent::LoadError(error)); } + + pub fn register_terminal_created( + &mut self, + terminal_id: acp::TerminalId, + command_label: String, + working_dir: Option, + output_byte_limit: Option, + terminal: Entity<::terminal::Terminal>, + cx: &mut Context, + ) -> Entity { + let language_registry = self.project.read(cx).languages().clone(); + + let entity = cx.new(|cx| { + Terminal::new( + terminal_id.clone(), + &command_label, + working_dir.clone(), + output_byte_limit.map(|l| l as usize), + terminal, + language_registry, + cx, + ) + }); + self.terminals.insert(terminal_id.clone(), entity.clone()); + entity + } } fn markdown_for_raw_output( @@ -2155,6 +2305,149 @@ mod tests { }); } + #[gpui::test] + async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) { + init_test(cx); + + let fs = FakeFs::new(cx.executor()); + let project = Project::test(fs, [], cx).await; + let connection = Rc::new(FakeAgentConnection::new()); + let thread = cx + .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx)) + .await + .unwrap(); + + let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into()); + + // Send Output BEFORE Created - should be buffered by acp_thread + thread.update(cx, |thread, cx| { + thread.on_terminal_provider_event( + TerminalProviderEvent::Output { + terminal_id: terminal_id.clone(), + data: b"hello buffered".to_vec(), + }, + cx, + ); + }); + + // Create a display-only terminal and then send Created + let lower = cx.new(|cx| { + let builder = ::terminal::TerminalBuilder::new_display_only( + None, + ::terminal::terminal_settings::CursorShape::default(), + ::terminal::terminal_settings::AlternateScroll::On, + None, + 0, + cx, + ) + .unwrap(); + builder.subscribe(cx) + }); + + thread.update(cx, |thread, cx| { + thread.on_terminal_provider_event( + TerminalProviderEvent::Created { + terminal_id: terminal_id.clone(), + label: "Buffered Test".to_string(), + cwd: None, + output_byte_limit: None, + terminal: lower.clone(), + }, + cx, + ); + }); + + // After Created, buffered Output should have been flushed into the renderer + let content = thread.read_with(cx, |thread, cx| { + let term = thread.terminal(terminal_id.clone()).unwrap(); + term.read_with(cx, |t, cx| t.inner().read(cx).get_content()) + }); + + assert!( + content.contains("hello buffered"), + "expected buffered output to render, got: {content}" + ); + } + + #[gpui::test] + async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) { + init_test(cx); + + let fs = FakeFs::new(cx.executor()); + let project = Project::test(fs, [], cx).await; + let connection = Rc::new(FakeAgentConnection::new()); + let thread = cx + .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx)) + .await + .unwrap(); + + let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into()); + + // Send Output BEFORE Created + thread.update(cx, |thread, cx| { + thread.on_terminal_provider_event( + TerminalProviderEvent::Output { + terminal_id: terminal_id.clone(), + data: b"pre-exit data".to_vec(), + }, + cx, + ); + }); + + // Send Exit BEFORE Created + thread.update(cx, |thread, cx| { + thread.on_terminal_provider_event( + TerminalProviderEvent::Exit { + terminal_id: terminal_id.clone(), + status: acp::TerminalExitStatus { + exit_code: Some(0), + signal: None, + meta: None, + }, + }, + cx, + ); + }); + + // Now create a display-only lower-level terminal and send Created + let lower = cx.new(|cx| { + let builder = ::terminal::TerminalBuilder::new_display_only( + None, + ::terminal::terminal_settings::CursorShape::default(), + ::terminal::terminal_settings::AlternateScroll::On, + None, + 0, + cx, + ) + .unwrap(); + builder.subscribe(cx) + }); + + thread.update(cx, |thread, cx| { + thread.on_terminal_provider_event( + TerminalProviderEvent::Created { + terminal_id: terminal_id.clone(), + label: "Buffered Exit Test".to_string(), + cwd: None, + output_byte_limit: None, + terminal: lower.clone(), + }, + cx, + ); + }); + + // Output should be present after Created (flushed from buffer) + let content = thread.read_with(cx, |thread, cx| { + let term = thread.terminal(terminal_id.clone()).unwrap(); + term.read_with(cx, |t, cx| t.inner().read(cx).get_content()) + }); + + assert!( + content.contains("pre-exit data"), + "expected pre-exit data to render, got: {content}" + ); + } + #[gpui::test] async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) { init_test(cx); diff --git a/crates/agent_servers/Cargo.toml b/crates/agent_servers/Cargo.toml index ca6db6c663ddb2132c05d716e5b935c5855bccdb..bdf1b72fdc0c2c71d5e445633d1d4a8ce32a6ba4 100644 --- a/crates/agent_servers/Cargo.toml +++ b/crates/agent_servers/Cargo.toml @@ -47,6 +47,8 @@ task.workspace = true tempfile.workspace = true thiserror.workspace = true ui.workspace = true +terminal.workspace = true +uuid.workspace = true util.workspace = true watch.workspace = true workspace-hack.workspace = true diff --git a/crates/agent_servers/src/acp.rs b/crates/agent_servers/src/acp.rs index 52760a5b65ba335660e31a693c304c7c61acdec9..9ec952c14016b0d788356774b86ea0ec9d393545 100644 --- a/crates/agent_servers/src/acp.rs +++ b/crates/agent_servers/src/acp.rs @@ -19,7 +19,9 @@ use thiserror::Error; use anyhow::{Context as _, Result}; use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity}; -use acp_thread::{AcpThread, AuthRequired, LoadError}; +use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent}; +use terminal::TerminalBuilder; +use terminal::terminal_settings::{AlternateScroll, CursorShape}; #[derive(Debug, Error)] #[error("Unsupported version")] @@ -700,10 +702,99 @@ impl acp::Client for ClientDelegate { } } + // Clone so we can inspect meta both before and after handing off to the thread + let update_clone = notification.update.clone(); + + // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal. + if let acp::SessionUpdate::ToolCall(tc) = &update_clone { + if let Some(meta) = &tc.meta { + if let Some(terminal_info) = meta.get("terminal_info") { + if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) + { + let terminal_id = acp::TerminalId(id_str.into()); + + // Create a minimal display-only lower-level terminal and register it. + let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| { + let builder = TerminalBuilder::new_display_only( + None, + CursorShape::default(), + AlternateScroll::On, + None, + 0, + cx, + )?; + let lower = cx.new(|cx| builder.subscribe(cx)); + thread.on_terminal_provider_event( + TerminalProviderEvent::Created { + terminal_id: terminal_id.clone(), + label: tc.title.clone(), + cwd: None, + output_byte_limit: None, + terminal: lower, + }, + cx, + ); + anyhow::Ok(()) + }); + } + } + } + } + + // Forward the update to the acp_thread as usual. session.thread.update(&mut self.cx.clone(), |thread, cx| { - thread.handle_session_update(notification.update, cx) + thread.handle_session_update(notification.update.clone(), cx) })??; + // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta. + if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone { + if let Some(meta) = &tcu.meta { + if let Some(term_out) = meta.get("terminal_output") { + if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) { + let terminal_id = acp::TerminalId(id_str.into()); + if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) { + let data = s.as_bytes().to_vec(); + let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| { + thread.on_terminal_provider_event( + TerminalProviderEvent::Output { + terminal_id: terminal_id.clone(), + data, + }, + cx, + ); + }); + } + } + } + + // terminal_exit + if let Some(term_exit) = meta.get("terminal_exit") { + if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) { + let terminal_id = acp::TerminalId(id_str.into()); + let status = acp::TerminalExitStatus { + exit_code: term_exit + .get("exit_code") + .and_then(|v| v.as_u64()) + .map(|i| i as u32), + signal: term_exit + .get("signal") + .and_then(|v| v.as_str().map(|s| s.to_string())), + meta: None, + }; + let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| { + thread.on_terminal_provider_event( + TerminalProviderEvent::Exit { + terminal_id: terminal_id.clone(), + status, + }, + cx, + ); + }); + } + } + } + } + Ok(()) } @@ -711,25 +802,66 @@ impl acp::Client for ClientDelegate { &self, args: acp::CreateTerminalRequest, ) -> Result { - let terminal = self - .session_thread(&args.session_id)? - .update(&mut self.cx.clone(), |thread, cx| { - thread.create_terminal( - args.command, - args.args, - args.env, - args.cwd, - args.output_byte_limit, + let thread = self.session_thread(&args.session_id)?; + let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?; + + let mut env = if let Some(dir) = &args.cwd { + project + .update(&mut self.cx.clone(), |project, cx| { + project.directory_environment(&task::Shell::System, dir.clone().into(), cx) + })? + .await + .unwrap_or_default() + } else { + Default::default() + }; + for var in args.env { + env.insert(var.name, var.value); + } + + // Use remote shell or default system shell, as appropriate + let remote_shell = project.update(&mut self.cx.clone(), |project, cx| { + project + .remote_client() + .and_then(|r| r.read(cx).default_system_shell()) + })?; + let (task_command, task_args) = + task::ShellBuilder::new(remote_shell.as_deref(), &task::Shell::System) + .redirect_stdin_to_dev_null() + .build(Some(args.command.clone()), &args.args); + + let terminal_entity = project + .update(&mut self.cx.clone(), |project, cx| { + project.create_terminal_task( + task::SpawnInTerminal { + command: Some(task_command), + args: task_args, + cwd: args.cwd.clone(), + env, + ..Default::default() + }, cx, ) })? .await?; - Ok( - terminal.read_with(&self.cx, |terminal, _| acp::CreateTerminalResponse { - terminal_id: terminal.id().clone(), - meta: None, - })?, - ) + + // Register with renderer + let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| { + thread.register_terminal_created( + acp::TerminalId(uuid::Uuid::new_v4().to_string().into()), + format!("{} {}", args.command, args.args.join(" ")), + args.cwd.clone(), + args.output_byte_limit, + terminal_entity, + cx, + ) + })?; + let terminal_id = + terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?; + Ok(acp::CreateTerminalResponse { + terminal_id, + meta: None, + }) } async fn kill_terminal_command( diff --git a/crates/terminal/src/terminal.rs b/crates/terminal/src/terminal.rs index 288916c775cf66b7a0f468ee48525df9b394515e..99058d28ddf96c596d7a4a347c34f6355236b17d 100644 --- a/crates/terminal/src/terminal.rs +++ b/crates/terminal/src/terminal.rs @@ -339,6 +339,29 @@ pub struct TerminalBuilder { } impl TerminalBuilder { + pub fn new_display_only( + working_directory: Option, + cursor_shape: CursorShape, + alternate_scroll: AlternateScroll, + max_scroll_history_lines: Option, + window_id: u64, + cx: &App, + ) -> Result { + Self::new( + working_directory, + None, + Shell::System, + HashMap::default(), + cursor_shape, + alternate_scroll, + max_scroll_history_lines, + false, + window_id, + None, + cx, + Vec::new(), + ) + } pub fn new( working_directory: Option, task: Option, @@ -1132,6 +1155,19 @@ impl Terminal { self.term.lock().set_options(self.term_config.clone()); } + pub fn write_output(&mut self, bytes: &[u8], cx: &mut Context) { + // Inject bytes directly into the terminal emulator and refresh the UI. + // This bypasses the PTY/event loop for display-only terminals. + let mut processor = alacritty_terminal::vte::ansi::Processor::< + alacritty_terminal::vte::ansi::StdSyncHandler, + >::new(); + { + let mut term = self.term.lock(); + processor.advance(&mut *term, bytes); + } + cx.emit(Event::Wakeup); + } + pub fn total_lines(&self) -> usize { let term = self.term.clone(); let terminal = term.lock_unfair(); @@ -2272,6 +2308,17 @@ mod tests { terminal.update(cx, |term, _| term.get_content()).trim(), "hello" ); + + // Inject additional output directly into the emulator (display-only path) + terminal.update(cx, |term, cx| { + term.write_output(b"\nfrom_injection", cx); + }); + + let content_after = terminal.update(cx, |term, _| term.get_content()); + assert!( + content_after.contains("from_injection"), + "expected injected output to appear, got: {content_after}" + ); } // TODO should be tested on Linux too, but does not work there well