acp.rs

  1use acp_thread::AgentConnection;
  2use acp_tools::AcpConnectionRegistry;
  3use action_log::ActionLog;
  4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
  5use anyhow::anyhow;
  6use collections::HashMap;
  7use futures::AsyncBufReadExt as _;
  8use futures::io::BufReader;
  9use project::Project;
 10use project::agent_server_store::AgentServerCommand;
 11use serde::Deserialize;
 12use util::ResultExt as _;
 13
 14use std::path::PathBuf;
 15use std::{any::Any, cell::RefCell};
 16use std::{path::Path, rc::Rc};
 17use thiserror::Error;
 18
 19use anyhow::{Context as _, Result};
 20use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
 21
 22use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
 23use terminal::TerminalBuilder;
 24use terminal::terminal_settings::{AlternateScroll, CursorShape};
 25
/// Error returned by [`AcpConnection::stdio`] when the agent's `initialize`
/// response reports a protocol version lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
 29
/// A connection to an external agent server process that is driven over the
/// child's stdin/stdout (see [`AcpConnection::stdio`]).
pub struct AcpConnection {
    /// Name of the agent server; used when registering the connection, naming
    /// threads, and in log messages.
    server_name: SharedString,
    /// Identifier returned from `AgentConnection::telemetry_id`.
    telemetry_id: &'static str,
    /// Client side of the connection built on the child's stdin/stdout.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions created through `new_thread`, keyed by session id. The same map
    /// is shared with the `ClientDelegate` and with the task awaiting process exit.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods taken from the agent's `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities taken from the agent's `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode requested for new sessions, applied only if the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model requested for new sessions, applied only if the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Owned copy of the `root_dir` passed to `stdio`.
    root_dir: PathBuf,
    // NB: Don't move this into the wait_task, since we need to ensure the process is
    // killed on drop (setting kill_on_drop on the command seems to not always work).
    child: smol::process::Child,
    /// Background task running the I/O future returned by `ClientSideConnection::new`.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that awaits the child's exit status and then emits
    /// `LoadError::Exited` on every session's thread.
    _wait_task: Task<Result<()>>,
    /// Background task that logs each line of the child's stderr at `warn` level.
    _stderr_task: Task<Result<()>>,
}
 47
/// Per-session state tracked by an [`AcpConnection`].
pub struct AcpSession {
    /// Weak handle to the thread that displays this session.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel`; read and reset by `prompt` once the prompt request
    /// resolves, to decide whether an "aborted" error is reported as a
    /// cancellation instead of a failure.
    suppress_abort_err: bool,
    /// Model state from the new-session response, if the agent returned one.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state from the new-session response, if the agent returned one.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
}
 54
 55pub async fn connect(
 56    server_name: SharedString,
 57    telemetry_id: &'static str,
 58    command: AgentServerCommand,
 59    root_dir: &Path,
 60    default_mode: Option<acp::SessionModeId>,
 61    default_model: Option<acp::ModelId>,
 62    is_remote: bool,
 63    cx: &mut AsyncApp,
 64) -> Result<Rc<dyn AgentConnection>> {
 65    let conn = AcpConnection::stdio(
 66        server_name,
 67        telemetry_id,
 68        command.clone(),
 69        root_dir,
 70        default_mode,
 71        default_model,
 72        is_remote,
 73        cx,
 74    )
 75    .await?;
 76    Ok(Rc::new(conn) as _)
 77}
 78
/// Lowest protocol version accepted from an agent; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the `initialize` response reports less.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 80
 81impl AcpConnection {
 82    pub async fn stdio(
 83        server_name: SharedString,
 84        telemetry_id: &'static str,
 85        command: AgentServerCommand,
 86        root_dir: &Path,
 87        default_mode: Option<acp::SessionModeId>,
 88        default_model: Option<acp::ModelId>,
 89        is_remote: bool,
 90        cx: &mut AsyncApp,
 91    ) -> Result<Self> {
 92        let mut child = util::command::new_smol_command(&command.path);
 93        child
 94            .args(command.args.iter().map(|arg| arg.as_str()))
 95            .envs(command.env.iter().flatten())
 96            .stdin(std::process::Stdio::piped())
 97            .stdout(std::process::Stdio::piped())
 98            .stderr(std::process::Stdio::piped());
 99        if !is_remote {
100            child.current_dir(root_dir);
101        }
102        let mut child = child.spawn()?;
103
104        let stdout = child.stdout.take().context("Failed to take stdout")?;
105        let stdin = child.stdin.take().context("Failed to take stdin")?;
106        let stderr = child.stderr.take().context("Failed to take stderr")?;
107        log::debug!(
108            "Spawning external agent server: {:?}, {:?}",
109            command.path,
110            command.args
111        );
112        log::trace!("Spawned (pid: {})", child.id());
113
114        let sessions = Rc::new(RefCell::new(HashMap::default()));
115
116        let (release_channel, version) = cx.update(|cx| {
117            (
118                release_channel::ReleaseChannel::try_global(cx)
119                    .map(|release_channel| release_channel.display_name()),
120                release_channel::AppVersion::global(cx).to_string(),
121            )
122        })?;
123
124        let client = ClientDelegate {
125            sessions: sessions.clone(),
126            cx: cx.clone(),
127        };
128        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
129            let foreground_executor = cx.foreground_executor().clone();
130            move |fut| {
131                foreground_executor.spawn(fut).detach();
132            }
133        });
134
135        let io_task = cx.background_spawn(io_task);
136
137        let stderr_task = cx.background_spawn(async move {
138            let mut stderr = BufReader::new(stderr);
139            let mut line = String::new();
140            while let Ok(n) = stderr.read_line(&mut line).await
141                && n > 0
142            {
143                log::warn!("agent stderr: {}", line.trim());
144                line.clear();
145            }
146            Ok(())
147        });
148
149        let wait_task = cx.spawn({
150            let sessions = sessions.clone();
151            let status_fut = child.status();
152            async move |cx| {
153                let status = status_fut.await?;
154
155                for session in sessions.borrow().values() {
156                    session
157                        .thread
158                        .update(cx, |thread, cx| {
159                            thread.emit_load_error(LoadError::Exited { status }, cx)
160                        })
161                        .ok();
162                }
163
164                anyhow::Ok(())
165            }
166        });
167
168        let connection = Rc::new(connection);
169
170        cx.update(|cx| {
171            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
172                registry.set_active_connection(server_name.clone(), &connection, cx)
173            });
174        })?;
175
176        let mut client_info = acp::Implementation::new("zed", version);
177        if let Some(release_channel) = release_channel {
178            client_info = client_info.title(release_channel);
179        }
180        let response = connection
181            .initialize(
182                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
183                    .client_capabilities(
184                        acp::ClientCapabilities::new()
185                            .fs(acp::FileSystemCapability::new()
186                                .read_text_file(true)
187                                .write_text_file(true))
188                            .terminal(true)
189                            // Experimental: Allow for rendering terminal output from the agents
190                            .meta(acp::Meta::from_iter([
191                                ("terminal_output".into(), true.into()),
192                                ("terminal-auth".into(), true.into()),
193                            ])),
194                    )
195                    .client_info(client_info),
196            )
197            .await?;
198
199        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
200            return Err(UnsupportedVersion.into());
201        }
202
203        Ok(Self {
204            auth_methods: response.auth_methods,
205            root_dir: root_dir.to_owned(),
206            connection,
207            server_name,
208            telemetry_id,
209            sessions,
210            agent_capabilities: response.agent_capabilities,
211            default_mode,
212            default_model,
213            _io_task: io_task,
214            _wait_task: wait_task,
215            _stderr_task: stderr_task,
216            child,
217        })
218    }
219
220    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
221        &self.agent_capabilities.prompt_capabilities
222    }
223
224    pub fn root_dir(&self) -> &Path {
225        &self.root_dir
226    }
227}
228
// Kills the agent server process when the connection goes away.
impl Drop for AcpConnection {
    fn drop(&mut self) {
        // See the comment on the child field.
        // A failed kill is only logged (via `log_err`); it is never propagated.
        self.child.kill().log_err();
    }
}
235
236impl AgentConnection for AcpConnection {
    /// Returns the telemetry identifier this connection was created with.
    fn telemetry_id(&self) -> &'static str {
        self.telemetry_id
    }
240
241    fn new_thread(
242        self: Rc<Self>,
243        project: Entity<Project>,
244        cwd: &Path,
245        cx: &mut App,
246    ) -> Task<Result<Entity<AcpThread>>> {
247        let name = self.server_name.clone();
248        let conn = self.connection.clone();
249        let sessions = self.sessions.clone();
250        let default_mode = self.default_mode.clone();
251        let default_model = self.default_model.clone();
252        let cwd = cwd.to_path_buf();
253        let context_server_store = project.read(cx).context_server_store().read(cx);
254        let mcp_servers = if project.read(cx).is_local() {
255            context_server_store
256                .configured_server_ids()
257                .iter()
258                .filter_map(|id| {
259                    let configuration = context_server_store.configuration_for_server(id)?;
260                    match &*configuration {
261                        project::context_server_store::ContextServerConfiguration::Custom {
262                            command,
263                            ..
264                        }
265                        | project::context_server_store::ContextServerConfiguration::Extension {
266                            command,
267                            ..
268                        } => Some(acp::McpServer::Stdio(
269                            acp::McpServerStdio::new(id.0.to_string(), &command.path)
270                                .args(command.args.clone())
271                                .env(if let Some(env) = command.env.as_ref() {
272                                    env.iter()
273                                        .map(|(name, value)| acp::EnvVariable::new(name, value))
274                                        .collect()
275                                } else {
276                                    vec![]
277                                }),
278                        )),
279                        project::context_server_store::ContextServerConfiguration::Http {
280                            url,
281                            headers,
282                        } => Some(acp::McpServer::Http(
283                            acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
284                                headers
285                                    .iter()
286                                    .map(|(name, value)| acp::HttpHeader::new(name, value))
287                                    .collect(),
288                            ),
289                        )),
290                    }
291                })
292                .collect()
293        } else {
294            // In SSH projects, the external agent is running on the remote
295            // machine, and currently we only run MCP servers on the local
296            // machine. So don't pass any MCP servers to the agent in that case.
297            Vec::new()
298        };
299
300        cx.spawn(async move |cx| {
301            let response = conn
302                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
303                .await
304                .map_err(|err| {
305                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
306                        let mut error = AuthRequired::new();
307
308                        if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
309                            error = error.with_description(err.message);
310                        }
311
312                        anyhow!(error)
313                    } else {
314                        anyhow!(err)
315                    }
316                })?;
317
318            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
319            let models = response.models.map(|models| Rc::new(RefCell::new(models)));
320
321            if let Some(default_mode) = default_mode {
322                if let Some(modes) = modes.as_ref() {
323                    let mut modes_ref = modes.borrow_mut();
324                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
325
326                    if has_mode {
327                        let initial_mode_id = modes_ref.current_mode_id.clone();
328
329                        cx.spawn({
330                            let default_mode = default_mode.clone();
331                            let session_id = response.session_id.clone();
332                            let modes = modes.clone();
333                            let conn = conn.clone();
334                            async move |_| {
335                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
336                                .await.log_err();
337
338                                if result.is_none() {
339                                    modes.borrow_mut().current_mode_id = initial_mode_id;
340                                }
341                            }
342                        }).detach();
343
344                        modes_ref.current_mode_id = default_mode;
345                    } else {
346                        let available_modes = modes_ref
347                            .available_modes
348                            .iter()
349                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
350                            .collect::<Vec<_>>()
351                            .join("\n");
352
353                        log::warn!(
354                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
355                        );
356                    }
357                } else {
358                    log::warn!(
359                        "`{name}` does not support modes, but `default_mode` was set in settings.",
360                    );
361                }
362            }
363
364            if let Some(default_model) = default_model {
365                if let Some(models) = models.as_ref() {
366                    let mut models_ref = models.borrow_mut();
367                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
368
369                    if has_model {
370                        let initial_model_id = models_ref.current_model_id.clone();
371
372                        cx.spawn({
373                            let default_model = default_model.clone();
374                            let session_id = response.session_id.clone();
375                            let models = models.clone();
376                            let conn = conn.clone();
377                            async move |_| {
378                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
379                                .await.log_err();
380
381                                if result.is_none() {
382                                    models.borrow_mut().current_model_id = initial_model_id;
383                                }
384                            }
385                        }).detach();
386
387                        models_ref.current_model_id = default_model;
388                    } else {
389                        let available_models = models_ref
390                            .available_models
391                            .iter()
392                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
393                            .collect::<Vec<_>>()
394                            .join("\n");
395
396                        log::warn!(
397                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
398                        );
399                    }
400                } else {
401                    log::warn!(
402                        "`{name}` does not support model selection, but `default_model` was set in settings.",
403                    );
404                }
405            }
406
407            let session_id = response.session_id;
408            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
409            let thread = cx.new(|cx| {
410                AcpThread::new(
411                    self.server_name.clone(),
412                    self.clone(),
413                    project,
414                    action_log,
415                    session_id.clone(),
416                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
417                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
418                    cx,
419                )
420            })?;
421
422
423            let session = AcpSession {
424                thread: thread.downgrade(),
425                suppress_abort_err: false,
426                session_modes: modes,
427                models,
428            };
429            sessions.borrow_mut().insert(session_id, session);
430
431            Ok(thread)
432        })
433    }
434
    /// Returns the authentication methods stored on this connection.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
438
439    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
440        let conn = self.connection.clone();
441        cx.foreground_executor().spawn(async move {
442            conn.authenticate(acp::AuthenticateRequest::new(method_id))
443                .await?;
444            Ok(())
445        })
446    }
447
448    fn prompt(
449        &self,
450        _id: Option<acp_thread::UserMessageId>,
451        params: acp::PromptRequest,
452        cx: &mut App,
453    ) -> Task<Result<acp::PromptResponse>> {
454        let conn = self.connection.clone();
455        let sessions = self.sessions.clone();
456        let session_id = params.session_id.clone();
457        cx.foreground_executor().spawn(async move {
458            let result = conn.prompt(params).await;
459
460            let mut suppress_abort_err = false;
461
462            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
463                suppress_abort_err = session.suppress_abort_err;
464                session.suppress_abort_err = false;
465            }
466
467            match result {
468                Ok(response) => Ok(response),
469                Err(err) => {
470                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
471                        return Err(anyhow!(acp::Error::auth_required()));
472                    }
473
474                    if err.code != ErrorCode::INTERNAL_ERROR.code {
475                        anyhow::bail!(err)
476                    }
477
478                    let Some(data) = &err.data else {
479                        anyhow::bail!(err)
480                    };
481
482                    // Temporary workaround until the following PR is generally available:
483                    // https://github.com/google-gemini/gemini-cli/pull/6656
484
485                    #[derive(Deserialize)]
486                    #[serde(deny_unknown_fields)]
487                    struct ErrorDetails {
488                        details: Box<str>,
489                    }
490
491                    match serde_json::from_value(data.clone()) {
492                        Ok(ErrorDetails { details }) => {
493                            if suppress_abort_err
494                                && (details.contains("This operation was aborted")
495                                    || details.contains("The user aborted a request"))
496                            {
497                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
498                            } else {
499                                Err(anyhow!(details))
500                            }
501                        }
502                        Err(_) => Err(anyhow!(err)),
503                    }
504                }
505            }
506        })
507    }
508
509    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
510        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
511            session.suppress_abort_err = true;
512        }
513        let conn = self.connection.clone();
514        let params = acp::CancelNotification::new(session_id.clone());
515        cx.foreground_executor()
516            .spawn(async move { conn.cancel(params).await })
517            .detach();
518    }
519
520    fn session_modes(
521        &self,
522        session_id: &acp::SessionId,
523        _cx: &App,
524    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
525        let sessions = self.sessions.clone();
526        let sessions_ref = sessions.borrow();
527        let Some(session) = sessions_ref.get(session_id) else {
528            return None;
529        };
530
531        if let Some(modes) = session.session_modes.as_ref() {
532            Some(Rc::new(AcpSessionModes {
533                connection: self.connection.clone(),
534                session_id: session_id.clone(),
535                state: modes.clone(),
536            }) as _)
537        } else {
538            None
539        }
540    }
541
542    fn model_selector(
543        &self,
544        session_id: &acp::SessionId,
545    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
546        let sessions = self.sessions.clone();
547        let sessions_ref = sessions.borrow();
548        let Some(session) = sessions_ref.get(session_id) else {
549            return None;
550        };
551
552        if let Some(models) = session.models.as_ref() {
553            Some(Rc::new(AcpModelSelector::new(
554                session_id.clone(),
555                self.connection.clone(),
556                models.clone(),
557            )) as _)
558        } else {
559            None
560        }
561    }
562
    /// Upcasts this connection to `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
566}
567
/// Handle for reading and changing the mode of a single session.
struct AcpSessionModes {
    /// Session whose mode this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send mode changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the session entry held by the connection.
    state: Rc<RefCell<acp::SessionModeState>>,
}
573
574impl acp_thread::AgentSessionModes for AcpSessionModes {
575    fn current_mode(&self) -> acp::SessionModeId {
576        self.state.borrow().current_mode_id.clone()
577    }
578
579    fn all_modes(&self) -> Vec<acp::SessionMode> {
580        self.state.borrow().available_modes.clone()
581    }
582
583    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
584        let connection = self.connection.clone();
585        let session_id = self.session_id.clone();
586        let old_mode_id;
587        {
588            let mut state = self.state.borrow_mut();
589            old_mode_id = state.current_mode_id.clone();
590            state.current_mode_id = mode_id.clone();
591        };
592        let state = self.state.clone();
593        cx.foreground_executor().spawn(async move {
594            let result = connection
595                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
596                .await;
597
598            if result.is_err() {
599                state.borrow_mut().current_mode_id = old_mode_id;
600            }
601
602            result?;
603
604            Ok(())
605        })
606    }
607}
608
/// Handle for listing and selecting the model of a single session.
struct AcpModelSelector {
    /// Session whose model this selector controls.
    session_id: acp::SessionId,
    /// Connection used to send model changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state shared with the session entry held by the connection.
    state: Rc<RefCell<acp::SessionModelState>>,
}
614
impl AcpModelSelector {
    /// Builds a selector from its three parts; performs no other work.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
628
629impl acp_thread::AgentModelSelector for AcpModelSelector {
630    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
631        Task::ready(Ok(acp_thread::AgentModelList::Flat(
632            self.state
633                .borrow()
634                .available_models
635                .clone()
636                .into_iter()
637                .map(acp_thread::AgentModelInfo::from)
638                .collect(),
639        )))
640    }
641
642    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
643        let connection = self.connection.clone();
644        let session_id = self.session_id.clone();
645        let old_model_id;
646        {
647            let mut state = self.state.borrow_mut();
648            old_model_id = state.current_model_id.clone();
649            state.current_model_id = model_id.clone();
650        };
651        let state = self.state.clone();
652        cx.foreground_executor().spawn(async move {
653            let result = connection
654                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
655                .await;
656
657            if result.is_err() {
658                state.borrow_mut().current_model_id = old_model_id;
659            }
660
661            result?;
662
663            Ok(())
664        })
665    }
666
667    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
668        let state = self.state.borrow();
669        Task::ready(
670            state
671                .available_models
672                .iter()
673                .find(|m| m.model_id == state.current_model_id)
674                .cloned()
675                .map(acp_thread::AgentModelInfo::from)
676                .ok_or_else(|| anyhow::anyhow!("Model not found")),
677        )
678    }
679}
680
/// Client-side handler for requests and notifications sent by the agent.
struct ClientDelegate {
    /// Session map shared with the owning `AcpConnection`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App handle, cloned for each callback that needs to update a thread.
    cx: AsyncApp,
}
685
686#[async_trait::async_trait(?Send)]
687impl acp::Client for ClientDelegate {
688    async fn request_permission(
689        &self,
690        arguments: acp::RequestPermissionRequest,
691    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
692        let respect_always_allow_setting;
693        let thread;
694        {
695            let sessions_ref = self.sessions.borrow();
696            let session = sessions_ref
697                .get(&arguments.session_id)
698                .context("Failed to get session")?;
699            respect_always_allow_setting = session.session_modes.is_none();
700            thread = session.thread.clone();
701        }
702
703        let cx = &mut self.cx.clone();
704
705        let task = thread.update(cx, |thread, cx| {
706            thread.request_tool_call_authorization(
707                arguments.tool_call,
708                arguments.options,
709                respect_always_allow_setting,
710                cx,
711            )
712        })??;
713
714        let outcome = task.await;
715
716        Ok(acp::RequestPermissionResponse::new(outcome))
717    }
718
719    async fn write_text_file(
720        &self,
721        arguments: acp::WriteTextFileRequest,
722    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
723        let cx = &mut self.cx.clone();
724        let task = self
725            .session_thread(&arguments.session_id)?
726            .update(cx, |thread, cx| {
727                thread.write_text_file(arguments.path, arguments.content, cx)
728            })?;
729
730        task.await?;
731
732        Ok(Default::default())
733    }
734
735    async fn read_text_file(
736        &self,
737        arguments: acp::ReadTextFileRequest,
738    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
739        let task = self.session_thread(&arguments.session_id)?.update(
740            &mut self.cx.clone(),
741            |thread, cx| {
742                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
743            },
744        )?;
745
746        let content = task.await?;
747
748        Ok(acp::ReadTextFileResponse::new(content))
749    }
750
751    async fn session_notification(
752        &self,
753        notification: acp::SessionNotification,
754    ) -> Result<(), acp::Error> {
755        let sessions = self.sessions.borrow();
756        let session = sessions
757            .get(&notification.session_id)
758            .context("Failed to get session")?;
759
760        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
761            current_mode_id,
762            ..
763        }) = &notification.update
764        {
765            if let Some(session_modes) = &session.session_modes {
766                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
767            } else {
768                log::error!(
769                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
770                );
771            }
772        }
773
774        // Clone so we can inspect meta both before and after handing off to the thread
775        let update_clone = notification.update.clone();
776
777        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
778        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
779            if let Some(meta) = &tc.meta {
780                if let Some(terminal_info) = meta.get("terminal_info") {
781                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
782                    {
783                        let terminal_id = acp::TerminalId::new(id_str);
784                        let cwd = terminal_info
785                            .get("cwd")
786                            .and_then(|v| v.as_str().map(PathBuf::from));
787
788                        // Create a minimal display-only lower-level terminal and register it.
789                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
790                            let builder = TerminalBuilder::new_display_only(
791                                CursorShape::default(),
792                                AlternateScroll::On,
793                                None,
794                                0,
795                            )?;
796                            let lower = cx.new(|cx| builder.subscribe(cx));
797                            thread.on_terminal_provider_event(
798                                TerminalProviderEvent::Created {
799                                    terminal_id,
800                                    label: tc.title.clone(),
801                                    cwd,
802                                    output_byte_limit: None,
803                                    terminal: lower,
804                                },
805                                cx,
806                            );
807                            anyhow::Ok(())
808                        });
809                    }
810                }
811            }
812        }
813
814        // Forward the update to the acp_thread as usual.
815        session.thread.update(&mut self.cx.clone(), |thread, cx| {
816            thread.handle_session_update(notification.update.clone(), cx)
817        })??;
818
819        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
820        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
821            if let Some(meta) = &tcu.meta {
822                if let Some(term_out) = meta.get("terminal_output") {
823                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
824                        let terminal_id = acp::TerminalId::new(id_str);
825                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
826                            let data = s.as_bytes().to_vec();
827                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
828                                thread.on_terminal_provider_event(
829                                    TerminalProviderEvent::Output { terminal_id, data },
830                                    cx,
831                                );
832                            });
833                        }
834                    }
835                }
836
837                // terminal_exit
838                if let Some(term_exit) = meta.get("terminal_exit") {
839                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
840                        let terminal_id = acp::TerminalId::new(id_str);
841                        let mut status = acp::TerminalExitStatus::new();
842                        if let Some(code) = term_exit.get("exit_code").and_then(|v| v.as_u64()) {
843                            status = status.exit_code(code as u32)
844                        }
845                        if let Some(signal) = term_exit.get("signal").and_then(|v| v.as_str()) {
846                            status = status.signal(signal);
847                        }
848
849                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
850                            thread.on_terminal_provider_event(
851                                TerminalProviderEvent::Exit {
852                                    terminal_id,
853                                    status,
854                                },
855                                cx,
856                            );
857                        });
858                    }
859                }
860            }
861        }
862
863        Ok(())
864    }
865
866    async fn create_terminal(
867        &self,
868        args: acp::CreateTerminalRequest,
869    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
870        let thread = self.session_thread(&args.session_id)?;
871        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
872
873        let terminal_entity = acp_thread::create_terminal_entity(
874            args.command.clone(),
875            &args.args,
876            args.env
877                .into_iter()
878                .map(|env| (env.name, env.value))
879                .collect(),
880            args.cwd.clone(),
881            &project,
882            &mut self.cx.clone(),
883        )
884        .await?;
885
886        // Register with renderer
887        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
888            thread.register_terminal_created(
889                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
890                format!("{} {}", args.command, args.args.join(" ")),
891                args.cwd.clone(),
892                args.output_byte_limit,
893                terminal_entity,
894                cx,
895            )
896        })?;
897        let terminal_id =
898            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
899        Ok(acp::CreateTerminalResponse::new(terminal_id))
900    }
901
902    async fn kill_terminal_command(
903        &self,
904        args: acp::KillTerminalCommandRequest,
905    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
906        self.session_thread(&args.session_id)?
907            .update(&mut self.cx.clone(), |thread, cx| {
908                thread.kill_terminal(args.terminal_id, cx)
909            })??;
910
911        Ok(Default::default())
912    }
913
914    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
915        Err(acp::Error::method_not_found())
916    }
917
918    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
919        Err(acp::Error::method_not_found())
920    }
921
922    async fn release_terminal(
923        &self,
924        args: acp::ReleaseTerminalRequest,
925    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
926        self.session_thread(&args.session_id)?
927            .update(&mut self.cx.clone(), |thread, cx| {
928                thread.release_terminal(args.terminal_id, cx)
929            })??;
930
931        Ok(Default::default())
932    }
933
934    async fn terminal_output(
935        &self,
936        args: acp::TerminalOutputRequest,
937    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
938        self.session_thread(&args.session_id)?
939            .read_with(&mut self.cx.clone(), |thread, cx| {
940                let out = thread
941                    .terminal(args.terminal_id)?
942                    .read(cx)
943                    .current_output(cx);
944
945                Ok(out)
946            })?
947    }
948
949    async fn wait_for_terminal_exit(
950        &self,
951        args: acp::WaitForTerminalExitRequest,
952    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
953        let exit_status = self
954            .session_thread(&args.session_id)?
955            .update(&mut self.cx.clone(), |thread, cx| {
956                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
957            })??
958            .await;
959
960        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
961    }
962}
963
964impl ClientDelegate {
965    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
966        let sessions = self.sessions.borrow();
967        sessions
968            .get(session_id)
969            .context("Failed to get session")
970            .map(|session| session.thread.clone())
971    }
972}