acp.rs

  1use acp_thread::AgentConnection;
  2use acp_tools::AcpConnectionRegistry;
  3use action_log::ActionLog;
  4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
  5use anyhow::anyhow;
  6use collections::HashMap;
  7use futures::AsyncBufReadExt as _;
  8use futures::io::BufReader;
  9use project::Project;
 10use project::agent_server_store::AgentServerCommand;
 11use serde::Deserialize;
 12use settings::Settings as _;
 13use task::ShellBuilder;
 14use util::ResultExt as _;
 15
 16use std::path::PathBuf;
 17use std::{any::Any, cell::RefCell};
 18use std::{path::Path, rc::Rc};
 19use thiserror::Error;
 20
 21use anyhow::{Context as _, Result};
 22use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
 23
 24use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
 25use terminal::TerminalBuilder;
 26use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
 27
 28#[derive(Debug, Error)]
 29#[error("Unsupported version")]
 30pub struct UnsupportedVersion;
 31
 32pub struct AcpConnection {
 33    server_name: SharedString,
 34    telemetry_id: SharedString,
 35    connection: Rc<acp::ClientSideConnection>,
 36    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 37    auth_methods: Vec<acp::AuthMethod>,
 38    agent_capabilities: acp::AgentCapabilities,
 39    default_mode: Option<acp::SessionModeId>,
 40    default_model: Option<acp::ModelId>,
 41    root_dir: PathBuf,
 42    // NB: Don't move this into the wait_task, since we need to ensure the process is
 43    // killed on drop (setting kill_on_drop on the command seems to not always work).
 44    child: smol::process::Child,
 45    _io_task: Task<Result<(), acp::Error>>,
 46    _wait_task: Task<Result<()>>,
 47    _stderr_task: Task<Result<()>>,
 48}
 49
 50pub struct AcpSession {
 51    thread: WeakEntity<AcpThread>,
 52    suppress_abort_err: bool,
 53    models: Option<Rc<RefCell<acp::SessionModelState>>>,
 54    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
 55}
 56
 57pub async fn connect(
 58    server_name: SharedString,
 59    command: AgentServerCommand,
 60    root_dir: &Path,
 61    default_mode: Option<acp::SessionModeId>,
 62    default_model: Option<acp::ModelId>,
 63    is_remote: bool,
 64    cx: &mut AsyncApp,
 65) -> Result<Rc<dyn AgentConnection>> {
 66    let conn = AcpConnection::stdio(
 67        server_name,
 68        command.clone(),
 69        root_dir,
 70        default_mode,
 71        default_model,
 72        is_remote,
 73        cx,
 74    )
 75    .await?;
 76    Ok(Rc::new(conn) as _)
 77}
 78
 79const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 80
 81impl AcpConnection {
 82    pub async fn stdio(
 83        server_name: SharedString,
 84        command: AgentServerCommand,
 85        root_dir: &Path,
 86        default_mode: Option<acp::SessionModeId>,
 87        default_model: Option<acp::ModelId>,
 88        is_remote: bool,
 89        cx: &mut AsyncApp,
 90    ) -> Result<Self> {
 91        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone())?;
 92        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
 93        let mut child =
 94            builder.build_command(Some(command.path.display().to_string()), &command.args);
 95        child
 96            .envs(command.env.iter().flatten())
 97            .stdin(std::process::Stdio::piped())
 98            .stdout(std::process::Stdio::piped())
 99            .stderr(std::process::Stdio::piped());
100        if !is_remote {
101            child.current_dir(root_dir);
102        }
103        let mut child = child.spawn()?;
104
105        let stdout = child.stdout.take().context("Failed to take stdout")?;
106        let stdin = child.stdin.take().context("Failed to take stdin")?;
107        let stderr = child.stderr.take().context("Failed to take stderr")?;
108        log::debug!(
109            "Spawning external agent server: {:?}, {:?}",
110            command.path,
111            command.args
112        );
113        log::trace!("Spawned (pid: {})", child.id());
114
115        let sessions = Rc::new(RefCell::new(HashMap::default()));
116
117        let (release_channel, version) = cx.update(|cx| {
118            (
119                release_channel::ReleaseChannel::try_global(cx)
120                    .map(|release_channel| release_channel.display_name()),
121                release_channel::AppVersion::global(cx).to_string(),
122            )
123        })?;
124
125        let client = ClientDelegate {
126            sessions: sessions.clone(),
127            cx: cx.clone(),
128        };
129        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
130            let foreground_executor = cx.foreground_executor().clone();
131            move |fut| {
132                foreground_executor.spawn(fut).detach();
133            }
134        });
135
136        let io_task = cx.background_spawn(io_task);
137
138        let stderr_task = cx.background_spawn(async move {
139            let mut stderr = BufReader::new(stderr);
140            let mut line = String::new();
141            while let Ok(n) = stderr.read_line(&mut line).await
142                && n > 0
143            {
144                log::warn!("agent stderr: {}", line.trim());
145                line.clear();
146            }
147            Ok(())
148        });
149
150        let wait_task = cx.spawn({
151            let sessions = sessions.clone();
152            let status_fut = child.status();
153            async move |cx| {
154                let status = status_fut.await?;
155
156                for session in sessions.borrow().values() {
157                    session
158                        .thread
159                        .update(cx, |thread, cx| {
160                            thread.emit_load_error(LoadError::Exited { status }, cx)
161                        })
162                        .ok();
163                }
164
165                anyhow::Ok(())
166            }
167        });
168
169        let connection = Rc::new(connection);
170
171        cx.update(|cx| {
172            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
173                registry.set_active_connection(server_name.clone(), &connection, cx)
174            });
175        })?;
176
177        let response = connection
178            .initialize(
179                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
180                    .client_capabilities(
181                        acp::ClientCapabilities::new()
182                            .fs(acp::FileSystemCapability::new()
183                                .read_text_file(true)
184                                .write_text_file(true))
185                            .terminal(true)
186                            // Experimental: Allow for rendering terminal output from the agents
187                            .meta(acp::Meta::from_iter([
188                                ("terminal_output".into(), true.into()),
189                                ("terminal-auth".into(), true.into()),
190                            ])),
191                    )
192                    .client_info(
193                        acp::Implementation::new("zed", version)
194                            .title(release_channel.map(ToOwned::to_owned)),
195                    ),
196            )
197            .await?;
198
199        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
200            return Err(UnsupportedVersion.into());
201        }
202
203        let telemetry_id = response
204            .agent_info
205            // Use the one the agent provides if we have one
206            .map(|info| info.name.into())
207            // Otherwise, just use the name
208            .unwrap_or_else(|| server_name.clone());
209
210        Ok(Self {
211            auth_methods: response.auth_methods,
212            root_dir: root_dir.to_owned(),
213            connection,
214            server_name,
215            telemetry_id,
216            sessions,
217            agent_capabilities: response.agent_capabilities,
218            default_mode,
219            default_model,
220            _io_task: io_task,
221            _wait_task: wait_task,
222            _stderr_task: stderr_task,
223            child,
224        })
225    }
226
227    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
228        &self.agent_capabilities.prompt_capabilities
229    }
230
231    pub fn root_dir(&self) -> &Path {
232        &self.root_dir
233    }
234}
235
236impl Drop for AcpConnection {
237    fn drop(&mut self) {
238        // See the comment on the child field.
239        self.child.kill().log_err();
240    }
241}
242
243impl AgentConnection for AcpConnection {
244    fn telemetry_id(&self) -> SharedString {
245        self.telemetry_id.clone()
246    }
247
248    fn new_thread(
249        self: Rc<Self>,
250        project: Entity<Project>,
251        cwd: &Path,
252        cx: &mut App,
253    ) -> Task<Result<Entity<AcpThread>>> {
254        let name = self.server_name.clone();
255        let conn = self.connection.clone();
256        let sessions = self.sessions.clone();
257        let default_mode = self.default_mode.clone();
258        let default_model = self.default_model.clone();
259        let cwd = cwd.to_path_buf();
260        let context_server_store = project.read(cx).context_server_store().read(cx);
261        let mcp_servers = if project.read(cx).is_local() {
262            context_server_store
263                .configured_server_ids()
264                .iter()
265                .filter_map(|id| {
266                    let configuration = context_server_store.configuration_for_server(id)?;
267                    match &*configuration {
268                        project::context_server_store::ContextServerConfiguration::Custom {
269                            command,
270                            ..
271                        }
272                        | project::context_server_store::ContextServerConfiguration::Extension {
273                            command,
274                            ..
275                        } => Some(acp::McpServer::Stdio(
276                            acp::McpServerStdio::new(id.0.to_string(), &command.path)
277                                .args(command.args.clone())
278                                .env(if let Some(env) = command.env.as_ref() {
279                                    env.iter()
280                                        .map(|(name, value)| acp::EnvVariable::new(name, value))
281                                        .collect()
282                                } else {
283                                    vec![]
284                                }),
285                        )),
286                        project::context_server_store::ContextServerConfiguration::Http {
287                            url,
288                            headers,
289                        } => Some(acp::McpServer::Http(
290                            acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
291                                headers
292                                    .iter()
293                                    .map(|(name, value)| acp::HttpHeader::new(name, value))
294                                    .collect(),
295                            ),
296                        )),
297                    }
298                })
299                .collect()
300        } else {
301            // In SSH projects, the external agent is running on the remote
302            // machine, and currently we only run MCP servers on the local
303            // machine. So don't pass any MCP servers to the agent in that case.
304            Vec::new()
305        };
306
307        cx.spawn(async move |cx| {
308            let response = conn
309                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
310                .await
311                .map_err(|err| {
312                    if err.code == acp::ErrorCode::AuthRequired {
313                        let mut error = AuthRequired::new();
314
315                        if err.message != acp::ErrorCode::AuthRequired.to_string() {
316                            error = error.with_description(err.message);
317                        }
318
319                        anyhow!(error)
320                    } else {
321                        anyhow!(err)
322                    }
323                })?;
324
325            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
326            let models = response.models.map(|models| Rc::new(RefCell::new(models)));
327
328            if let Some(default_mode) = default_mode {
329                if let Some(modes) = modes.as_ref() {
330                    let mut modes_ref = modes.borrow_mut();
331                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
332
333                    if has_mode {
334                        let initial_mode_id = modes_ref.current_mode_id.clone();
335
336                        cx.spawn({
337                            let default_mode = default_mode.clone();
338                            let session_id = response.session_id.clone();
339                            let modes = modes.clone();
340                            let conn = conn.clone();
341                            async move |_| {
342                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
343                                .await.log_err();
344
345                                if result.is_none() {
346                                    modes.borrow_mut().current_mode_id = initial_mode_id;
347                                }
348                            }
349                        }).detach();
350
351                        modes_ref.current_mode_id = default_mode;
352                    } else {
353                        let available_modes = modes_ref
354                            .available_modes
355                            .iter()
356                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
357                            .collect::<Vec<_>>()
358                            .join("\n");
359
360                        log::warn!(
361                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
362                        );
363                    }
364                } else {
365                    log::warn!(
366                        "`{name}` does not support modes, but `default_mode` was set in settings.",
367                    );
368                }
369            }
370
371            if let Some(default_model) = default_model {
372                if let Some(models) = models.as_ref() {
373                    let mut models_ref = models.borrow_mut();
374                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
375
376                    if has_model {
377                        let initial_model_id = models_ref.current_model_id.clone();
378
379                        cx.spawn({
380                            let default_model = default_model.clone();
381                            let session_id = response.session_id.clone();
382                            let models = models.clone();
383                            let conn = conn.clone();
384                            async move |_| {
385                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
386                                .await.log_err();
387
388                                if result.is_none() {
389                                    models.borrow_mut().current_model_id = initial_model_id;
390                                }
391                            }
392                        }).detach();
393
394                        models_ref.current_model_id = default_model;
395                    } else {
396                        let available_models = models_ref
397                            .available_models
398                            .iter()
399                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
400                            .collect::<Vec<_>>()
401                            .join("\n");
402
403                        log::warn!(
404                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
405                        );
406                    }
407                } else {
408                    log::warn!(
409                        "`{name}` does not support model selection, but `default_model` was set in settings.",
410                    );
411                }
412            }
413
414            let session_id = response.session_id;
415            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
416            let thread = cx.new(|cx| {
417                AcpThread::new(
418                    self.server_name.clone(),
419                    self.clone(),
420                    project,
421                    action_log,
422                    session_id.clone(),
423                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
424                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
425                    cx,
426                )
427            })?;
428
429
430            let session = AcpSession {
431                thread: thread.downgrade(),
432                suppress_abort_err: false,
433                session_modes: modes,
434                models,
435            };
436            sessions.borrow_mut().insert(session_id, session);
437
438            Ok(thread)
439        })
440    }
441
442    fn auth_methods(&self) -> &[acp::AuthMethod] {
443        &self.auth_methods
444    }
445
446    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
447        let conn = self.connection.clone();
448        cx.foreground_executor().spawn(async move {
449            conn.authenticate(acp::AuthenticateRequest::new(method_id))
450                .await?;
451            Ok(())
452        })
453    }
454
455    fn prompt(
456        &self,
457        _id: Option<acp_thread::UserMessageId>,
458        params: acp::PromptRequest,
459        cx: &mut App,
460    ) -> Task<Result<acp::PromptResponse>> {
461        let conn = self.connection.clone();
462        let sessions = self.sessions.clone();
463        let session_id = params.session_id.clone();
464        cx.foreground_executor().spawn(async move {
465            let result = conn.prompt(params).await;
466
467            let mut suppress_abort_err = false;
468
469            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
470                suppress_abort_err = session.suppress_abort_err;
471                session.suppress_abort_err = false;
472            }
473
474            match result {
475                Ok(response) => Ok(response),
476                Err(err) => {
477                    if err.code == acp::ErrorCode::AuthRequired {
478                        return Err(anyhow!(acp::Error::auth_required()));
479                    }
480
481                    if err.code != ErrorCode::InternalError {
482                        anyhow::bail!(err)
483                    }
484
485                    let Some(data) = &err.data else {
486                        anyhow::bail!(err)
487                    };
488
489                    // Temporary workaround until the following PR is generally available:
490                    // https://github.com/google-gemini/gemini-cli/pull/6656
491
492                    #[derive(Deserialize)]
493                    #[serde(deny_unknown_fields)]
494                    struct ErrorDetails {
495                        details: Box<str>,
496                    }
497
498                    match serde_json::from_value(data.clone()) {
499                        Ok(ErrorDetails { details }) => {
500                            if suppress_abort_err
501                                && (details.contains("This operation was aborted")
502                                    || details.contains("The user aborted a request"))
503                            {
504                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
505                            } else {
506                                Err(anyhow!(details))
507                            }
508                        }
509                        Err(_) => Err(anyhow!(err)),
510                    }
511                }
512            }
513        })
514    }
515
516    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
517        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
518            session.suppress_abort_err = true;
519        }
520        let conn = self.connection.clone();
521        let params = acp::CancelNotification::new(session_id.clone());
522        cx.foreground_executor()
523            .spawn(async move { conn.cancel(params).await })
524            .detach();
525    }
526
527    fn session_modes(
528        &self,
529        session_id: &acp::SessionId,
530        _cx: &App,
531    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
532        let sessions = self.sessions.clone();
533        let sessions_ref = sessions.borrow();
534        let Some(session) = sessions_ref.get(session_id) else {
535            return None;
536        };
537
538        if let Some(modes) = session.session_modes.as_ref() {
539            Some(Rc::new(AcpSessionModes {
540                connection: self.connection.clone(),
541                session_id: session_id.clone(),
542                state: modes.clone(),
543            }) as _)
544        } else {
545            None
546        }
547    }
548
549    fn model_selector(
550        &self,
551        session_id: &acp::SessionId,
552    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
553        let sessions = self.sessions.clone();
554        let sessions_ref = sessions.borrow();
555        let Some(session) = sessions_ref.get(session_id) else {
556            return None;
557        };
558
559        if let Some(models) = session.models.as_ref() {
560            Some(Rc::new(AcpModelSelector::new(
561                session_id.clone(),
562                self.connection.clone(),
563                models.clone(),
564            )) as _)
565        } else {
566            None
567        }
568    }
569
570    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
571        self
572    }
573}
574
575struct AcpSessionModes {
576    session_id: acp::SessionId,
577    connection: Rc<acp::ClientSideConnection>,
578    state: Rc<RefCell<acp::SessionModeState>>,
579}
580
581impl acp_thread::AgentSessionModes for AcpSessionModes {
582    fn current_mode(&self) -> acp::SessionModeId {
583        self.state.borrow().current_mode_id.clone()
584    }
585
586    fn all_modes(&self) -> Vec<acp::SessionMode> {
587        self.state.borrow().available_modes.clone()
588    }
589
590    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
591        let connection = self.connection.clone();
592        let session_id = self.session_id.clone();
593        let old_mode_id;
594        {
595            let mut state = self.state.borrow_mut();
596            old_mode_id = state.current_mode_id.clone();
597            state.current_mode_id = mode_id.clone();
598        };
599        let state = self.state.clone();
600        cx.foreground_executor().spawn(async move {
601            let result = connection
602                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
603                .await;
604
605            if result.is_err() {
606                state.borrow_mut().current_mode_id = old_mode_id;
607            }
608
609            result?;
610
611            Ok(())
612        })
613    }
614}
615
616struct AcpModelSelector {
617    session_id: acp::SessionId,
618    connection: Rc<acp::ClientSideConnection>,
619    state: Rc<RefCell<acp::SessionModelState>>,
620}
621
622impl AcpModelSelector {
623    fn new(
624        session_id: acp::SessionId,
625        connection: Rc<acp::ClientSideConnection>,
626        state: Rc<RefCell<acp::SessionModelState>>,
627    ) -> Self {
628        Self {
629            session_id,
630            connection,
631            state,
632        }
633    }
634}
635
636impl acp_thread::AgentModelSelector for AcpModelSelector {
637    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
638        Task::ready(Ok(acp_thread::AgentModelList::Flat(
639            self.state
640                .borrow()
641                .available_models
642                .clone()
643                .into_iter()
644                .map(acp_thread::AgentModelInfo::from)
645                .collect(),
646        )))
647    }
648
649    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
650        let connection = self.connection.clone();
651        let session_id = self.session_id.clone();
652        let old_model_id;
653        {
654            let mut state = self.state.borrow_mut();
655            old_model_id = state.current_model_id.clone();
656            state.current_model_id = model_id.clone();
657        };
658        let state = self.state.clone();
659        cx.foreground_executor().spawn(async move {
660            let result = connection
661                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
662                .await;
663
664            if result.is_err() {
665                state.borrow_mut().current_model_id = old_model_id;
666            }
667
668            result?;
669
670            Ok(())
671        })
672    }
673
674    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
675        let state = self.state.borrow();
676        Task::ready(
677            state
678                .available_models
679                .iter()
680                .find(|m| m.model_id == state.current_model_id)
681                .cloned()
682                .map(acp_thread::AgentModelInfo::from)
683                .ok_or_else(|| anyhow::anyhow!("Model not found")),
684        )
685    }
686}
687
688struct ClientDelegate {
689    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
690    cx: AsyncApp,
691}
692
693#[async_trait::async_trait(?Send)]
694impl acp::Client for ClientDelegate {
695    async fn request_permission(
696        &self,
697        arguments: acp::RequestPermissionRequest,
698    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
699        let respect_always_allow_setting;
700        let thread;
701        {
702            let sessions_ref = self.sessions.borrow();
703            let session = sessions_ref
704                .get(&arguments.session_id)
705                .context("Failed to get session")?;
706            respect_always_allow_setting = session.session_modes.is_none();
707            thread = session.thread.clone();
708        }
709
710        let cx = &mut self.cx.clone();
711
712        let task = thread.update(cx, |thread, cx| {
713            thread.request_tool_call_authorization(
714                arguments.tool_call,
715                arguments.options,
716                respect_always_allow_setting,
717                cx,
718            )
719        })??;
720
721        let outcome = task.await;
722
723        Ok(acp::RequestPermissionResponse::new(outcome))
724    }
725
726    async fn write_text_file(
727        &self,
728        arguments: acp::WriteTextFileRequest,
729    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
730        let cx = &mut self.cx.clone();
731        let task = self
732            .session_thread(&arguments.session_id)?
733            .update(cx, |thread, cx| {
734                thread.write_text_file(arguments.path, arguments.content, cx)
735            })?;
736
737        task.await?;
738
739        Ok(Default::default())
740    }
741
742    async fn read_text_file(
743        &self,
744        arguments: acp::ReadTextFileRequest,
745    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
746        let task = self.session_thread(&arguments.session_id)?.update(
747            &mut self.cx.clone(),
748            |thread, cx| {
749                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
750            },
751        )?;
752
753        let content = task.await?;
754
755        Ok(acp::ReadTextFileResponse::new(content))
756    }
757
758    async fn session_notification(
759        &self,
760        notification: acp::SessionNotification,
761    ) -> Result<(), acp::Error> {
762        let sessions = self.sessions.borrow();
763        let session = sessions
764            .get(&notification.session_id)
765            .context("Failed to get session")?;
766
767        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
768            current_mode_id,
769            ..
770        }) = &notification.update
771        {
772            if let Some(session_modes) = &session.session_modes {
773                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
774            } else {
775                log::error!(
776                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
777                );
778            }
779        }
780
781        // Clone so we can inspect meta both before and after handing off to the thread
782        let update_clone = notification.update.clone();
783
784        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
785        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
786            if let Some(meta) = &tc.meta {
787                if let Some(terminal_info) = meta.get("terminal_info") {
788                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
789                    {
790                        let terminal_id = acp::TerminalId::new(id_str);
791                        let cwd = terminal_info
792                            .get("cwd")
793                            .and_then(|v| v.as_str().map(PathBuf::from));
794
795                        // Create a minimal display-only lower-level terminal and register it.
796                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
797                            let builder = TerminalBuilder::new_display_only(
798                                CursorShape::default(),
799                                AlternateScroll::On,
800                                None,
801                                0,
802                            )?;
803                            let lower = cx.new(|cx| builder.subscribe(cx));
804                            thread.on_terminal_provider_event(
805                                TerminalProviderEvent::Created {
806                                    terminal_id,
807                                    label: tc.title.clone(),
808                                    cwd,
809                                    output_byte_limit: None,
810                                    terminal: lower,
811                                },
812                                cx,
813                            );
814                            anyhow::Ok(())
815                        });
816                    }
817                }
818            }
819        }
820
821        // Forward the update to the acp_thread as usual.
822        session.thread.update(&mut self.cx.clone(), |thread, cx| {
823            thread.handle_session_update(notification.update.clone(), cx)
824        })??;
825
826        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
827        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
828            if let Some(meta) = &tcu.meta {
829                if let Some(term_out) = meta.get("terminal_output") {
830                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
831                        let terminal_id = acp::TerminalId::new(id_str);
832                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
833                            let data = s.as_bytes().to_vec();
834                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
835                                thread.on_terminal_provider_event(
836                                    TerminalProviderEvent::Output { terminal_id, data },
837                                    cx,
838                                );
839                            });
840                        }
841                    }
842                }
843
844                // terminal_exit
845                if let Some(term_exit) = meta.get("terminal_exit") {
846                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
847                        let terminal_id = acp::TerminalId::new(id_str);
848                        let status = acp::TerminalExitStatus::new()
849                            .exit_code(
850                                term_exit
851                                    .get("exit_code")
852                                    .and_then(|v| v.as_u64())
853                                    .map(|i| i as u32),
854                            )
855                            .signal(
856                                term_exit
857                                    .get("signal")
858                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
859                            );
860
861                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
862                            thread.on_terminal_provider_event(
863                                TerminalProviderEvent::Exit {
864                                    terminal_id,
865                                    status,
866                                },
867                                cx,
868                            );
869                        });
870                    }
871                }
872            }
873        }
874
875        Ok(())
876    }
877
878    async fn create_terminal(
879        &self,
880        args: acp::CreateTerminalRequest,
881    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
882        let thread = self.session_thread(&args.session_id)?;
883        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
884
885        let terminal_entity = acp_thread::create_terminal_entity(
886            args.command.clone(),
887            &args.args,
888            args.env
889                .into_iter()
890                .map(|env| (env.name, env.value))
891                .collect(),
892            args.cwd.clone(),
893            &project,
894            &mut self.cx.clone(),
895        )
896        .await?;
897
898        // Register with renderer
899        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
900            thread.register_terminal_created(
901                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
902                format!("{} {}", args.command, args.args.join(" ")),
903                args.cwd.clone(),
904                args.output_byte_limit,
905                terminal_entity,
906                cx,
907            )
908        })?;
909        let terminal_id =
910            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
911        Ok(acp::CreateTerminalResponse::new(terminal_id))
912    }
913
914    async fn kill_terminal_command(
915        &self,
916        args: acp::KillTerminalCommandRequest,
917    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
918        self.session_thread(&args.session_id)?
919            .update(&mut self.cx.clone(), |thread, cx| {
920                thread.kill_terminal(args.terminal_id, cx)
921            })??;
922
923        Ok(Default::default())
924    }
925
926    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
927        Err(acp::Error::method_not_found())
928    }
929
930    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
931        Err(acp::Error::method_not_found())
932    }
933
934    async fn release_terminal(
935        &self,
936        args: acp::ReleaseTerminalRequest,
937    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
938        self.session_thread(&args.session_id)?
939            .update(&mut self.cx.clone(), |thread, cx| {
940                thread.release_terminal(args.terminal_id, cx)
941            })??;
942
943        Ok(Default::default())
944    }
945
946    async fn terminal_output(
947        &self,
948        args: acp::TerminalOutputRequest,
949    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
950        self.session_thread(&args.session_id)?
951            .read_with(&mut self.cx.clone(), |thread, cx| {
952                let out = thread
953                    .terminal(args.terminal_id)?
954                    .read(cx)
955                    .current_output(cx);
956
957                Ok(out)
958            })?
959    }
960
961    async fn wait_for_terminal_exit(
962        &self,
963        args: acp::WaitForTerminalExitRequest,
964    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
965        let exit_status = self
966            .session_thread(&args.session_id)?
967            .update(&mut self.cx.clone(), |thread, cx| {
968                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
969            })??
970            .await;
971
972        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
973    }
974}
975
976impl ClientDelegate {
977    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
978        let sessions = self.sessions.borrow();
979        sessions
980            .get(session_id)
981            .context("Failed to get session")
982            .map(|session| session.thread.clone())
983    }
984}