acp.rs

  1use acp_thread::AgentConnection;
  2use acp_tools::AcpConnectionRegistry;
  3use action_log::ActionLog;
  4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
  5use anyhow::anyhow;
  6use collections::HashMap;
  7use futures::AsyncBufReadExt as _;
  8use futures::io::BufReader;
  9use project::Project;
 10use project::agent_server_store::AgentServerCommand;
 11use serde::Deserialize;
 12use util::ResultExt as _;
 13
 14use std::path::PathBuf;
 15use std::{any::Any, cell::RefCell};
 16use std::{path::Path, rc::Rc};
 17use thiserror::Error;
 18
 19use anyhow::{Context as _, Result};
 20use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
 21
 22use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
 23use terminal::TerminalBuilder;
 24use terminal::terminal_settings::{AlternateScroll, CursorShape};
 25
 26#[derive(Debug, Error)]
 27#[error("Unsupported version")]
 28pub struct UnsupportedVersion;
 29
 30pub struct AcpConnection {
 31    server_name: SharedString,
 32    telemetry_id: &'static str,
 33    connection: Rc<acp::ClientSideConnection>,
 34    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 35    auth_methods: Vec<acp::AuthMethod>,
 36    agent_capabilities: acp::AgentCapabilities,
 37    default_mode: Option<acp::SessionModeId>,
 38    default_model: Option<acp::ModelId>,
 39    root_dir: PathBuf,
 40    // NB: Don't move this into the wait_task, since we need to ensure the process is
 41    // killed on drop (setting kill_on_drop on the command seems to not always work).
 42    child: smol::process::Child,
 43    _io_task: Task<Result<(), acp::Error>>,
 44    _wait_task: Task<Result<()>>,
 45    _stderr_task: Task<Result<()>>,
 46}
 47
 48pub struct AcpSession {
 49    thread: WeakEntity<AcpThread>,
 50    suppress_abort_err: bool,
 51    models: Option<Rc<RefCell<acp::SessionModelState>>>,
 52    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
 53}
 54
 55pub async fn connect(
 56    server_name: SharedString,
 57    telemetry_id: &'static str,
 58    command: AgentServerCommand,
 59    root_dir: &Path,
 60    default_mode: Option<acp::SessionModeId>,
 61    default_model: Option<acp::ModelId>,
 62    is_remote: bool,
 63    cx: &mut AsyncApp,
 64) -> Result<Rc<dyn AgentConnection>> {
 65    let conn = AcpConnection::stdio(
 66        server_name,
 67        telemetry_id,
 68        command.clone(),
 69        root_dir,
 70        default_mode,
 71        default_model,
 72        is_remote,
 73        cx,
 74    )
 75    .await?;
 76    Ok(Rc::new(conn) as _)
 77}
 78
 79const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 80
 81impl AcpConnection {
 82    pub async fn stdio(
 83        server_name: SharedString,
 84        telemetry_id: &'static str,
 85        command: AgentServerCommand,
 86        root_dir: &Path,
 87        default_mode: Option<acp::SessionModeId>,
 88        default_model: Option<acp::ModelId>,
 89        is_remote: bool,
 90        cx: &mut AsyncApp,
 91    ) -> Result<Self> {
 92        let mut child = util::command::new_smol_command(&command.path);
 93        child
 94            .args(command.args.iter().map(|arg| arg.as_str()))
 95            .envs(command.env.iter().flatten())
 96            .stdin(std::process::Stdio::piped())
 97            .stdout(std::process::Stdio::piped())
 98            .stderr(std::process::Stdio::piped());
 99        if !is_remote {
100            child.current_dir(root_dir);
101        }
102        let mut child = child.spawn()?;
103
104        let stdout = child.stdout.take().context("Failed to take stdout")?;
105        let stdin = child.stdin.take().context("Failed to take stdin")?;
106        let stderr = child.stderr.take().context("Failed to take stderr")?;
107        log::debug!(
108            "Spawning external agent server: {:?}, {:?}",
109            command.path,
110            command.args
111        );
112        log::trace!("Spawned (pid: {})", child.id());
113
114        let sessions = Rc::new(RefCell::new(HashMap::default()));
115
116        let (release_channel, version) = cx.update(|cx| {
117            (
118                release_channel::ReleaseChannel::try_global(cx)
119                    .map(|release_channel| release_channel.display_name()),
120                release_channel::AppVersion::global(cx).to_string(),
121            )
122        })?;
123
124        let client = ClientDelegate {
125            sessions: sessions.clone(),
126            cx: cx.clone(),
127        };
128        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
129            let foreground_executor = cx.foreground_executor().clone();
130            move |fut| {
131                foreground_executor.spawn(fut).detach();
132            }
133        });
134
135        let io_task = cx.background_spawn(io_task);
136
137        let stderr_task = cx.background_spawn(async move {
138            let mut stderr = BufReader::new(stderr);
139            let mut line = String::new();
140            while let Ok(n) = stderr.read_line(&mut line).await
141                && n > 0
142            {
143                log::warn!("agent stderr: {}", line.trim());
144                line.clear();
145            }
146            Ok(())
147        });
148
149        let wait_task = cx.spawn({
150            let sessions = sessions.clone();
151            let status_fut = child.status();
152            async move |cx| {
153                let status = status_fut.await?;
154
155                for session in sessions.borrow().values() {
156                    session
157                        .thread
158                        .update(cx, |thread, cx| {
159                            thread.emit_load_error(LoadError::Exited { status }, cx)
160                        })
161                        .ok();
162                }
163
164                anyhow::Ok(())
165            }
166        });
167
168        let connection = Rc::new(connection);
169
170        cx.update(|cx| {
171            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
172                registry.set_active_connection(server_name.clone(), &connection, cx)
173            });
174        })?;
175
176        let response = connection
177            .initialize(
178                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
179                    .client_capabilities(
180                        acp::ClientCapabilities::new()
181                            .fs(acp::FileSystemCapability::new()
182                                .read_text_file(true)
183                                .write_text_file(true))
184                            .terminal(true)
185                            // Experimental: Allow for rendering terminal output from the agents
186                            .meta(acp::Meta::from_iter([
187                                ("terminal_output".into(), true.into()),
188                                ("terminal-auth".into(), true.into()),
189                            ])),
190                    )
191                    .client_info(
192                        acp::Implementation::new("zed", version)
193                            .title(release_channel.map(ToOwned::to_owned)),
194                    ),
195            )
196            .await?;
197
198        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
199            return Err(UnsupportedVersion.into());
200        }
201
202        Ok(Self {
203            auth_methods: response.auth_methods,
204            root_dir: root_dir.to_owned(),
205            connection,
206            server_name,
207            telemetry_id,
208            sessions,
209            agent_capabilities: response.agent_capabilities,
210            default_mode,
211            default_model,
212            _io_task: io_task,
213            _wait_task: wait_task,
214            _stderr_task: stderr_task,
215            child,
216        })
217    }
218
219    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
220        &self.agent_capabilities.prompt_capabilities
221    }
222
223    pub fn root_dir(&self) -> &Path {
224        &self.root_dir
225    }
226}
227
228impl Drop for AcpConnection {
229    fn drop(&mut self) {
230        // See the comment on the child field.
231        self.child.kill().log_err();
232    }
233}
234
235impl AgentConnection for AcpConnection {
236    fn telemetry_id(&self) -> &'static str {
237        self.telemetry_id
238    }
239
240    fn new_thread(
241        self: Rc<Self>,
242        project: Entity<Project>,
243        cwd: &Path,
244        cx: &mut App,
245    ) -> Task<Result<Entity<AcpThread>>> {
246        let name = self.server_name.clone();
247        let conn = self.connection.clone();
248        let sessions = self.sessions.clone();
249        let default_mode = self.default_mode.clone();
250        let default_model = self.default_model.clone();
251        let cwd = cwd.to_path_buf();
252        let context_server_store = project.read(cx).context_server_store().read(cx);
253        let mcp_servers = if project.read(cx).is_local() {
254            context_server_store
255                .configured_server_ids()
256                .iter()
257                .filter_map(|id| {
258                    let configuration = context_server_store.configuration_for_server(id)?;
259                    match &*configuration {
260                        project::context_server_store::ContextServerConfiguration::Custom {
261                            command,
262                            ..
263                        }
264                        | project::context_server_store::ContextServerConfiguration::Extension {
265                            command,
266                            ..
267                        } => Some(acp::McpServer::Stdio(
268                            acp::McpServerStdio::new(id.0.to_string(), &command.path)
269                                .args(command.args.clone())
270                                .env(if let Some(env) = command.env.as_ref() {
271                                    env.iter()
272                                        .map(|(name, value)| acp::EnvVariable::new(name, value))
273                                        .collect()
274                                } else {
275                                    vec![]
276                                }),
277                        )),
278                        project::context_server_store::ContextServerConfiguration::Http {
279                            url,
280                            headers,
281                        } => Some(acp::McpServer::Http(
282                            acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
283                                headers
284                                    .iter()
285                                    .map(|(name, value)| acp::HttpHeader::new(name, value))
286                                    .collect(),
287                            ),
288                        )),
289                    }
290                })
291                .collect()
292        } else {
293            // In SSH projects, the external agent is running on the remote
294            // machine, and currently we only run MCP servers on the local
295            // machine. So don't pass any MCP servers to the agent in that case.
296            Vec::new()
297        };
298
299        cx.spawn(async move |cx| {
300            let response = conn
301                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
302                .await
303                .map_err(|err| {
304                    if err.code == acp::ErrorCode::AuthRequired {
305                        let mut error = AuthRequired::new();
306
307                        if err.message != acp::ErrorCode::AuthRequired.to_string() {
308                            error = error.with_description(err.message);
309                        }
310
311                        anyhow!(error)
312                    } else {
313                        anyhow!(err)
314                    }
315                })?;
316
317            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
318            let models = response.models.map(|models| Rc::new(RefCell::new(models)));
319
320            if let Some(default_mode) = default_mode {
321                if let Some(modes) = modes.as_ref() {
322                    let mut modes_ref = modes.borrow_mut();
323                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
324
325                    if has_mode {
326                        let initial_mode_id = modes_ref.current_mode_id.clone();
327
328                        cx.spawn({
329                            let default_mode = default_mode.clone();
330                            let session_id = response.session_id.clone();
331                            let modes = modes.clone();
332                            let conn = conn.clone();
333                            async move |_| {
334                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
335                                .await.log_err();
336
337                                if result.is_none() {
338                                    modes.borrow_mut().current_mode_id = initial_mode_id;
339                                }
340                            }
341                        }).detach();
342
343                        modes_ref.current_mode_id = default_mode;
344                    } else {
345                        let available_modes = modes_ref
346                            .available_modes
347                            .iter()
348                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
349                            .collect::<Vec<_>>()
350                            .join("\n");
351
352                        log::warn!(
353                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
354                        );
355                    }
356                } else {
357                    log::warn!(
358                        "`{name}` does not support modes, but `default_mode` was set in settings.",
359                    );
360                }
361            }
362
363            if let Some(default_model) = default_model {
364                if let Some(models) = models.as_ref() {
365                    let mut models_ref = models.borrow_mut();
366                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
367
368                    if has_model {
369                        let initial_model_id = models_ref.current_model_id.clone();
370
371                        cx.spawn({
372                            let default_model = default_model.clone();
373                            let session_id = response.session_id.clone();
374                            let models = models.clone();
375                            let conn = conn.clone();
376                            async move |_| {
377                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
378                                .await.log_err();
379
380                                if result.is_none() {
381                                    models.borrow_mut().current_model_id = initial_model_id;
382                                }
383                            }
384                        }).detach();
385
386                        models_ref.current_model_id = default_model;
387                    } else {
388                        let available_models = models_ref
389                            .available_models
390                            .iter()
391                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
392                            .collect::<Vec<_>>()
393                            .join("\n");
394
395                        log::warn!(
396                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
397                        );
398                    }
399                } else {
400                    log::warn!(
401                        "`{name}` does not support model selection, but `default_model` was set in settings.",
402                    );
403                }
404            }
405
406            let session_id = response.session_id;
407            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
408            let thread = cx.new(|cx| {
409                AcpThread::new(
410                    self.server_name.clone(),
411                    self.clone(),
412                    project,
413                    action_log,
414                    session_id.clone(),
415                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
416                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
417                    cx,
418                )
419            })?;
420
421
422            let session = AcpSession {
423                thread: thread.downgrade(),
424                suppress_abort_err: false,
425                session_modes: modes,
426                models,
427            };
428            sessions.borrow_mut().insert(session_id, session);
429
430            Ok(thread)
431        })
432    }
433
434    fn auth_methods(&self) -> &[acp::AuthMethod] {
435        &self.auth_methods
436    }
437
438    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
439        let conn = self.connection.clone();
440        cx.foreground_executor().spawn(async move {
441            conn.authenticate(acp::AuthenticateRequest::new(method_id))
442                .await?;
443            Ok(())
444        })
445    }
446
447    fn prompt(
448        &self,
449        _id: Option<acp_thread::UserMessageId>,
450        params: acp::PromptRequest,
451        cx: &mut App,
452    ) -> Task<Result<acp::PromptResponse>> {
453        let conn = self.connection.clone();
454        let sessions = self.sessions.clone();
455        let session_id = params.session_id.clone();
456        cx.foreground_executor().spawn(async move {
457            let result = conn.prompt(params).await;
458
459            let mut suppress_abort_err = false;
460
461            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
462                suppress_abort_err = session.suppress_abort_err;
463                session.suppress_abort_err = false;
464            }
465
466            match result {
467                Ok(response) => Ok(response),
468                Err(err) => {
469                    if err.code == acp::ErrorCode::AuthRequired {
470                        return Err(anyhow!(acp::Error::auth_required()));
471                    }
472
473                    if err.code != ErrorCode::InternalError {
474                        anyhow::bail!(err)
475                    }
476
477                    let Some(data) = &err.data else {
478                        anyhow::bail!(err)
479                    };
480
481                    // Temporary workaround until the following PR is generally available:
482                    // https://github.com/google-gemini/gemini-cli/pull/6656
483
484                    #[derive(Deserialize)]
485                    #[serde(deny_unknown_fields)]
486                    struct ErrorDetails {
487                        details: Box<str>,
488                    }
489
490                    match serde_json::from_value(data.clone()) {
491                        Ok(ErrorDetails { details }) => {
492                            if suppress_abort_err
493                                && (details.contains("This operation was aborted")
494                                    || details.contains("The user aborted a request"))
495                            {
496                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
497                            } else {
498                                Err(anyhow!(details))
499                            }
500                        }
501                        Err(_) => Err(anyhow!(err)),
502                    }
503                }
504            }
505        })
506    }
507
508    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
509        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
510            session.suppress_abort_err = true;
511        }
512        let conn = self.connection.clone();
513        let params = acp::CancelNotification::new(session_id.clone());
514        cx.foreground_executor()
515            .spawn(async move { conn.cancel(params).await })
516            .detach();
517    }
518
519    fn session_modes(
520        &self,
521        session_id: &acp::SessionId,
522        _cx: &App,
523    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
524        let sessions = self.sessions.clone();
525        let sessions_ref = sessions.borrow();
526        let Some(session) = sessions_ref.get(session_id) else {
527            return None;
528        };
529
530        if let Some(modes) = session.session_modes.as_ref() {
531            Some(Rc::new(AcpSessionModes {
532                connection: self.connection.clone(),
533                session_id: session_id.clone(),
534                state: modes.clone(),
535            }) as _)
536        } else {
537            None
538        }
539    }
540
541    fn model_selector(
542        &self,
543        session_id: &acp::SessionId,
544    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
545        let sessions = self.sessions.clone();
546        let sessions_ref = sessions.borrow();
547        let Some(session) = sessions_ref.get(session_id) else {
548            return None;
549        };
550
551        if let Some(models) = session.models.as_ref() {
552            Some(Rc::new(AcpModelSelector::new(
553                session_id.clone(),
554                self.connection.clone(),
555                models.clone(),
556            )) as _)
557        } else {
558            None
559        }
560    }
561
562    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
563        self
564    }
565}
566
567struct AcpSessionModes {
568    session_id: acp::SessionId,
569    connection: Rc<acp::ClientSideConnection>,
570    state: Rc<RefCell<acp::SessionModeState>>,
571}
572
573impl acp_thread::AgentSessionModes for AcpSessionModes {
574    fn current_mode(&self) -> acp::SessionModeId {
575        self.state.borrow().current_mode_id.clone()
576    }
577
578    fn all_modes(&self) -> Vec<acp::SessionMode> {
579        self.state.borrow().available_modes.clone()
580    }
581
582    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
583        let connection = self.connection.clone();
584        let session_id = self.session_id.clone();
585        let old_mode_id;
586        {
587            let mut state = self.state.borrow_mut();
588            old_mode_id = state.current_mode_id.clone();
589            state.current_mode_id = mode_id.clone();
590        };
591        let state = self.state.clone();
592        cx.foreground_executor().spawn(async move {
593            let result = connection
594                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
595                .await;
596
597            if result.is_err() {
598                state.borrow_mut().current_mode_id = old_mode_id;
599            }
600
601            result?;
602
603            Ok(())
604        })
605    }
606}
607
608struct AcpModelSelector {
609    session_id: acp::SessionId,
610    connection: Rc<acp::ClientSideConnection>,
611    state: Rc<RefCell<acp::SessionModelState>>,
612}
613
614impl AcpModelSelector {
615    fn new(
616        session_id: acp::SessionId,
617        connection: Rc<acp::ClientSideConnection>,
618        state: Rc<RefCell<acp::SessionModelState>>,
619    ) -> Self {
620        Self {
621            session_id,
622            connection,
623            state,
624        }
625    }
626}
627
628impl acp_thread::AgentModelSelector for AcpModelSelector {
629    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
630        Task::ready(Ok(acp_thread::AgentModelList::Flat(
631            self.state
632                .borrow()
633                .available_models
634                .clone()
635                .into_iter()
636                .map(acp_thread::AgentModelInfo::from)
637                .collect(),
638        )))
639    }
640
641    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
642        let connection = self.connection.clone();
643        let session_id = self.session_id.clone();
644        let old_model_id;
645        {
646            let mut state = self.state.borrow_mut();
647            old_model_id = state.current_model_id.clone();
648            state.current_model_id = model_id.clone();
649        };
650        let state = self.state.clone();
651        cx.foreground_executor().spawn(async move {
652            let result = connection
653                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
654                .await;
655
656            if result.is_err() {
657                state.borrow_mut().current_model_id = old_model_id;
658            }
659
660            result?;
661
662            Ok(())
663        })
664    }
665
666    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
667        let state = self.state.borrow();
668        Task::ready(
669            state
670                .available_models
671                .iter()
672                .find(|m| m.model_id == state.current_model_id)
673                .cloned()
674                .map(acp_thread::AgentModelInfo::from)
675                .ok_or_else(|| anyhow::anyhow!("Model not found")),
676        )
677    }
678}
679
680struct ClientDelegate {
681    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
682    cx: AsyncApp,
683}
684
685#[async_trait::async_trait(?Send)]
686impl acp::Client for ClientDelegate {
687    async fn request_permission(
688        &self,
689        arguments: acp::RequestPermissionRequest,
690    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
691        let respect_always_allow_setting;
692        let thread;
693        {
694            let sessions_ref = self.sessions.borrow();
695            let session = sessions_ref
696                .get(&arguments.session_id)
697                .context("Failed to get session")?;
698            respect_always_allow_setting = session.session_modes.is_none();
699            thread = session.thread.clone();
700        }
701
702        let cx = &mut self.cx.clone();
703
704        let task = thread.update(cx, |thread, cx| {
705            thread.request_tool_call_authorization(
706                arguments.tool_call,
707                arguments.options,
708                respect_always_allow_setting,
709                cx,
710            )
711        })??;
712
713        let outcome = task.await;
714
715        Ok(acp::RequestPermissionResponse::new(outcome))
716    }
717
718    async fn write_text_file(
719        &self,
720        arguments: acp::WriteTextFileRequest,
721    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
722        let cx = &mut self.cx.clone();
723        let task = self
724            .session_thread(&arguments.session_id)?
725            .update(cx, |thread, cx| {
726                thread.write_text_file(arguments.path, arguments.content, cx)
727            })?;
728
729        task.await?;
730
731        Ok(Default::default())
732    }
733
734    async fn read_text_file(
735        &self,
736        arguments: acp::ReadTextFileRequest,
737    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
738        let task = self.session_thread(&arguments.session_id)?.update(
739            &mut self.cx.clone(),
740            |thread, cx| {
741                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
742            },
743        )?;
744
745        let content = task.await?;
746
747        Ok(acp::ReadTextFileResponse::new(content))
748    }
749
750    async fn session_notification(
751        &self,
752        notification: acp::SessionNotification,
753    ) -> Result<(), acp::Error> {
754        let sessions = self.sessions.borrow();
755        let session = sessions
756            .get(&notification.session_id)
757            .context("Failed to get session")?;
758
759        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
760            current_mode_id,
761            ..
762        }) = &notification.update
763        {
764            if let Some(session_modes) = &session.session_modes {
765                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
766            } else {
767                log::error!(
768                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
769                );
770            }
771        }
772
773        // Clone so we can inspect meta both before and after handing off to the thread
774        let update_clone = notification.update.clone();
775
776        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
777        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
778            if let Some(meta) = &tc.meta {
779                if let Some(terminal_info) = meta.get("terminal_info") {
780                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
781                    {
782                        let terminal_id = acp::TerminalId::new(id_str);
783                        let cwd = terminal_info
784                            .get("cwd")
785                            .and_then(|v| v.as_str().map(PathBuf::from));
786
787                        // Create a minimal display-only lower-level terminal and register it.
788                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
789                            let builder = TerminalBuilder::new_display_only(
790                                CursorShape::default(),
791                                AlternateScroll::On,
792                                None,
793                                0,
794                            )?;
795                            let lower = cx.new(|cx| builder.subscribe(cx));
796                            thread.on_terminal_provider_event(
797                                TerminalProviderEvent::Created {
798                                    terminal_id,
799                                    label: tc.title.clone(),
800                                    cwd,
801                                    output_byte_limit: None,
802                                    terminal: lower,
803                                },
804                                cx,
805                            );
806                            anyhow::Ok(())
807                        });
808                    }
809                }
810            }
811        }
812
813        // Forward the update to the acp_thread as usual.
814        session.thread.update(&mut self.cx.clone(), |thread, cx| {
815            thread.handle_session_update(notification.update.clone(), cx)
816        })??;
817
818        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
819        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
820            if let Some(meta) = &tcu.meta {
821                if let Some(term_out) = meta.get("terminal_output") {
822                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
823                        let terminal_id = acp::TerminalId::new(id_str);
824                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
825                            let data = s.as_bytes().to_vec();
826                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
827                                thread.on_terminal_provider_event(
828                                    TerminalProviderEvent::Output { terminal_id, data },
829                                    cx,
830                                );
831                            });
832                        }
833                    }
834                }
835
836                // terminal_exit
837                if let Some(term_exit) = meta.get("terminal_exit") {
838                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
839                        let terminal_id = acp::TerminalId::new(id_str);
840                        let status = acp::TerminalExitStatus::new()
841                            .exit_code(
842                                term_exit
843                                    .get("exit_code")
844                                    .and_then(|v| v.as_u64())
845                                    .map(|i| i as u32),
846                            )
847                            .signal(
848                                term_exit
849                                    .get("signal")
850                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
851                            );
852
853                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
854                            thread.on_terminal_provider_event(
855                                TerminalProviderEvent::Exit {
856                                    terminal_id,
857                                    status,
858                                },
859                                cx,
860                            );
861                        });
862                    }
863                }
864            }
865        }
866
867        Ok(())
868    }
869
870    async fn create_terminal(
871        &self,
872        args: acp::CreateTerminalRequest,
873    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
874        let thread = self.session_thread(&args.session_id)?;
875        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
876
877        let terminal_entity = acp_thread::create_terminal_entity(
878            args.command.clone(),
879            &args.args,
880            args.env
881                .into_iter()
882                .map(|env| (env.name, env.value))
883                .collect(),
884            args.cwd.clone(),
885            &project,
886            &mut self.cx.clone(),
887        )
888        .await?;
889
890        // Register with renderer
891        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
892            thread.register_terminal_created(
893                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
894                format!("{} {}", args.command, args.args.join(" ")),
895                args.cwd.clone(),
896                args.output_byte_limit,
897                terminal_entity,
898                cx,
899            )
900        })?;
901        let terminal_id =
902            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
903        Ok(acp::CreateTerminalResponse::new(terminal_id))
904    }
905
906    async fn kill_terminal_command(
907        &self,
908        args: acp::KillTerminalCommandRequest,
909    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
910        self.session_thread(&args.session_id)?
911            .update(&mut self.cx.clone(), |thread, cx| {
912                thread.kill_terminal(args.terminal_id, cx)
913            })??;
914
915        Ok(Default::default())
916    }
917
918    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
919        Err(acp::Error::method_not_found())
920    }
921
922    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
923        Err(acp::Error::method_not_found())
924    }
925
926    async fn release_terminal(
927        &self,
928        args: acp::ReleaseTerminalRequest,
929    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
930        self.session_thread(&args.session_id)?
931            .update(&mut self.cx.clone(), |thread, cx| {
932                thread.release_terminal(args.terminal_id, cx)
933            })??;
934
935        Ok(Default::default())
936    }
937
938    async fn terminal_output(
939        &self,
940        args: acp::TerminalOutputRequest,
941    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
942        self.session_thread(&args.session_id)?
943            .read_with(&mut self.cx.clone(), |thread, cx| {
944                let out = thread
945                    .terminal(args.terminal_id)?
946                    .read(cx)
947                    .current_output(cx);
948
949                Ok(out)
950            })?
951    }
952
953    async fn wait_for_terminal_exit(
954        &self,
955        args: acp::WaitForTerminalExitRequest,
956    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
957        let exit_status = self
958            .session_thread(&args.session_id)?
959            .update(&mut self.cx.clone(), |thread, cx| {
960                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
961            })??
962            .await;
963
964        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
965    }
966}
967
968impl ClientDelegate {
969    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
970        let sessions = self.sessions.borrow();
971        sessions
972            .get(session_id)
973            .context("Failed to get session")
974            .map(|session| session.thread.clone())
975    }
976}