acp.rs

   1use acp_thread::AgentConnection;
   2use acp_tools::AcpConnectionRegistry;
   3use action_log::ActionLog;
   4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   5use anyhow::anyhow;
   6use collections::HashMap;
   7use futures::AsyncBufReadExt as _;
   8use futures::io::BufReader;
   9use project::Project;
  10use project::agent_server_store::AgentServerCommand;
  11use serde::Deserialize;
  12use settings::Settings as _;
  13use task::ShellBuilder;
  14#[cfg(windows)]
  15use task::ShellKind;
  16use util::ResultExt as _;
  17
  18use std::path::PathBuf;
  19use std::{any::Any, cell::RefCell};
  20use std::{path::Path, rc::Rc};
  21use thiserror::Error;
  22
  23use anyhow::{Context as _, Result};
  24use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  25
  26use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  27use terminal::TerminalBuilder;
  28use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  29
  30#[derive(Debug, Error)]
  31#[error("Unsupported version")]
  32pub struct UnsupportedVersion;
  33
  34pub struct AcpConnection {
  35    server_name: SharedString,
  36    telemetry_id: SharedString,
  37    connection: Rc<acp::ClientSideConnection>,
  38    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
  39    auth_methods: Vec<acp::AuthMethod>,
  40    agent_capabilities: acp::AgentCapabilities,
  41    default_mode: Option<acp::SessionModeId>,
  42    default_model: Option<acp::ModelId>,
  43    root_dir: PathBuf,
  44    // NB: Don't move this into the wait_task, since we need to ensure the process is
  45    // killed on drop (setting kill_on_drop on the command seems to not always work).
  46    child: smol::process::Child,
  47    _io_task: Task<Result<(), acp::Error>>,
  48    _wait_task: Task<Result<()>>,
  49    _stderr_task: Task<Result<()>>,
  50}
  51
  52pub struct AcpSession {
  53    thread: WeakEntity<AcpThread>,
  54    suppress_abort_err: bool,
  55    models: Option<Rc<RefCell<acp::SessionModelState>>>,
  56    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
  57}
  58
  59pub async fn connect(
  60    server_name: SharedString,
  61    command: AgentServerCommand,
  62    root_dir: &Path,
  63    default_mode: Option<acp::SessionModeId>,
  64    default_model: Option<acp::ModelId>,
  65    is_remote: bool,
  66    cx: &mut AsyncApp,
  67) -> Result<Rc<dyn AgentConnection>> {
  68    let conn = AcpConnection::stdio(
  69        server_name,
  70        command.clone(),
  71        root_dir,
  72        default_mode,
  73        default_model,
  74        is_remote,
  75        cx,
  76    )
  77    .await?;
  78    Ok(Rc::new(conn) as _)
  79}
  80
  81const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
  82
  83impl AcpConnection {
  84    pub async fn stdio(
  85        server_name: SharedString,
  86        command: AgentServerCommand,
  87        root_dir: &Path,
  88        default_mode: Option<acp::SessionModeId>,
  89        default_model: Option<acp::ModelId>,
  90        is_remote: bool,
  91        cx: &mut AsyncApp,
  92    ) -> Result<Self> {
  93        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone())?;
  94        let builder = ShellBuilder::new(&shell, cfg!(windows));
  95        #[cfg(windows)]
  96        let kind = builder.kind();
  97        let (cmd, args) = builder.build(Some(command.path.display().to_string()), &command.args);
  98
  99        let mut child = util::command::new_smol_command(cmd);
 100        #[cfg(windows)]
 101        if kind == ShellKind::Cmd {
 102            use smol::process::windows::CommandExt;
 103            for arg in args {
 104                child.raw_arg(arg);
 105            }
 106        } else {
 107            child.args(args);
 108        }
 109        #[cfg(not(windows))]
 110        child.args(args);
 111
 112        child
 113            .envs(command.env.iter().flatten())
 114            .stdin(std::process::Stdio::piped())
 115            .stdout(std::process::Stdio::piped())
 116            .stderr(std::process::Stdio::piped());
 117        if !is_remote {
 118            child.current_dir(root_dir);
 119        }
 120        let mut child = child.spawn()?;
 121
 122        let stdout = child.stdout.take().context("Failed to take stdout")?;
 123        let stdin = child.stdin.take().context("Failed to take stdin")?;
 124        let stderr = child.stderr.take().context("Failed to take stderr")?;
 125        log::debug!(
 126            "Spawning external agent server: {:?}, {:?}",
 127            command.path,
 128            command.args
 129        );
 130        log::trace!("Spawned (pid: {})", child.id());
 131
 132        let sessions = Rc::new(RefCell::new(HashMap::default()));
 133
 134        let (release_channel, version) = cx.update(|cx| {
 135            (
 136                release_channel::ReleaseChannel::try_global(cx)
 137                    .map(|release_channel| release_channel.display_name()),
 138                release_channel::AppVersion::global(cx).to_string(),
 139            )
 140        })?;
 141
 142        let client = ClientDelegate {
 143            sessions: sessions.clone(),
 144            cx: cx.clone(),
 145        };
 146        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
 147            let foreground_executor = cx.foreground_executor().clone();
 148            move |fut| {
 149                foreground_executor.spawn(fut).detach();
 150            }
 151        });
 152
 153        let io_task = cx.background_spawn(io_task);
 154
 155        let stderr_task = cx.background_spawn(async move {
 156            let mut stderr = BufReader::new(stderr);
 157            let mut line = String::new();
 158            while let Ok(n) = stderr.read_line(&mut line).await
 159                && n > 0
 160            {
 161                log::warn!("agent stderr: {}", line.trim());
 162                line.clear();
 163            }
 164            Ok(())
 165        });
 166
 167        let wait_task = cx.spawn({
 168            let sessions = sessions.clone();
 169            let status_fut = child.status();
 170            async move |cx| {
 171                let status = status_fut.await?;
 172
 173                for session in sessions.borrow().values() {
 174                    session
 175                        .thread
 176                        .update(cx, |thread, cx| {
 177                            thread.emit_load_error(LoadError::Exited { status }, cx)
 178                        })
 179                        .ok();
 180                }
 181
 182                anyhow::Ok(())
 183            }
 184        });
 185
 186        let connection = Rc::new(connection);
 187
 188        cx.update(|cx| {
 189            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
 190                registry.set_active_connection(server_name.clone(), &connection, cx)
 191            });
 192        })?;
 193
 194        let response = connection
 195            .initialize(
 196                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
 197                    .client_capabilities(
 198                        acp::ClientCapabilities::new()
 199                            .fs(acp::FileSystemCapability::new()
 200                                .read_text_file(true)
 201                                .write_text_file(true))
 202                            .terminal(true)
 203                            // Experimental: Allow for rendering terminal output from the agents
 204                            .meta(acp::Meta::from_iter([
 205                                ("terminal_output".into(), true.into()),
 206                                ("terminal-auth".into(), true.into()),
 207                            ])),
 208                    )
 209                    .client_info(
 210                        acp::Implementation::new("zed", version)
 211                            .title(release_channel.map(ToOwned::to_owned)),
 212                    ),
 213            )
 214            .await?;
 215
 216        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
 217            return Err(UnsupportedVersion.into());
 218        }
 219
 220        let telemetry_id = response
 221            .agent_info
 222            // Use the one the agent provides if we have one
 223            .map(|info| info.name.into())
 224            // Otherwise, just use the name
 225            .unwrap_or_else(|| server_name.clone());
 226
 227        Ok(Self {
 228            auth_methods: response.auth_methods,
 229            root_dir: root_dir.to_owned(),
 230            connection,
 231            server_name,
 232            telemetry_id,
 233            sessions,
 234            agent_capabilities: response.agent_capabilities,
 235            default_mode,
 236            default_model,
 237            _io_task: io_task,
 238            _wait_task: wait_task,
 239            _stderr_task: stderr_task,
 240            child,
 241        })
 242    }
 243
 244    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 245        &self.agent_capabilities.prompt_capabilities
 246    }
 247
 248    pub fn root_dir(&self) -> &Path {
 249        &self.root_dir
 250    }
 251}
 252
 253impl Drop for AcpConnection {
 254    fn drop(&mut self) {
 255        // See the comment on the child field.
 256        self.child.kill().log_err();
 257    }
 258}
 259
 260impl AgentConnection for AcpConnection {
 261    fn telemetry_id(&self) -> SharedString {
 262        self.telemetry_id.clone()
 263    }
 264
 265    fn new_thread(
 266        self: Rc<Self>,
 267        project: Entity<Project>,
 268        cwd: &Path,
 269        cx: &mut App,
 270    ) -> Task<Result<Entity<AcpThread>>> {
 271        let name = self.server_name.clone();
 272        let conn = self.connection.clone();
 273        let sessions = self.sessions.clone();
 274        let default_mode = self.default_mode.clone();
 275        let default_model = self.default_model.clone();
 276        let cwd = cwd.to_path_buf();
 277        let context_server_store = project.read(cx).context_server_store().read(cx);
 278        let mcp_servers = if project.read(cx).is_local() {
 279            context_server_store
 280                .configured_server_ids()
 281                .iter()
 282                .filter_map(|id| {
 283                    let configuration = context_server_store.configuration_for_server(id)?;
 284                    match &*configuration {
 285                        project::context_server_store::ContextServerConfiguration::Custom {
 286                            command,
 287                            ..
 288                        }
 289                        | project::context_server_store::ContextServerConfiguration::Extension {
 290                            command,
 291                            ..
 292                        } => Some(acp::McpServer::Stdio(
 293                            acp::McpServerStdio::new(id.0.to_string(), &command.path)
 294                                .args(command.args.clone())
 295                                .env(if let Some(env) = command.env.as_ref() {
 296                                    env.iter()
 297                                        .map(|(name, value)| acp::EnvVariable::new(name, value))
 298                                        .collect()
 299                                } else {
 300                                    vec![]
 301                                }),
 302                        )),
 303                        project::context_server_store::ContextServerConfiguration::Http {
 304                            url,
 305                            headers,
 306                        } => Some(acp::McpServer::Http(
 307                            acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
 308                                headers
 309                                    .iter()
 310                                    .map(|(name, value)| acp::HttpHeader::new(name, value))
 311                                    .collect(),
 312                            ),
 313                        )),
 314                    }
 315                })
 316                .collect()
 317        } else {
 318            // In SSH projects, the external agent is running on the remote
 319            // machine, and currently we only run MCP servers on the local
 320            // machine. So don't pass any MCP servers to the agent in that case.
 321            Vec::new()
 322        };
 323
 324        cx.spawn(async move |cx| {
 325            let response = conn
 326                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
 327                .await
 328                .map_err(|err| {
 329                    if err.code == acp::ErrorCode::AuthRequired {
 330                        let mut error = AuthRequired::new();
 331
 332                        if err.message != acp::ErrorCode::AuthRequired.to_string() {
 333                            error = error.with_description(err.message);
 334                        }
 335
 336                        anyhow!(error)
 337                    } else {
 338                        anyhow!(err)
 339                    }
 340                })?;
 341
 342            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
 343            let models = response.models.map(|models| Rc::new(RefCell::new(models)));
 344
 345            if let Some(default_mode) = default_mode {
 346                if let Some(modes) = modes.as_ref() {
 347                    let mut modes_ref = modes.borrow_mut();
 348                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
 349
 350                    if has_mode {
 351                        let initial_mode_id = modes_ref.current_mode_id.clone();
 352
 353                        cx.spawn({
 354                            let default_mode = default_mode.clone();
 355                            let session_id = response.session_id.clone();
 356                            let modes = modes.clone();
 357                            let conn = conn.clone();
 358                            async move |_| {
 359                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
 360                                .await.log_err();
 361
 362                                if result.is_none() {
 363                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 364                                }
 365                            }
 366                        }).detach();
 367
 368                        modes_ref.current_mode_id = default_mode;
 369                    } else {
 370                        let available_modes = modes_ref
 371                            .available_modes
 372                            .iter()
 373                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 374                            .collect::<Vec<_>>()
 375                            .join("\n");
 376
 377                        log::warn!(
 378                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 379                        );
 380                    }
 381                } else {
 382                    log::warn!(
 383                        "`{name}` does not support modes, but `default_mode` was set in settings.",
 384                    );
 385                }
 386            }
 387
 388            if let Some(default_model) = default_model {
 389                if let Some(models) = models.as_ref() {
 390                    let mut models_ref = models.borrow_mut();
 391                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
 392
 393                    if has_model {
 394                        let initial_model_id = models_ref.current_model_id.clone();
 395
 396                        cx.spawn({
 397                            let default_model = default_model.clone();
 398                            let session_id = response.session_id.clone();
 399                            let models = models.clone();
 400                            let conn = conn.clone();
 401                            async move |_| {
 402                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
 403                                .await.log_err();
 404
 405                                if result.is_none() {
 406                                    models.borrow_mut().current_model_id = initial_model_id;
 407                                }
 408                            }
 409                        }).detach();
 410
 411                        models_ref.current_model_id = default_model;
 412                    } else {
 413                        let available_models = models_ref
 414                            .available_models
 415                            .iter()
 416                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 417                            .collect::<Vec<_>>()
 418                            .join("\n");
 419
 420                        log::warn!(
 421                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 422                        );
 423                    }
 424                } else {
 425                    log::warn!(
 426                        "`{name}` does not support model selection, but `default_model` was set in settings.",
 427                    );
 428                }
 429            }
 430
 431            let session_id = response.session_id;
 432            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
 433            let thread = cx.new(|cx| {
 434                AcpThread::new(
 435                    self.server_name.clone(),
 436                    self.clone(),
 437                    project,
 438                    action_log,
 439                    session_id.clone(),
 440                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 441                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 442                    cx,
 443                )
 444            })?;
 445
 446
 447            let session = AcpSession {
 448                thread: thread.downgrade(),
 449                suppress_abort_err: false,
 450                session_modes: modes,
 451                models,
 452            };
 453            sessions.borrow_mut().insert(session_id, session);
 454
 455            Ok(thread)
 456        })
 457    }
 458
 459    fn auth_methods(&self) -> &[acp::AuthMethod] {
 460        &self.auth_methods
 461    }
 462
 463    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
 464        let conn = self.connection.clone();
 465        cx.foreground_executor().spawn(async move {
 466            conn.authenticate(acp::AuthenticateRequest::new(method_id))
 467                .await?;
 468            Ok(())
 469        })
 470    }
 471
 472    fn prompt(
 473        &self,
 474        _id: Option<acp_thread::UserMessageId>,
 475        params: acp::PromptRequest,
 476        cx: &mut App,
 477    ) -> Task<Result<acp::PromptResponse>> {
 478        let conn = self.connection.clone();
 479        let sessions = self.sessions.clone();
 480        let session_id = params.session_id.clone();
 481        cx.foreground_executor().spawn(async move {
 482            let result = conn.prompt(params).await;
 483
 484            let mut suppress_abort_err = false;
 485
 486            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
 487                suppress_abort_err = session.suppress_abort_err;
 488                session.suppress_abort_err = false;
 489            }
 490
 491            match result {
 492                Ok(response) => Ok(response),
 493                Err(err) => {
 494                    if err.code == acp::ErrorCode::AuthRequired {
 495                        return Err(anyhow!(acp::Error::auth_required()));
 496                    }
 497
 498                    if err.code != ErrorCode::InternalError {
 499                        anyhow::bail!(err)
 500                    }
 501
 502                    let Some(data) = &err.data else {
 503                        anyhow::bail!(err)
 504                    };
 505
 506                    // Temporary workaround until the following PR is generally available:
 507                    // https://github.com/google-gemini/gemini-cli/pull/6656
 508
 509                    #[derive(Deserialize)]
 510                    #[serde(deny_unknown_fields)]
 511                    struct ErrorDetails {
 512                        details: Box<str>,
 513                    }
 514
 515                    match serde_json::from_value(data.clone()) {
 516                        Ok(ErrorDetails { details }) => {
 517                            if suppress_abort_err
 518                                && (details.contains("This operation was aborted")
 519                                    || details.contains("The user aborted a request"))
 520                            {
 521                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
 522                            } else {
 523                                Err(anyhow!(details))
 524                            }
 525                        }
 526                        Err(_) => Err(anyhow!(err)),
 527                    }
 528                }
 529            }
 530        })
 531    }
 532
 533    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
 534        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
 535            session.suppress_abort_err = true;
 536        }
 537        let conn = self.connection.clone();
 538        let params = acp::CancelNotification::new(session_id.clone());
 539        cx.foreground_executor()
 540            .spawn(async move { conn.cancel(params).await })
 541            .detach();
 542    }
 543
 544    fn session_modes(
 545        &self,
 546        session_id: &acp::SessionId,
 547        _cx: &App,
 548    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
 549        let sessions = self.sessions.clone();
 550        let sessions_ref = sessions.borrow();
 551        let Some(session) = sessions_ref.get(session_id) else {
 552            return None;
 553        };
 554
 555        if let Some(modes) = session.session_modes.as_ref() {
 556            Some(Rc::new(AcpSessionModes {
 557                connection: self.connection.clone(),
 558                session_id: session_id.clone(),
 559                state: modes.clone(),
 560            }) as _)
 561        } else {
 562            None
 563        }
 564    }
 565
 566    fn model_selector(
 567        &self,
 568        session_id: &acp::SessionId,
 569    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
 570        let sessions = self.sessions.clone();
 571        let sessions_ref = sessions.borrow();
 572        let Some(session) = sessions_ref.get(session_id) else {
 573            return None;
 574        };
 575
 576        if let Some(models) = session.models.as_ref() {
 577            Some(Rc::new(AcpModelSelector::new(
 578                session_id.clone(),
 579                self.connection.clone(),
 580                models.clone(),
 581            )) as _)
 582        } else {
 583            None
 584        }
 585    }
 586
 587    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 588        self
 589    }
 590}
 591
 592struct AcpSessionModes {
 593    session_id: acp::SessionId,
 594    connection: Rc<acp::ClientSideConnection>,
 595    state: Rc<RefCell<acp::SessionModeState>>,
 596}
 597
 598impl acp_thread::AgentSessionModes for AcpSessionModes {
 599    fn current_mode(&self) -> acp::SessionModeId {
 600        self.state.borrow().current_mode_id.clone()
 601    }
 602
 603    fn all_modes(&self) -> Vec<acp::SessionMode> {
 604        self.state.borrow().available_modes.clone()
 605    }
 606
 607    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
 608        let connection = self.connection.clone();
 609        let session_id = self.session_id.clone();
 610        let old_mode_id;
 611        {
 612            let mut state = self.state.borrow_mut();
 613            old_mode_id = state.current_mode_id.clone();
 614            state.current_mode_id = mode_id.clone();
 615        };
 616        let state = self.state.clone();
 617        cx.foreground_executor().spawn(async move {
 618            let result = connection
 619                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
 620                .await;
 621
 622            if result.is_err() {
 623                state.borrow_mut().current_mode_id = old_mode_id;
 624            }
 625
 626            result?;
 627
 628            Ok(())
 629        })
 630    }
 631}
 632
 633struct AcpModelSelector {
 634    session_id: acp::SessionId,
 635    connection: Rc<acp::ClientSideConnection>,
 636    state: Rc<RefCell<acp::SessionModelState>>,
 637}
 638
 639impl AcpModelSelector {
 640    fn new(
 641        session_id: acp::SessionId,
 642        connection: Rc<acp::ClientSideConnection>,
 643        state: Rc<RefCell<acp::SessionModelState>>,
 644    ) -> Self {
 645        Self {
 646            session_id,
 647            connection,
 648            state,
 649        }
 650    }
 651}
 652
 653impl acp_thread::AgentModelSelector for AcpModelSelector {
 654    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
 655        Task::ready(Ok(acp_thread::AgentModelList::Flat(
 656            self.state
 657                .borrow()
 658                .available_models
 659                .clone()
 660                .into_iter()
 661                .map(acp_thread::AgentModelInfo::from)
 662                .collect(),
 663        )))
 664    }
 665
 666    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
 667        let connection = self.connection.clone();
 668        let session_id = self.session_id.clone();
 669        let old_model_id;
 670        {
 671            let mut state = self.state.borrow_mut();
 672            old_model_id = state.current_model_id.clone();
 673            state.current_model_id = model_id.clone();
 674        };
 675        let state = self.state.clone();
 676        cx.foreground_executor().spawn(async move {
 677            let result = connection
 678                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
 679                .await;
 680
 681            if result.is_err() {
 682                state.borrow_mut().current_model_id = old_model_id;
 683            }
 684
 685            result?;
 686
 687            Ok(())
 688        })
 689    }
 690
 691    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
 692        let state = self.state.borrow();
 693        Task::ready(
 694            state
 695                .available_models
 696                .iter()
 697                .find(|m| m.model_id == state.current_model_id)
 698                .cloned()
 699                .map(acp_thread::AgentModelInfo::from)
 700                .ok_or_else(|| anyhow::anyhow!("Model not found")),
 701        )
 702    }
 703}
 704
 705struct ClientDelegate {
 706    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 707    cx: AsyncApp,
 708}
 709
 710#[async_trait::async_trait(?Send)]
 711impl acp::Client for ClientDelegate {
 712    async fn request_permission(
 713        &self,
 714        arguments: acp::RequestPermissionRequest,
 715    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
 716        let respect_always_allow_setting;
 717        let thread;
 718        {
 719            let sessions_ref = self.sessions.borrow();
 720            let session = sessions_ref
 721                .get(&arguments.session_id)
 722                .context("Failed to get session")?;
 723            respect_always_allow_setting = session.session_modes.is_none();
 724            thread = session.thread.clone();
 725        }
 726
 727        let cx = &mut self.cx.clone();
 728
 729        let task = thread.update(cx, |thread, cx| {
 730            thread.request_tool_call_authorization(
 731                arguments.tool_call,
 732                arguments.options,
 733                respect_always_allow_setting,
 734                cx,
 735            )
 736        })??;
 737
 738        let outcome = task.await;
 739
 740        Ok(acp::RequestPermissionResponse::new(outcome))
 741    }
 742
 743    async fn write_text_file(
 744        &self,
 745        arguments: acp::WriteTextFileRequest,
 746    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
 747        let cx = &mut self.cx.clone();
 748        let task = self
 749            .session_thread(&arguments.session_id)?
 750            .update(cx, |thread, cx| {
 751                thread.write_text_file(arguments.path, arguments.content, cx)
 752            })?;
 753
 754        task.await?;
 755
 756        Ok(Default::default())
 757    }
 758
 759    async fn read_text_file(
 760        &self,
 761        arguments: acp::ReadTextFileRequest,
 762    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
 763        let task = self.session_thread(&arguments.session_id)?.update(
 764            &mut self.cx.clone(),
 765            |thread, cx| {
 766                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
 767            },
 768        )?;
 769
 770        let content = task.await?;
 771
 772        Ok(acp::ReadTextFileResponse::new(content))
 773    }
 774
 775    async fn session_notification(
 776        &self,
 777        notification: acp::SessionNotification,
 778    ) -> Result<(), acp::Error> {
 779        let sessions = self.sessions.borrow();
 780        let session = sessions
 781            .get(&notification.session_id)
 782            .context("Failed to get session")?;
 783
 784        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
 785            current_mode_id,
 786            ..
 787        }) = &notification.update
 788        {
 789            if let Some(session_modes) = &session.session_modes {
 790                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
 791            } else {
 792                log::error!(
 793                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
 794                );
 795            }
 796        }
 797
 798        // Clone so we can inspect meta both before and after handing off to the thread
 799        let update_clone = notification.update.clone();
 800
 801        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
 802        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
 803            if let Some(meta) = &tc.meta {
 804                if let Some(terminal_info) = meta.get("terminal_info") {
 805                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
 806                    {
 807                        let terminal_id = acp::TerminalId::new(id_str);
 808                        let cwd = terminal_info
 809                            .get("cwd")
 810                            .and_then(|v| v.as_str().map(PathBuf::from));
 811
 812                        // Create a minimal display-only lower-level terminal and register it.
 813                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
 814                            let builder = TerminalBuilder::new_display_only(
 815                                CursorShape::default(),
 816                                AlternateScroll::On,
 817                                None,
 818                                0,
 819                            )?;
 820                            let lower = cx.new(|cx| builder.subscribe(cx));
 821                            thread.on_terminal_provider_event(
 822                                TerminalProviderEvent::Created {
 823                                    terminal_id,
 824                                    label: tc.title.clone(),
 825                                    cwd,
 826                                    output_byte_limit: None,
 827                                    terminal: lower,
 828                                },
 829                                cx,
 830                            );
 831                            anyhow::Ok(())
 832                        });
 833                    }
 834                }
 835            }
 836        }
 837
 838        // Forward the update to the acp_thread as usual.
 839        session.thread.update(&mut self.cx.clone(), |thread, cx| {
 840            thread.handle_session_update(notification.update.clone(), cx)
 841        })??;
 842
 843        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
 844        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
 845            if let Some(meta) = &tcu.meta {
 846                if let Some(term_out) = meta.get("terminal_output") {
 847                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
 848                        let terminal_id = acp::TerminalId::new(id_str);
 849                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
 850                            let data = s.as_bytes().to_vec();
 851                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
 852                                thread.on_terminal_provider_event(
 853                                    TerminalProviderEvent::Output { terminal_id, data },
 854                                    cx,
 855                                );
 856                            });
 857                        }
 858                    }
 859                }
 860
 861                // terminal_exit
 862                if let Some(term_exit) = meta.get("terminal_exit") {
 863                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
 864                        let terminal_id = acp::TerminalId::new(id_str);
 865                        let status = acp::TerminalExitStatus::new()
 866                            .exit_code(
 867                                term_exit
 868                                    .get("exit_code")
 869                                    .and_then(|v| v.as_u64())
 870                                    .map(|i| i as u32),
 871                            )
 872                            .signal(
 873                                term_exit
 874                                    .get("signal")
 875                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
 876                            );
 877
 878                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
 879                            thread.on_terminal_provider_event(
 880                                TerminalProviderEvent::Exit {
 881                                    terminal_id,
 882                                    status,
 883                                },
 884                                cx,
 885                            );
 886                        });
 887                    }
 888                }
 889            }
 890        }
 891
 892        Ok(())
 893    }
 894
 895    async fn create_terminal(
 896        &self,
 897        args: acp::CreateTerminalRequest,
 898    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
 899        let thread = self.session_thread(&args.session_id)?;
 900        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
 901
 902        let terminal_entity = acp_thread::create_terminal_entity(
 903            args.command.clone(),
 904            &args.args,
 905            args.env
 906                .into_iter()
 907                .map(|env| (env.name, env.value))
 908                .collect(),
 909            args.cwd.clone(),
 910            &project,
 911            &mut self.cx.clone(),
 912        )
 913        .await?;
 914
 915        // Register with renderer
 916        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
 917            thread.register_terminal_created(
 918                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
 919                format!("{} {}", args.command, args.args.join(" ")),
 920                args.cwd.clone(),
 921                args.output_byte_limit,
 922                terminal_entity,
 923                cx,
 924            )
 925        })?;
 926        let terminal_id =
 927            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
 928        Ok(acp::CreateTerminalResponse::new(terminal_id))
 929    }
 930
 931    async fn kill_terminal_command(
 932        &self,
 933        args: acp::KillTerminalCommandRequest,
 934    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
 935        self.session_thread(&args.session_id)?
 936            .update(&mut self.cx.clone(), |thread, cx| {
 937                thread.kill_terminal(args.terminal_id, cx)
 938            })??;
 939
 940        Ok(Default::default())
 941    }
 942
 943    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
 944        Err(acp::Error::method_not_found())
 945    }
 946
 947    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
 948        Err(acp::Error::method_not_found())
 949    }
 950
 951    async fn release_terminal(
 952        &self,
 953        args: acp::ReleaseTerminalRequest,
 954    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
 955        self.session_thread(&args.session_id)?
 956            .update(&mut self.cx.clone(), |thread, cx| {
 957                thread.release_terminal(args.terminal_id, cx)
 958            })??;
 959
 960        Ok(Default::default())
 961    }
 962
 963    async fn terminal_output(
 964        &self,
 965        args: acp::TerminalOutputRequest,
 966    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
 967        self.session_thread(&args.session_id)?
 968            .read_with(&mut self.cx.clone(), |thread, cx| {
 969                let out = thread
 970                    .terminal(args.terminal_id)?
 971                    .read(cx)
 972                    .current_output(cx);
 973
 974                Ok(out)
 975            })?
 976    }
 977
 978    async fn wait_for_terminal_exit(
 979        &self,
 980        args: acp::WaitForTerminalExitRequest,
 981    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
 982        let exit_status = self
 983            .session_thread(&args.session_id)?
 984            .update(&mut self.cx.clone(), |thread, cx| {
 985                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
 986            })??
 987            .await;
 988
 989        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
 990    }
 991}
 992
 993impl ClientDelegate {
 994    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
 995        let sessions = self.sessions.borrow();
 996        sessions
 997            .get(session_id)
 998            .context("Failed to get session")
 999            .map(|session| session.thread.clone())
1000    }
1001}