Cargo.lock 🔗
@@ -318,9 +318,11 @@ dependencies = [
"smol",
"task",
"tempfile",
+ "terminal",
"thiserror 2.0.12",
"ui",
"util",
+ "uuid",
"watch",
"workspace-hack",
]
Richard Feldman created
Codex needs (and future projects are anticipated to need as well) a
concept of display-only terminals. This refactors terminals to decouple
the PTY part from the display part, so that we can render terminal
changes based on a series of events - regardless of whether they're
being driven from a PTY inside Zed or from an outside source (e.g.
`codex-acp`).
Release Notes:
- N/A
Cargo.lock | 2
crates/acp_thread/src/acp_thread.rs | 293 +++++++++++++++++++++++++++++++
crates/agent_servers/Cargo.toml | 2
crates/agent_servers/src/acp.rs | 166 +++++++++++++++-
crates/terminal/src/terminal.rs | 47 ++++
5 files changed, 493 insertions(+), 17 deletions(-)
@@ -318,9 +318,11 @@ dependencies = [
"smol",
"task",
"tempfile",
+ "terminal",
"thiserror 2.0.12",
"ui",
"util",
+ "uuid",
"watch",
"workspace-hack",
]
@@ -788,6 +788,8 @@ pub struct AcpThread {
prompt_capabilities: acp::PromptCapabilities,
_observe_prompt_capabilities: Task<anyhow::Result<()>>,
terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
+ pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
+ pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
#[derive(Debug)]
@@ -810,6 +812,126 @@ pub enum AcpThreadEvent {
impl EventEmitter<AcpThreadEvent> for AcpThread {}
+#[derive(Debug, Clone)]
+pub enum TerminalProviderEvent {
+ Created {
+ terminal_id: acp::TerminalId,
+ label: String,
+ cwd: Option<PathBuf>,
+ output_byte_limit: Option<u64>,
+ terminal: Entity<::terminal::Terminal>,
+ },
+ Output {
+ terminal_id: acp::TerminalId,
+ data: Vec<u8>,
+ },
+ TitleChanged {
+ terminal_id: acp::TerminalId,
+ title: String,
+ },
+ Exit {
+ terminal_id: acp::TerminalId,
+ status: acp::TerminalExitStatus,
+ },
+}
+
+#[derive(Debug, Clone)]
+pub enum TerminalProviderCommand {
+ WriteInput {
+ terminal_id: acp::TerminalId,
+ bytes: Vec<u8>,
+ },
+ Resize {
+ terminal_id: acp::TerminalId,
+ cols: u16,
+ rows: u16,
+ },
+ Close {
+ terminal_id: acp::TerminalId,
+ },
+}
+
+impl AcpThread {
+ pub fn on_terminal_provider_event(
+ &mut self,
+ event: TerminalProviderEvent,
+ cx: &mut Context<Self>,
+ ) {
+ match event {
+ TerminalProviderEvent::Created {
+ terminal_id,
+ label,
+ cwd,
+ output_byte_limit,
+ terminal,
+ } => {
+ let entity = self.register_terminal_created(
+ terminal_id.clone(),
+ label,
+ cwd,
+ output_byte_limit,
+ terminal,
+ cx,
+ );
+
+ if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
+ for data in chunks.drain(..) {
+ entity.update(cx, |term, cx| {
+ term.inner().update(cx, |inner, cx| {
+ inner.write_output(&data, cx);
+ })
+ });
+ }
+ }
+
+ if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
+ entity.update(cx, |_term, cx| {
+ cx.notify();
+ });
+ }
+
+ cx.notify();
+ }
+ TerminalProviderEvent::Output { terminal_id, data } => {
+ if let Some(entity) = self.terminals.get(&terminal_id) {
+ entity.update(cx, |term, cx| {
+ term.inner().update(cx, |inner, cx| {
+ inner.write_output(&data, cx);
+ })
+ });
+ } else {
+ self.pending_terminal_output
+ .entry(terminal_id)
+ .or_default()
+ .push(data);
+ }
+ }
+ TerminalProviderEvent::TitleChanged { terminal_id, title } => {
+ if let Some(entity) = self.terminals.get(&terminal_id) {
+ entity.update(cx, |term, cx| {
+ term.inner().update(cx, |inner, cx| {
+ inner.breadcrumb_text = title;
+ cx.emit(::terminal::Event::BreadcrumbsChanged);
+ })
+ });
+ }
+ }
+ TerminalProviderEvent::Exit {
+ terminal_id,
+ status,
+ } => {
+ if let Some(entity) = self.terminals.get(&terminal_id) {
+ entity.update(cx, |_term, cx| {
+ cx.notify();
+ });
+ } else {
+ self.pending_terminal_exit.insert(terminal_id, status);
+ }
+ }
+ }
+ }
+}
+
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
Idle,
@@ -887,6 +1009,8 @@ impl AcpThread {
prompt_capabilities,
_observe_prompt_capabilities: task,
terminals: HashMap::default(),
+ pending_terminal_output: HashMap::default(),
+ pending_terminal_exit: HashMap::default(),
}
}
@@ -2079,6 +2203,32 @@ impl AcpThread {
pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
cx.emit(AcpThreadEvent::LoadError(error));
}
+
+ pub fn register_terminal_created(
+ &mut self,
+ terminal_id: acp::TerminalId,
+ command_label: String,
+ working_dir: Option<PathBuf>,
+ output_byte_limit: Option<u64>,
+ terminal: Entity<::terminal::Terminal>,
+ cx: &mut Context<Self>,
+ ) -> Entity<Terminal> {
+ let language_registry = self.project.read(cx).languages().clone();
+
+ let entity = cx.new(|cx| {
+ Terminal::new(
+ terminal_id.clone(),
+ &command_label,
+ working_dir.clone(),
+ output_byte_limit.map(|l| l as usize),
+ terminal,
+ language_registry,
+ cx,
+ )
+ });
+ self.terminals.insert(terminal_id.clone(), entity.clone());
+ entity
+ }
}
fn markdown_for_raw_output(
@@ -2155,6 +2305,149 @@ mod tests {
});
}
+ #[gpui::test]
+ async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
+ init_test(cx);
+
+ let fs = FakeFs::new(cx.executor());
+ let project = Project::test(fs, [], cx).await;
+ let connection = Rc::new(FakeAgentConnection::new());
+ let thread = cx
+ .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
+ .await
+ .unwrap();
+
+ let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
+
+ // Send Output BEFORE Created - should be buffered by acp_thread
+ thread.update(cx, |thread, cx| {
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Output {
+ terminal_id: terminal_id.clone(),
+ data: b"hello buffered".to_vec(),
+ },
+ cx,
+ );
+ });
+
+ // Create a display-only terminal and then send Created
+ let lower = cx.new(|cx| {
+ let builder = ::terminal::TerminalBuilder::new_display_only(
+ None,
+ ::terminal::terminal_settings::CursorShape::default(),
+ ::terminal::terminal_settings::AlternateScroll::On,
+ None,
+ 0,
+ cx,
+ )
+ .unwrap();
+ builder.subscribe(cx)
+ });
+
+ thread.update(cx, |thread, cx| {
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Created {
+ terminal_id: terminal_id.clone(),
+ label: "Buffered Test".to_string(),
+ cwd: None,
+ output_byte_limit: None,
+ terminal: lower.clone(),
+ },
+ cx,
+ );
+ });
+
+ // After Created, buffered Output should have been flushed into the renderer
+ let content = thread.read_with(cx, |thread, cx| {
+ let term = thread.terminal(terminal_id.clone()).unwrap();
+ term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
+ });
+
+ assert!(
+ content.contains("hello buffered"),
+ "expected buffered output to render, got: {content}"
+ );
+ }
+
+ #[gpui::test]
+ async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
+ init_test(cx);
+
+ let fs = FakeFs::new(cx.executor());
+ let project = Project::test(fs, [], cx).await;
+ let connection = Rc::new(FakeAgentConnection::new());
+ let thread = cx
+ .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
+ .await
+ .unwrap();
+
+ let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
+
+ // Send Output BEFORE Created
+ thread.update(cx, |thread, cx| {
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Output {
+ terminal_id: terminal_id.clone(),
+ data: b"pre-exit data".to_vec(),
+ },
+ cx,
+ );
+ });
+
+ // Send Exit BEFORE Created
+ thread.update(cx, |thread, cx| {
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Exit {
+ terminal_id: terminal_id.clone(),
+ status: acp::TerminalExitStatus {
+ exit_code: Some(0),
+ signal: None,
+ meta: None,
+ },
+ },
+ cx,
+ );
+ });
+
+ // Now create a display-only lower-level terminal and send Created
+ let lower = cx.new(|cx| {
+ let builder = ::terminal::TerminalBuilder::new_display_only(
+ None,
+ ::terminal::terminal_settings::CursorShape::default(),
+ ::terminal::terminal_settings::AlternateScroll::On,
+ None,
+ 0,
+ cx,
+ )
+ .unwrap();
+ builder.subscribe(cx)
+ });
+
+ thread.update(cx, |thread, cx| {
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Created {
+ terminal_id: terminal_id.clone(),
+ label: "Buffered Exit Test".to_string(),
+ cwd: None,
+ output_byte_limit: None,
+ terminal: lower.clone(),
+ },
+ cx,
+ );
+ });
+
+ // Output should be present after Created (flushed from buffer)
+ let content = thread.read_with(cx, |thread, cx| {
+ let term = thread.terminal(terminal_id.clone()).unwrap();
+ term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
+ });
+
+ assert!(
+ content.contains("pre-exit data"),
+ "expected pre-exit data to render, got: {content}"
+ );
+ }
+
#[gpui::test]
async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
init_test(cx);
@@ -47,6 +47,8 @@ task.workspace = true
tempfile.workspace = true
thiserror.workspace = true
ui.workspace = true
+terminal.workspace = true
+uuid.workspace = true
util.workspace = true
watch.workspace = true
workspace-hack.workspace = true
@@ -19,7 +19,9 @@ use thiserror::Error;
use anyhow::{Context as _, Result};
use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
-use acp_thread::{AcpThread, AuthRequired, LoadError};
+use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
+use terminal::TerminalBuilder;
+use terminal::terminal_settings::{AlternateScroll, CursorShape};
#[derive(Debug, Error)]
#[error("Unsupported version")]
@@ -700,10 +702,99 @@ impl acp::Client for ClientDelegate {
}
}
+ // Clone so we can inspect meta both before and after handing off to the thread
+ let update_clone = notification.update.clone();
+
+ // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
+ if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
+ if let Some(meta) = &tc.meta {
+ if let Some(terminal_info) = meta.get("terminal_info") {
+ if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
+ {
+ let terminal_id = acp::TerminalId(id_str.into());
+
+ // Create a minimal display-only lower-level terminal and register it.
+ let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
+ let builder = TerminalBuilder::new_display_only(
+ None,
+ CursorShape::default(),
+ AlternateScroll::On,
+ None,
+ 0,
+ cx,
+ )?;
+ let lower = cx.new(|cx| builder.subscribe(cx));
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Created {
+ terminal_id: terminal_id.clone(),
+ label: tc.title.clone(),
+ cwd: None,
+ output_byte_limit: None,
+ terminal: lower,
+ },
+ cx,
+ );
+ anyhow::Ok(())
+ });
+ }
+ }
+ }
+ }
+
+ // Forward the update to the acp_thread as usual.
session.thread.update(&mut self.cx.clone(), |thread, cx| {
- thread.handle_session_update(notification.update, cx)
+ thread.handle_session_update(notification.update.clone(), cx)
})??;
+ // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
+ if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
+ if let Some(meta) = &tcu.meta {
+ if let Some(term_out) = meta.get("terminal_output") {
+ if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
+ let terminal_id = acp::TerminalId(id_str.into());
+ if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
+ let data = s.as_bytes().to_vec();
+ let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Output {
+ terminal_id: terminal_id.clone(),
+ data,
+ },
+ cx,
+ );
+ });
+ }
+ }
+ }
+
+ // terminal_exit
+ if let Some(term_exit) = meta.get("terminal_exit") {
+ if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
+ let terminal_id = acp::TerminalId(id_str.into());
+ let status = acp::TerminalExitStatus {
+ exit_code: term_exit
+ .get("exit_code")
+ .and_then(|v| v.as_u64())
+ .map(|i| i as u32),
+ signal: term_exit
+ .get("signal")
+ .and_then(|v| v.as_str().map(|s| s.to_string())),
+ meta: None,
+ };
+ let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
+ thread.on_terminal_provider_event(
+ TerminalProviderEvent::Exit {
+ terminal_id: terminal_id.clone(),
+ status,
+ },
+ cx,
+ );
+ });
+ }
+ }
+ }
+ }
+
Ok(())
}
@@ -711,25 +802,66 @@ impl acp::Client for ClientDelegate {
&self,
args: acp::CreateTerminalRequest,
) -> Result<acp::CreateTerminalResponse, acp::Error> {
- let terminal = self
- .session_thread(&args.session_id)?
- .update(&mut self.cx.clone(), |thread, cx| {
- thread.create_terminal(
- args.command,
- args.args,
- args.env,
- args.cwd,
- args.output_byte_limit,
+ let thread = self.session_thread(&args.session_id)?;
+ let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
+
+ let mut env = if let Some(dir) = &args.cwd {
+ project
+ .update(&mut self.cx.clone(), |project, cx| {
+ project.directory_environment(&task::Shell::System, dir.clone().into(), cx)
+ })?
+ .await
+ .unwrap_or_default()
+ } else {
+ Default::default()
+ };
+ for var in args.env {
+ env.insert(var.name, var.value);
+ }
+
+ // Use remote shell or default system shell, as appropriate
+ let remote_shell = project.update(&mut self.cx.clone(), |project, cx| {
+ project
+ .remote_client()
+ .and_then(|r| r.read(cx).default_system_shell())
+ })?;
+ let (task_command, task_args) =
+ task::ShellBuilder::new(remote_shell.as_deref(), &task::Shell::System)
+ .redirect_stdin_to_dev_null()
+ .build(Some(args.command.clone()), &args.args);
+
+ let terminal_entity = project
+ .update(&mut self.cx.clone(), |project, cx| {
+ project.create_terminal_task(
+ task::SpawnInTerminal {
+ command: Some(task_command),
+ args: task_args,
+ cwd: args.cwd.clone(),
+ env,
+ ..Default::default()
+ },
cx,
)
})?
.await?;
- Ok(
- terminal.read_with(&self.cx, |terminal, _| acp::CreateTerminalResponse {
- terminal_id: terminal.id().clone(),
- meta: None,
- })?,
- )
+
+ // Register with renderer
+ let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
+ thread.register_terminal_created(
+ acp::TerminalId(uuid::Uuid::new_v4().to_string().into()),
+ format!("{} {}", args.command, args.args.join(" ")),
+ args.cwd.clone(),
+ args.output_byte_limit,
+ terminal_entity,
+ cx,
+ )
+ })?;
+ let terminal_id =
+ terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
+ Ok(acp::CreateTerminalResponse {
+ terminal_id,
+ meta: None,
+ })
}
async fn kill_terminal_command(
@@ -339,6 +339,29 @@ pub struct TerminalBuilder {
}
impl TerminalBuilder {
+ pub fn new_display_only(
+ working_directory: Option<PathBuf>,
+ cursor_shape: CursorShape,
+ alternate_scroll: AlternateScroll,
+ max_scroll_history_lines: Option<usize>,
+ window_id: u64,
+ cx: &App,
+ ) -> Result<TerminalBuilder> {
+ Self::new(
+ working_directory,
+ None,
+ Shell::System,
+ HashMap::default(),
+ cursor_shape,
+ alternate_scroll,
+ max_scroll_history_lines,
+ false,
+ window_id,
+ None,
+ cx,
+ Vec::new(),
+ )
+ }
pub fn new(
working_directory: Option<PathBuf>,
task: Option<TaskState>,
@@ -1132,6 +1155,19 @@ impl Terminal {
self.term.lock().set_options(self.term_config.clone());
}
+ pub fn write_output(&mut self, bytes: &[u8], cx: &mut Context<Self>) {
+ // Inject bytes directly into the terminal emulator and refresh the UI.
+ // This bypasses the PTY/event loop for display-only terminals.
+ let mut processor = alacritty_terminal::vte::ansi::Processor::<
+ alacritty_terminal::vte::ansi::StdSyncHandler,
+ >::new();
+ {
+ let mut term = self.term.lock();
+ processor.advance(&mut *term, bytes);
+ }
+ cx.emit(Event::Wakeup);
+ }
+
pub fn total_lines(&self) -> usize {
let term = self.term.clone();
let terminal = term.lock_unfair();
@@ -2272,6 +2308,17 @@ mod tests {
terminal.update(cx, |term, _| term.get_content()).trim(),
"hello"
);
+
+ // Inject additional output directly into the emulator (display-only path)
+ terminal.update(cx, |term, cx| {
+ term.write_output(b"\nfrom_injection", cx);
+ });
+
+ let content_after = terminal.update(cx, |term, _| term.get_content());
+ assert!(
+ content_after.contains("from_injection"),
+ "expected injected output to appear, got: {content_after}"
+ );
}
// TODO should be tested on Linux too, but does not work there well