acp.rs

  1use acp_thread::AgentConnection;
  2use acp_tools::AcpConnectionRegistry;
  3use action_log::ActionLog;
  4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
  5use anyhow::anyhow;
  6use collections::HashMap;
  7use futures::AsyncBufReadExt as _;
  8use futures::io::BufReader;
  9use project::Project;
 10use project::agent_server_store::AgentServerCommand;
 11use serde::Deserialize;
 12use settings::Settings as _;
 13use task::ShellBuilder;
 14use util::ResultExt as _;
 15
 16use std::path::PathBuf;
 17use std::{any::Any, cell::RefCell};
 18use std::{path::Path, rc::Rc};
 19use thiserror::Error;
 20
 21use anyhow::{Context as _, Result};
 22use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
 23
 24use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
 25use terminal::TerminalBuilder;
 26use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
 27
/// Error produced by [`AcpConnection::stdio`] when the agent's `initialize` response reports a
/// protocol version lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
 31
/// A connection to an external agent process that is driven through the ACP client-side
/// connection stored in `connection`.
pub struct AcpConnection {
    /// Name supplied by the caller; used when creating threads and when registering the
    /// connection with the `AcpConnectionRegistry`.
    server_name: SharedString,
    /// The name reported by the agent in its `initialize` response, falling back to
    /// `server_name` when the agent provides no info.
    telemetry_id: SharedString,
    /// Client side of the ACP connection, wired to the child's stdin/stdout.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions keyed by id; shared with the `ClientDelegate` and the task that waits for the
    /// child to exit.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods taken from the `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities taken from the `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode requested for every new session, if any.
    default_mode: Option<acp::SessionModeId>,
    /// Model requested for every new session, if any.
    default_model: Option<acp::ModelId>,
    /// The `root_dir` the connection was created with.
    root_dir: PathBuf,
    // NB: Don't move this into the wait_task, since we need to ensure the process is
    // killed on drop (setting kill_on_drop on the command seems to not always work).
    child: smol::process::Child,
    /// Drives the ACP connection's I/O; held so the task lives as long as the connection.
    _io_task: Task<Result<(), acp::Error>>,
    /// Waits for the child to exit and reports `LoadError::Exited` to every session's thread.
    _wait_task: Task<Result<()>>,
    /// Forwards the child's stderr lines to the log.
    _stderr_task: Task<Result<()>>,
}
 49
/// Per-session state tracked by an [`AcpConnection`].
pub struct AcpSession {
    /// The thread that displays this session; weak so the session does not keep it alive.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel` and consumed by `prompt`: when true, an "aborted" internal error from the
    /// agent is reported as a cancelled prompt instead of an error.
    suppress_abort_err: bool,
    /// Model state returned by the agent when the session was created, if it supports models.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state returned by the agent when the session was created, if it supports modes.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
}
 56
 57pub async fn connect(
 58    server_name: SharedString,
 59    command: AgentServerCommand,
 60    root_dir: &Path,
 61    default_mode: Option<acp::SessionModeId>,
 62    default_model: Option<acp::ModelId>,
 63    is_remote: bool,
 64    cx: &mut AsyncApp,
 65) -> Result<Rc<dyn AgentConnection>> {
 66    let conn = AcpConnection::stdio(
 67        server_name,
 68        command.clone(),
 69        root_dir,
 70        default_mode,
 71        default_model,
 72        is_remote,
 73        cx,
 74    )
 75    .await?;
 76    Ok(Rc::new(conn) as _)
 77}
 78
/// Lowest protocol version an agent may report in its `initialize` response; anything lower is
/// rejected with [`UnsupportedVersion`].
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 80
 81impl AcpConnection {
 82    pub async fn stdio(
 83        server_name: SharedString,
 84        command: AgentServerCommand,
 85        root_dir: &Path,
 86        default_mode: Option<acp::SessionModeId>,
 87        default_model: Option<acp::ModelId>,
 88        is_remote: bool,
 89        cx: &mut AsyncApp,
 90    ) -> Result<Self> {
 91        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone())?;
 92        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
 93        let mut child =
 94            builder.build_command(Some(command.path.display().to_string()), &command.args);
 95        child
 96            .envs(command.env.iter().flatten())
 97            .stdin(std::process::Stdio::piped())
 98            .stdout(std::process::Stdio::piped())
 99            .stderr(std::process::Stdio::piped());
100        if !is_remote {
101            child.current_dir(root_dir);
102        }
103        let mut child = child.spawn()?;
104
105        let stdout = child.stdout.take().context("Failed to take stdout")?;
106        let stdin = child.stdin.take().context("Failed to take stdin")?;
107        let stderr = child.stderr.take().context("Failed to take stderr")?;
108        log::debug!(
109            "Spawning external agent server: {:?}, {:?}",
110            command.path,
111            command.args
112        );
113        log::trace!("Spawned (pid: {})", child.id());
114
115        let sessions = Rc::new(RefCell::new(HashMap::default()));
116
117        let (release_channel, version) = cx.update(|cx| {
118            (
119                release_channel::ReleaseChannel::try_global(cx)
120                    .map(|release_channel| release_channel.display_name()),
121                release_channel::AppVersion::global(cx).to_string(),
122            )
123        })?;
124
125        let client = ClientDelegate {
126            sessions: sessions.clone(),
127            cx: cx.clone(),
128        };
129        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
130            let foreground_executor = cx.foreground_executor().clone();
131            move |fut| {
132                foreground_executor.spawn(fut).detach();
133            }
134        });
135
136        let io_task = cx.background_spawn(io_task);
137
138        let stderr_task = cx.background_spawn(async move {
139            let mut stderr = BufReader::new(stderr);
140            let mut line = String::new();
141            while let Ok(n) = stderr.read_line(&mut line).await
142                && n > 0
143            {
144                log::warn!("agent stderr: {}", line.trim());
145                line.clear();
146            }
147            Ok(())
148        });
149
150        let wait_task = cx.spawn({
151            let sessions = sessions.clone();
152            let status_fut = child.status();
153            async move |cx| {
154                let status = status_fut.await?;
155
156                for session in sessions.borrow().values() {
157                    session
158                        .thread
159                        .update(cx, |thread, cx| {
160                            thread.emit_load_error(LoadError::Exited { status }, cx)
161                        })
162                        .ok();
163                }
164
165                anyhow::Ok(())
166            }
167        });
168
169        let connection = Rc::new(connection);
170
171        cx.update(|cx| {
172            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
173                registry.set_active_connection(server_name.clone(), &connection, cx)
174            });
175        })?;
176
177        let response = connection
178            .initialize(
179                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
180                    .client_capabilities(
181                        acp::ClientCapabilities::new()
182                            .fs(acp::FileSystemCapability::new()
183                                .read_text_file(true)
184                                .write_text_file(true))
185                            .terminal(true)
186                            // Experimental: Allow for rendering terminal output from the agents
187                            .meta(acp::Meta::from_iter([
188                                ("terminal_output".into(), true.into()),
189                                ("terminal-auth".into(), true.into()),
190                            ])),
191                    )
192                    .client_info(
193                        acp::Implementation::new("zed", version)
194                            .title(release_channel.map(ToOwned::to_owned)),
195                    ),
196            )
197            .await?;
198
199        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
200            return Err(UnsupportedVersion.into());
201        }
202
203        let telemetry_id = response
204            .agent_info
205            // Use the one the agent provides if we have one
206            .map(|info| info.name.into())
207            // Otherwise, just use the name
208            .unwrap_or_else(|| server_name.clone());
209
210        Ok(Self {
211            auth_methods: response.auth_methods,
212            root_dir: root_dir.to_owned(),
213            connection,
214            server_name,
215            telemetry_id,
216            sessions,
217            agent_capabilities: response.agent_capabilities,
218            default_mode,
219            default_model,
220            _io_task: io_task,
221            _wait_task: wait_task,
222            _stderr_task: stderr_task,
223            child,
224        })
225    }
226
227    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
228        &self.agent_capabilities.prompt_capabilities
229    }
230
231    pub fn root_dir(&self) -> &Path {
232        &self.root_dir
233    }
234}
235
236impl Drop for AcpConnection {
237    fn drop(&mut self) {
238        // See the comment on the child field.
239        self.child.kill().log_err();
240    }
241}
242
243impl AgentConnection for AcpConnection {
244    fn telemetry_id(&self) -> SharedString {
245        self.telemetry_id.clone()
246    }
247
248    fn new_thread(
249        self: Rc<Self>,
250        project: Entity<Project>,
251        cwd: &Path,
252        cx: &mut App,
253    ) -> Task<Result<Entity<AcpThread>>> {
254        let name = self.server_name.clone();
255        let conn = self.connection.clone();
256        let sessions = self.sessions.clone();
257        let default_mode = self.default_mode.clone();
258        let default_model = self.default_model.clone();
259        let cwd = cwd.to_path_buf();
260        let context_server_store = project.read(cx).context_server_store().read(cx);
261        let mcp_servers = if project.read(cx).is_local() {
262            context_server_store
263                .configured_server_ids()
264                .iter()
265                .filter_map(|id| {
266                    let configuration = context_server_store.configuration_for_server(id)?;
267                    match &*configuration {
268                        project::context_server_store::ContextServerConfiguration::Custom {
269                            command,
270                            ..
271                        }
272                        | project::context_server_store::ContextServerConfiguration::Extension {
273                            command,
274                            ..
275                        } => Some(acp::McpServer::Stdio(
276                            acp::McpServerStdio::new(id.0.to_string(), &command.path)
277                                .args(command.args.clone())
278                                .env(if let Some(env) = command.env.as_ref() {
279                                    env.iter()
280                                        .map(|(name, value)| acp::EnvVariable::new(name, value))
281                                        .collect()
282                                } else {
283                                    vec![]
284                                }),
285                        )),
286                        project::context_server_store::ContextServerConfiguration::Http {
287                            url,
288                            headers,
289                            timeout: _,
290                        } => Some(acp::McpServer::Http(
291                            acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
292                                headers
293                                    .iter()
294                                    .map(|(name, value)| acp::HttpHeader::new(name, value))
295                                    .collect(),
296                            ),
297                        )),
298                    }
299                })
300                .collect()
301        } else {
302            // In SSH projects, the external agent is running on the remote
303            // machine, and currently we only run MCP servers on the local
304            // machine. So don't pass any MCP servers to the agent in that case.
305            Vec::new()
306        };
307
308        cx.spawn(async move |cx| {
309            let response = conn
310                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
311                .await
312                .map_err(|err| {
313                    if err.code == acp::ErrorCode::AuthRequired {
314                        let mut error = AuthRequired::new();
315
316                        if err.message != acp::ErrorCode::AuthRequired.to_string() {
317                            error = error.with_description(err.message);
318                        }
319
320                        anyhow!(error)
321                    } else {
322                        anyhow!(err)
323                    }
324                })?;
325
326            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
327            let models = response.models.map(|models| Rc::new(RefCell::new(models)));
328
329            if let Some(default_mode) = default_mode {
330                if let Some(modes) = modes.as_ref() {
331                    let mut modes_ref = modes.borrow_mut();
332                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
333
334                    if has_mode {
335                        let initial_mode_id = modes_ref.current_mode_id.clone();
336
337                        cx.spawn({
338                            let default_mode = default_mode.clone();
339                            let session_id = response.session_id.clone();
340                            let modes = modes.clone();
341                            let conn = conn.clone();
342                            async move |_| {
343                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
344                                .await.log_err();
345
346                                if result.is_none() {
347                                    modes.borrow_mut().current_mode_id = initial_mode_id;
348                                }
349                            }
350                        }).detach();
351
352                        modes_ref.current_mode_id = default_mode;
353                    } else {
354                        let available_modes = modes_ref
355                            .available_modes
356                            .iter()
357                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
358                            .collect::<Vec<_>>()
359                            .join("\n");
360
361                        log::warn!(
362                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
363                        );
364                    }
365                } else {
366                    log::warn!(
367                        "`{name}` does not support modes, but `default_mode` was set in settings.",
368                    );
369                }
370            }
371
372            if let Some(default_model) = default_model {
373                if let Some(models) = models.as_ref() {
374                    let mut models_ref = models.borrow_mut();
375                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
376
377                    if has_model {
378                        let initial_model_id = models_ref.current_model_id.clone();
379
380                        cx.spawn({
381                            let default_model = default_model.clone();
382                            let session_id = response.session_id.clone();
383                            let models = models.clone();
384                            let conn = conn.clone();
385                            async move |_| {
386                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
387                                .await.log_err();
388
389                                if result.is_none() {
390                                    models.borrow_mut().current_model_id = initial_model_id;
391                                }
392                            }
393                        }).detach();
394
395                        models_ref.current_model_id = default_model;
396                    } else {
397                        let available_models = models_ref
398                            .available_models
399                            .iter()
400                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
401                            .collect::<Vec<_>>()
402                            .join("\n");
403
404                        log::warn!(
405                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
406                        );
407                    }
408                } else {
409                    log::warn!(
410                        "`{name}` does not support model selection, but `default_model` was set in settings.",
411                    );
412                }
413            }
414
415            let session_id = response.session_id;
416            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
417            let thread = cx.new(|cx| {
418                AcpThread::new(
419                    self.server_name.clone(),
420                    self.clone(),
421                    project,
422                    action_log,
423                    session_id.clone(),
424                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
425                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
426                    cx,
427                )
428            })?;
429
430
431            let session = AcpSession {
432                thread: thread.downgrade(),
433                suppress_abort_err: false,
434                session_modes: modes,
435                models,
436            };
437            sessions.borrow_mut().insert(session_id, session);
438
439            Ok(thread)
440        })
441    }
442
443    fn auth_methods(&self) -> &[acp::AuthMethod] {
444        &self.auth_methods
445    }
446
447    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
448        let conn = self.connection.clone();
449        cx.foreground_executor().spawn(async move {
450            conn.authenticate(acp::AuthenticateRequest::new(method_id))
451                .await?;
452            Ok(())
453        })
454    }
455
456    fn prompt(
457        &self,
458        _id: Option<acp_thread::UserMessageId>,
459        params: acp::PromptRequest,
460        cx: &mut App,
461    ) -> Task<Result<acp::PromptResponse>> {
462        let conn = self.connection.clone();
463        let sessions = self.sessions.clone();
464        let session_id = params.session_id.clone();
465        cx.foreground_executor().spawn(async move {
466            let result = conn.prompt(params).await;
467
468            let mut suppress_abort_err = false;
469
470            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
471                suppress_abort_err = session.suppress_abort_err;
472                session.suppress_abort_err = false;
473            }
474
475            match result {
476                Ok(response) => Ok(response),
477                Err(err) => {
478                    if err.code == acp::ErrorCode::AuthRequired {
479                        return Err(anyhow!(acp::Error::auth_required()));
480                    }
481
482                    if err.code != ErrorCode::InternalError {
483                        anyhow::bail!(err)
484                    }
485
486                    let Some(data) = &err.data else {
487                        anyhow::bail!(err)
488                    };
489
490                    // Temporary workaround until the following PR is generally available:
491                    // https://github.com/google-gemini/gemini-cli/pull/6656
492
493                    #[derive(Deserialize)]
494                    #[serde(deny_unknown_fields)]
495                    struct ErrorDetails {
496                        details: Box<str>,
497                    }
498
499                    match serde_json::from_value(data.clone()) {
500                        Ok(ErrorDetails { details }) => {
501                            if suppress_abort_err
502                                && (details.contains("This operation was aborted")
503                                    || details.contains("The user aborted a request"))
504                            {
505                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
506                            } else {
507                                Err(anyhow!(details))
508                            }
509                        }
510                        Err(_) => Err(anyhow!(err)),
511                    }
512                }
513            }
514        })
515    }
516
517    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
518        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
519            session.suppress_abort_err = true;
520        }
521        let conn = self.connection.clone();
522        let params = acp::CancelNotification::new(session_id.clone());
523        cx.foreground_executor()
524            .spawn(async move { conn.cancel(params).await })
525            .detach();
526    }
527
528    fn session_modes(
529        &self,
530        session_id: &acp::SessionId,
531        _cx: &App,
532    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
533        let sessions = self.sessions.clone();
534        let sessions_ref = sessions.borrow();
535        let Some(session) = sessions_ref.get(session_id) else {
536            return None;
537        };
538
539        if let Some(modes) = session.session_modes.as_ref() {
540            Some(Rc::new(AcpSessionModes {
541                connection: self.connection.clone(),
542                session_id: session_id.clone(),
543                state: modes.clone(),
544            }) as _)
545        } else {
546            None
547        }
548    }
549
550    fn model_selector(
551        &self,
552        session_id: &acp::SessionId,
553    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
554        let sessions = self.sessions.clone();
555        let sessions_ref = sessions.borrow();
556        let Some(session) = sessions_ref.get(session_id) else {
557            return None;
558        };
559
560        if let Some(models) = session.models.as_ref() {
561            Some(Rc::new(AcpModelSelector::new(
562                session_id.clone(),
563                self.connection.clone(),
564                models.clone(),
565            )) as _)
566        } else {
567            None
568        }
569    }
570
571    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
572        self
573    }
574}
575
/// Handle for reading and changing the mode of one session.
struct AcpSessionModes {
    /// Session whose mode is controlled.
    session_id: acp::SessionId,
    /// Connection used to send mode changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the owning `AcpSession`.
    state: Rc<RefCell<acp::SessionModeState>>,
}
581
582impl acp_thread::AgentSessionModes for AcpSessionModes {
583    fn current_mode(&self) -> acp::SessionModeId {
584        self.state.borrow().current_mode_id.clone()
585    }
586
587    fn all_modes(&self) -> Vec<acp::SessionMode> {
588        self.state.borrow().available_modes.clone()
589    }
590
591    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
592        let connection = self.connection.clone();
593        let session_id = self.session_id.clone();
594        let old_mode_id;
595        {
596            let mut state = self.state.borrow_mut();
597            old_mode_id = state.current_mode_id.clone();
598            state.current_mode_id = mode_id.clone();
599        };
600        let state = self.state.clone();
601        cx.foreground_executor().spawn(async move {
602            let result = connection
603                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
604                .await;
605
606            if result.is_err() {
607                state.borrow_mut().current_mode_id = old_mode_id;
608            }
609
610            result?;
611
612            Ok(())
613        })
614    }
615}
616
/// Handle for listing and selecting the model of one session.
struct AcpModelSelector {
    /// Session whose model is controlled.
    session_id: acp::SessionId,
    /// Connection used to send model changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state shared with the owning `AcpSession`.
    state: Rc<RefCell<acp::SessionModelState>>,
}
622
623impl AcpModelSelector {
624    fn new(
625        session_id: acp::SessionId,
626        connection: Rc<acp::ClientSideConnection>,
627        state: Rc<RefCell<acp::SessionModelState>>,
628    ) -> Self {
629        Self {
630            session_id,
631            connection,
632            state,
633        }
634    }
635}
636
637impl acp_thread::AgentModelSelector for AcpModelSelector {
638    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
639        Task::ready(Ok(acp_thread::AgentModelList::Flat(
640            self.state
641                .borrow()
642                .available_models
643                .clone()
644                .into_iter()
645                .map(acp_thread::AgentModelInfo::from)
646                .collect(),
647        )))
648    }
649
650    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
651        let connection = self.connection.clone();
652        let session_id = self.session_id.clone();
653        let old_model_id;
654        {
655            let mut state = self.state.borrow_mut();
656            old_model_id = state.current_model_id.clone();
657            state.current_model_id = model_id.clone();
658        };
659        let state = self.state.clone();
660        cx.foreground_executor().spawn(async move {
661            let result = connection
662                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
663                .await;
664
665            if result.is_err() {
666                state.borrow_mut().current_model_id = old_model_id;
667            }
668
669            result?;
670
671            Ok(())
672        })
673    }
674
675    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
676        let state = self.state.borrow();
677        Task::ready(
678            state
679                .available_models
680                .iter()
681                .find(|m| m.model_id == state.current_model_id)
682                .cloned()
683                .map(acp_thread::AgentModelInfo::from)
684                .ok_or_else(|| anyhow::anyhow!("Model not found")),
685        )
686    }
687}
688
/// Client-side handler given to the ACP connection; routes requests coming from the agent to the
/// thread of the session they belong to.
struct ClientDelegate {
    /// Session map shared with the owning `AcpConnection`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App handle, cloned whenever a handler needs to update an entity.
    cx: AsyncApp,
}
693
694#[async_trait::async_trait(?Send)]
695impl acp::Client for ClientDelegate {
696    async fn request_permission(
697        &self,
698        arguments: acp::RequestPermissionRequest,
699    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
700        let respect_always_allow_setting;
701        let thread;
702        {
703            let sessions_ref = self.sessions.borrow();
704            let session = sessions_ref
705                .get(&arguments.session_id)
706                .context("Failed to get session")?;
707            respect_always_allow_setting = session.session_modes.is_none();
708            thread = session.thread.clone();
709        }
710
711        let cx = &mut self.cx.clone();
712
713        let task = thread.update(cx, |thread, cx| {
714            thread.request_tool_call_authorization(
715                arguments.tool_call,
716                arguments.options,
717                respect_always_allow_setting,
718                cx,
719            )
720        })??;
721
722        let outcome = task.await;
723
724        Ok(acp::RequestPermissionResponse::new(outcome))
725    }
726
727    async fn write_text_file(
728        &self,
729        arguments: acp::WriteTextFileRequest,
730    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
731        let cx = &mut self.cx.clone();
732        let task = self
733            .session_thread(&arguments.session_id)?
734            .update(cx, |thread, cx| {
735                thread.write_text_file(arguments.path, arguments.content, cx)
736            })?;
737
738        task.await?;
739
740        Ok(Default::default())
741    }
742
743    async fn read_text_file(
744        &self,
745        arguments: acp::ReadTextFileRequest,
746    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
747        let task = self.session_thread(&arguments.session_id)?.update(
748            &mut self.cx.clone(),
749            |thread, cx| {
750                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
751            },
752        )?;
753
754        let content = task.await?;
755
756        Ok(acp::ReadTextFileResponse::new(content))
757    }
758
759    async fn session_notification(
760        &self,
761        notification: acp::SessionNotification,
762    ) -> Result<(), acp::Error> {
763        let sessions = self.sessions.borrow();
764        let session = sessions
765            .get(&notification.session_id)
766            .context("Failed to get session")?;
767
768        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
769            current_mode_id,
770            ..
771        }) = &notification.update
772        {
773            if let Some(session_modes) = &session.session_modes {
774                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
775            } else {
776                log::error!(
777                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
778                );
779            }
780        }
781
782        // Clone so we can inspect meta both before and after handing off to the thread
783        let update_clone = notification.update.clone();
784
785        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
786        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
787            if let Some(meta) = &tc.meta {
788                if let Some(terminal_info) = meta.get("terminal_info") {
789                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
790                    {
791                        let terminal_id = acp::TerminalId::new(id_str);
792                        let cwd = terminal_info
793                            .get("cwd")
794                            .and_then(|v| v.as_str().map(PathBuf::from));
795
796                        // Create a minimal display-only lower-level terminal and register it.
797                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
798                            let builder = TerminalBuilder::new_display_only(
799                                CursorShape::default(),
800                                AlternateScroll::On,
801                                None,
802                                0,
803                            )?;
804                            let lower = cx.new(|cx| builder.subscribe(cx));
805                            thread.on_terminal_provider_event(
806                                TerminalProviderEvent::Created {
807                                    terminal_id,
808                                    label: tc.title.clone(),
809                                    cwd,
810                                    output_byte_limit: None,
811                                    terminal: lower,
812                                },
813                                cx,
814                            );
815                            anyhow::Ok(())
816                        });
817                    }
818                }
819            }
820        }
821
822        // Forward the update to the acp_thread as usual.
823        session.thread.update(&mut self.cx.clone(), |thread, cx| {
824            thread.handle_session_update(notification.update.clone(), cx)
825        })??;
826
827        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
828        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
829            if let Some(meta) = &tcu.meta {
830                if let Some(term_out) = meta.get("terminal_output") {
831                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
832                        let terminal_id = acp::TerminalId::new(id_str);
833                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
834                            let data = s.as_bytes().to_vec();
835                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
836                                thread.on_terminal_provider_event(
837                                    TerminalProviderEvent::Output { terminal_id, data },
838                                    cx,
839                                );
840                            });
841                        }
842                    }
843                }
844
845                // terminal_exit
846                if let Some(term_exit) = meta.get("terminal_exit") {
847                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
848                        let terminal_id = acp::TerminalId::new(id_str);
849                        let status = acp::TerminalExitStatus::new()
850                            .exit_code(
851                                term_exit
852                                    .get("exit_code")
853                                    .and_then(|v| v.as_u64())
854                                    .map(|i| i as u32),
855                            )
856                            .signal(
857                                term_exit
858                                    .get("signal")
859                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
860                            );
861
862                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
863                            thread.on_terminal_provider_event(
864                                TerminalProviderEvent::Exit {
865                                    terminal_id,
866                                    status,
867                                },
868                                cx,
869                            );
870                        });
871                    }
872                }
873            }
874        }
875
876        Ok(())
877    }
878
879    async fn create_terminal(
880        &self,
881        args: acp::CreateTerminalRequest,
882    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
883        let thread = self.session_thread(&args.session_id)?;
884        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
885
886        let terminal_entity = acp_thread::create_terminal_entity(
887            args.command.clone(),
888            &args.args,
889            args.env
890                .into_iter()
891                .map(|env| (env.name, env.value))
892                .collect(),
893            args.cwd.clone(),
894            &project,
895            &mut self.cx.clone(),
896        )
897        .await?;
898
899        // Register with renderer
900        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
901            thread.register_terminal_created(
902                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
903                format!("{} {}", args.command, args.args.join(" ")),
904                args.cwd.clone(),
905                args.output_byte_limit,
906                terminal_entity,
907                cx,
908            )
909        })?;
910        let terminal_id =
911            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
912        Ok(acp::CreateTerminalResponse::new(terminal_id))
913    }
914
915    async fn kill_terminal_command(
916        &self,
917        args: acp::KillTerminalCommandRequest,
918    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
919        self.session_thread(&args.session_id)?
920            .update(&mut self.cx.clone(), |thread, cx| {
921                thread.kill_terminal(args.terminal_id, cx)
922            })??;
923
924        Ok(Default::default())
925    }
926
927    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
928        Err(acp::Error::method_not_found())
929    }
930
931    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
932        Err(acp::Error::method_not_found())
933    }
934
935    async fn release_terminal(
936        &self,
937        args: acp::ReleaseTerminalRequest,
938    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
939        self.session_thread(&args.session_id)?
940            .update(&mut self.cx.clone(), |thread, cx| {
941                thread.release_terminal(args.terminal_id, cx)
942            })??;
943
944        Ok(Default::default())
945    }
946
947    async fn terminal_output(
948        &self,
949        args: acp::TerminalOutputRequest,
950    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
951        self.session_thread(&args.session_id)?
952            .read_with(&mut self.cx.clone(), |thread, cx| {
953                let out = thread
954                    .terminal(args.terminal_id)?
955                    .read(cx)
956                    .current_output(cx);
957
958                Ok(out)
959            })?
960    }
961
962    async fn wait_for_terminal_exit(
963        &self,
964        args: acp::WaitForTerminalExitRequest,
965    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
966        let exit_status = self
967            .session_thread(&args.session_id)?
968            .update(&mut self.cx.clone(), |thread, cx| {
969                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
970            })??
971            .await;
972
973        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
974    }
975}
976
977impl ClientDelegate {
978    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
979        let sessions = self.sessions.borrow();
980        sessions
981            .get(session_id)
982            .context("Failed to get session")
983            .map(|session| session.thread.clone())
984    }
985}