acp.rs

   1use acp_thread::AgentConnection;
   2use acp_tools::AcpConnectionRegistry;
   3use action_log::ActionLog;
   4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   5use anyhow::anyhow;
   6use collections::HashMap;
   7use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
   8use futures::AsyncBufReadExt as _;
   9use futures::io::BufReader;
  10use project::Project;
  11use project::agent_server_store::AgentServerCommand;
  12use serde::Deserialize;
  13use settings::Settings as _;
  14use task::ShellBuilder;
  15use util::ResultExt as _;
  16
  17use std::path::PathBuf;
  18use std::{any::Any, cell::RefCell};
  19use std::{path::Path, rc::Rc};
  20use thiserror::Error;
  21
  22use anyhow::{Context as _, Result};
  23use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  24
  25use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  26use terminal::TerminalBuilder;
  27use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  28
  29#[derive(Debug, Error)]
  30#[error("Unsupported version")]
  31pub struct UnsupportedVersion;
  32
  33pub struct AcpConnection {
  34    server_name: SharedString,
  35    telemetry_id: SharedString,
  36    connection: Rc<acp::ClientSideConnection>,
  37    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
  38    auth_methods: Vec<acp::AuthMethod>,
  39    agent_capabilities: acp::AgentCapabilities,
  40    default_mode: Option<acp::SessionModeId>,
  41    default_model: Option<acp::ModelId>,
  42    default_config_options: HashMap<String, String>,
  43    root_dir: PathBuf,
  44    // NB: Don't move this into the wait_task, since we need to ensure the process is
  45    // killed on drop (setting kill_on_drop on the command seems to not always work).
  46    child: smol::process::Child,
  47    _io_task: Task<Result<(), acp::Error>>,
  48    _wait_task: Task<Result<()>>,
  49    _stderr_task: Task<Result<()>>,
  50}
  51
  52struct ConfigOptions {
  53    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
  54    tx: Rc<RefCell<watch::Sender<()>>>,
  55    rx: watch::Receiver<()>,
  56}
  57
  58impl ConfigOptions {
  59    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  60        let (tx, rx) = watch::channel(());
  61        Self {
  62            config_options,
  63            tx: Rc::new(RefCell::new(tx)),
  64            rx,
  65        }
  66    }
  67}
  68
  69pub struct AcpSession {
  70    thread: WeakEntity<AcpThread>,
  71    suppress_abort_err: bool,
  72    models: Option<Rc<RefCell<acp::SessionModelState>>>,
  73    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
  74    config_options: Option<ConfigOptions>,
  75}
  76
  77pub async fn connect(
  78    server_name: SharedString,
  79    command: AgentServerCommand,
  80    root_dir: &Path,
  81    default_mode: Option<acp::SessionModeId>,
  82    default_model: Option<acp::ModelId>,
  83    default_config_options: HashMap<String, String>,
  84    is_remote: bool,
  85    cx: &mut AsyncApp,
  86) -> Result<Rc<dyn AgentConnection>> {
  87    let conn = AcpConnection::stdio(
  88        server_name,
  89        command.clone(),
  90        root_dir,
  91        default_mode,
  92        default_model,
  93        default_config_options,
  94        is_remote,
  95        cx,
  96    )
  97    .await?;
  98    Ok(Rc::new(conn) as _)
  99}
 100
 101const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 102
 103impl AcpConnection {
 104    pub async fn stdio(
 105        server_name: SharedString,
 106        command: AgentServerCommand,
 107        root_dir: &Path,
 108        default_mode: Option<acp::SessionModeId>,
 109        default_model: Option<acp::ModelId>,
 110        default_config_options: HashMap<String, String>,
 111        is_remote: bool,
 112        cx: &mut AsyncApp,
 113    ) -> Result<Self> {
 114        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone())?;
 115        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
 116        let mut child =
 117            builder.build_command(Some(command.path.display().to_string()), &command.args);
 118        child
 119            .envs(command.env.iter().flatten())
 120            .stdin(std::process::Stdio::piped())
 121            .stdout(std::process::Stdio::piped())
 122            .stderr(std::process::Stdio::piped());
 123        if !is_remote {
 124            child.current_dir(root_dir);
 125        }
 126        let mut child = child.spawn()?;
 127
 128        let stdout = child.stdout.take().context("Failed to take stdout")?;
 129        let stdin = child.stdin.take().context("Failed to take stdin")?;
 130        let stderr = child.stderr.take().context("Failed to take stderr")?;
 131        log::debug!(
 132            "Spawning external agent server: {:?}, {:?}",
 133            command.path,
 134            command.args
 135        );
 136        log::trace!("Spawned (pid: {})", child.id());
 137
 138        let sessions = Rc::new(RefCell::new(HashMap::default()));
 139
 140        let (release_channel, version) = cx.update(|cx| {
 141            (
 142                release_channel::ReleaseChannel::try_global(cx)
 143                    .map(|release_channel| release_channel.display_name()),
 144                release_channel::AppVersion::global(cx).to_string(),
 145            )
 146        })?;
 147
 148        let client = ClientDelegate {
 149            sessions: sessions.clone(),
 150            cx: cx.clone(),
 151        };
 152        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
 153            let foreground_executor = cx.foreground_executor().clone();
 154            move |fut| {
 155                foreground_executor.spawn(fut).detach();
 156            }
 157        });
 158
 159        let io_task = cx.background_spawn(io_task);
 160
 161        let stderr_task = cx.background_spawn(async move {
 162            let mut stderr = BufReader::new(stderr);
 163            let mut line = String::new();
 164            while let Ok(n) = stderr.read_line(&mut line).await
 165                && n > 0
 166            {
 167                log::warn!("agent stderr: {}", line.trim());
 168                line.clear();
 169            }
 170            Ok(())
 171        });
 172
 173        let wait_task = cx.spawn({
 174            let sessions = sessions.clone();
 175            let status_fut = child.status();
 176            async move |cx| {
 177                let status = status_fut.await?;
 178
 179                for session in sessions.borrow().values() {
 180                    session
 181                        .thread
 182                        .update(cx, |thread, cx| {
 183                            thread.emit_load_error(LoadError::Exited { status }, cx)
 184                        })
 185                        .ok();
 186                }
 187
 188                anyhow::Ok(())
 189            }
 190        });
 191
 192        let connection = Rc::new(connection);
 193
 194        cx.update(|cx| {
 195            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
 196                registry.set_active_connection(server_name.clone(), &connection, cx)
 197            });
 198        })?;
 199
 200        let response = connection
 201            .initialize(
 202                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
 203                    .client_capabilities(
 204                        acp::ClientCapabilities::new()
 205                            .fs(acp::FileSystemCapability::new()
 206                                .read_text_file(true)
 207                                .write_text_file(true))
 208                            .terminal(true)
 209                            // Experimental: Allow for rendering terminal output from the agents
 210                            .meta(acp::Meta::from_iter([
 211                                ("terminal_output".into(), true.into()),
 212                                ("terminal-auth".into(), true.into()),
 213                            ])),
 214                    )
 215                    .client_info(
 216                        acp::Implementation::new("zed", version)
 217                            .title(release_channel.map(ToOwned::to_owned)),
 218                    ),
 219            )
 220            .await?;
 221
 222        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
 223            return Err(UnsupportedVersion.into());
 224        }
 225
 226        let telemetry_id = response
 227            .agent_info
 228            // Use the one the agent provides if we have one
 229            .map(|info| info.name.into())
 230            // Otherwise, just use the name
 231            .unwrap_or_else(|| server_name.clone());
 232
 233        Ok(Self {
 234            auth_methods: response.auth_methods,
 235            root_dir: root_dir.to_owned(),
 236            connection,
 237            server_name,
 238            telemetry_id,
 239            sessions,
 240            agent_capabilities: response.agent_capabilities,
 241            default_mode,
 242            default_model,
 243            default_config_options,
 244            _io_task: io_task,
 245            _wait_task: wait_task,
 246            _stderr_task: stderr_task,
 247            child,
 248        })
 249    }
 250
 251    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 252        &self.agent_capabilities.prompt_capabilities
 253    }
 254
 255    pub fn root_dir(&self) -> &Path {
 256        &self.root_dir
 257    }
 258}
 259
 260impl Drop for AcpConnection {
 261    fn drop(&mut self) {
 262        // See the comment on the child field.
 263        self.child.kill().log_err();
 264    }
 265}
 266
 267impl AgentConnection for AcpConnection {
 268    fn telemetry_id(&self) -> SharedString {
 269        self.telemetry_id.clone()
 270    }
 271
 272    fn new_thread(
 273        self: Rc<Self>,
 274        project: Entity<Project>,
 275        cwd: &Path,
 276        cx: &mut App,
 277    ) -> Task<Result<Entity<AcpThread>>> {
 278        let name = self.server_name.clone();
 279        let conn = self.connection.clone();
 280        let sessions = self.sessions.clone();
 281        let default_mode = self.default_mode.clone();
 282        let default_model = self.default_model.clone();
 283        let default_config_options = self.default_config_options.clone();
 284        let cwd = cwd.to_path_buf();
 285        let context_server_store = project.read(cx).context_server_store().read(cx);
 286        let mcp_servers = if project.read(cx).is_local() {
 287            context_server_store
 288                .configured_server_ids()
 289                .iter()
 290                .filter_map(|id| {
 291                    let configuration = context_server_store.configuration_for_server(id)?;
 292                    match &*configuration {
 293                        project::context_server_store::ContextServerConfiguration::Custom {
 294                            command,
 295                            ..
 296                        }
 297                        | project::context_server_store::ContextServerConfiguration::Extension {
 298                            command,
 299                            ..
 300                        } => Some(acp::McpServer::Stdio(
 301                            acp::McpServerStdio::new(id.0.to_string(), &command.path)
 302                                .args(command.args.clone())
 303                                .env(if let Some(env) = command.env.as_ref() {
 304                                    env.iter()
 305                                        .map(|(name, value)| acp::EnvVariable::new(name, value))
 306                                        .collect()
 307                                } else {
 308                                    vec![]
 309                                }),
 310                        )),
 311                        project::context_server_store::ContextServerConfiguration::Http {
 312                            url,
 313                            headers,
 314                        } => Some(acp::McpServer::Http(
 315                            acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
 316                                headers
 317                                    .iter()
 318                                    .map(|(name, value)| acp::HttpHeader::new(name, value))
 319                                    .collect(),
 320                            ),
 321                        )),
 322                    }
 323                })
 324                .collect()
 325        } else {
 326            // In SSH projects, the external agent is running on the remote
 327            // machine, and currently we only run MCP servers on the local
 328            // machine. So don't pass any MCP servers to the agent in that case.
 329            Vec::new()
 330        };
 331
 332        cx.spawn(async move |cx| {
 333            let response = conn
 334                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
 335                .await
 336                .map_err(|err| {
 337                    if err.code == acp::ErrorCode::AuthRequired {
 338                        let mut error = AuthRequired::new();
 339
 340                        if err.message != acp::ErrorCode::AuthRequired.to_string() {
 341                            error = error.with_description(err.message);
 342                        }
 343
 344                        anyhow!(error)
 345                    } else {
 346                        anyhow!(err)
 347                    }
 348                })?;
 349
 350            let use_config_options = cx.update(|cx| cx.has_flag::<AcpBetaFeatureFlag>())?;
 351
 352            // Config options take precedence over legacy modes/models
 353            let (modes, models, config_options) = if use_config_options && let Some(opts) = response.config_options {
 354                (
 355                    None,
 356                    None,
 357                    Some(Rc::new(RefCell::new(opts))),
 358                )
 359            } else {
 360                // Fall back to legacy modes/models
 361                let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
 362                let models = response.models.map(|models| Rc::new(RefCell::new(models)));
 363                (modes, models, None)
 364            };
 365
 366            if let Some(default_mode) = default_mode {
 367                if let Some(modes) = modes.as_ref() {
 368                    let mut modes_ref = modes.borrow_mut();
 369                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
 370
 371                    if has_mode {
 372                        let initial_mode_id = modes_ref.current_mode_id.clone();
 373
 374                        cx.spawn({
 375                            let default_mode = default_mode.clone();
 376                            let session_id = response.session_id.clone();
 377                            let modes = modes.clone();
 378                            let conn = conn.clone();
 379                            async move |_| {
 380                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
 381                                .await.log_err();
 382
 383                                if result.is_none() {
 384                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 385                                }
 386                            }
 387                        }).detach();
 388
 389                        modes_ref.current_mode_id = default_mode;
 390                    } else {
 391                        let available_modes = modes_ref
 392                            .available_modes
 393                            .iter()
 394                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 395                            .collect::<Vec<_>>()
 396                            .join("\n");
 397
 398                        log::warn!(
 399                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 400                        );
 401                    }
 402                } else {
 403                    log::warn!(
 404                        "`{name}` does not support modes, but `default_mode` was set in settings.",
 405                    );
 406                }
 407            }
 408
 409            if let Some(default_model) = default_model {
 410                if let Some(models) = models.as_ref() {
 411                    let mut models_ref = models.borrow_mut();
 412                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
 413
 414                    if has_model {
 415                        let initial_model_id = models_ref.current_model_id.clone();
 416
 417                        cx.spawn({
 418                            let default_model = default_model.clone();
 419                            let session_id = response.session_id.clone();
 420                            let models = models.clone();
 421                            let conn = conn.clone();
 422                            async move |_| {
 423                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
 424                                .await.log_err();
 425
 426                                if result.is_none() {
 427                                    models.borrow_mut().current_model_id = initial_model_id;
 428                                }
 429                            }
 430                        }).detach();
 431
 432                        models_ref.current_model_id = default_model;
 433                    } else {
 434                        let available_models = models_ref
 435                            .available_models
 436                            .iter()
 437                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 438                            .collect::<Vec<_>>()
 439                            .join("\n");
 440
 441                        log::warn!(
 442                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 443                        );
 444                    }
 445                } else {
 446                    log::warn!(
 447                        "`{name}` does not support model selection, but `default_model` was set in settings.",
 448                    );
 449                }
 450            }
 451
 452            if let Some(config_opts) = config_options.as_ref() {
 453                let defaults_to_apply: Vec<_> = {
 454                    let config_opts_ref = config_opts.borrow();
 455                    config_opts_ref
 456                        .iter()
 457                        .filter_map(|config_option| {
 458                            let default_value = default_config_options.get(&*config_option.id.0)?;
 459
 460                            let is_valid = match &config_option.kind {
 461                                acp::SessionConfigKind::Select(select) => match &select.options {
 462                                    acp::SessionConfigSelectOptions::Ungrouped(options) => {
 463                                        options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
 464                                    }
 465                                    acp::SessionConfigSelectOptions::Grouped(groups) => groups
 466                                        .iter()
 467                                        .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
 468                                    _ => false,
 469                                },
 470                                _ => false,
 471                            };
 472
 473                            if is_valid {
 474                                let initial_value = match &config_option.kind {
 475                                    acp::SessionConfigKind::Select(select) => {
 476                                        Some(select.current_value.clone())
 477                                    }
 478                                    _ => None,
 479                                };
 480                                Some((config_option.id.clone(), default_value.clone(), initial_value))
 481                            } else {
 482                                log::warn!(
 483                                    "`{}` is not a valid value for config option `{}` in {}",
 484                                    default_value,
 485                                    config_option.id.0,
 486                                    name
 487                                );
 488                                None
 489                            }
 490                        })
 491                        .collect()
 492                };
 493
 494                for (config_id, default_value, initial_value) in defaults_to_apply {
 495                    cx.spawn({
 496                        let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 497                        let session_id = response.session_id.clone();
 498                        let config_id_clone = config_id.clone();
 499                        let config_opts = config_opts.clone();
 500                        let conn = conn.clone();
 501                        async move |_| {
 502                            let result = conn
 503                                .set_session_config_option(
 504                                    acp::SetSessionConfigOptionRequest::new(
 505                                        session_id,
 506                                        config_id_clone.clone(),
 507                                        default_value_id,
 508                                    ),
 509                                )
 510                                .await
 511                                .log_err();
 512
 513                            if result.is_none() {
 514                                if let Some(initial) = initial_value {
 515                                    let mut opts = config_opts.borrow_mut();
 516                                    if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 517                                        if let acp::SessionConfigKind::Select(select) =
 518                                            &mut opt.kind
 519                                        {
 520                                            select.current_value = initial;
 521                                        }
 522                                    }
 523                                }
 524                            }
 525                        }
 526                    })
 527                    .detach();
 528
 529                    let mut opts = config_opts.borrow_mut();
 530                    if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
 531                        if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 532                            select.current_value = acp::SessionConfigValueId::new(default_value);
 533                        }
 534                    }
 535                }
 536            }
 537
 538            let session_id = response.session_id;
 539            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
 540            let thread = cx.new(|cx| {
 541                AcpThread::new(
 542                    self.server_name.clone(),
 543                    self.clone(),
 544                    project,
 545                    action_log,
 546                    session_id.clone(),
 547                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 548                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 549                    cx,
 550                )
 551            })?;
 552
 553
 554            let session = AcpSession {
 555                thread: thread.downgrade(),
 556                suppress_abort_err: false,
 557                session_modes: modes,
 558                models,
 559                config_options: config_options.map(|opts| ConfigOptions::new(opts))
 560            };
 561            sessions.borrow_mut().insert(session_id, session);
 562
 563            Ok(thread)
 564        })
 565    }
 566
 567    fn auth_methods(&self) -> &[acp::AuthMethod] {
 568        &self.auth_methods
 569    }
 570
 571    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
 572        let conn = self.connection.clone();
 573        cx.foreground_executor().spawn(async move {
 574            conn.authenticate(acp::AuthenticateRequest::new(method_id))
 575                .await?;
 576            Ok(())
 577        })
 578    }
 579
 580    fn prompt(
 581        &self,
 582        _id: Option<acp_thread::UserMessageId>,
 583        params: acp::PromptRequest,
 584        cx: &mut App,
 585    ) -> Task<Result<acp::PromptResponse>> {
 586        let conn = self.connection.clone();
 587        let sessions = self.sessions.clone();
 588        let session_id = params.session_id.clone();
 589        cx.foreground_executor().spawn(async move {
 590            let result = conn.prompt(params).await;
 591
 592            let mut suppress_abort_err = false;
 593
 594            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
 595                suppress_abort_err = session.suppress_abort_err;
 596                session.suppress_abort_err = false;
 597            }
 598
 599            match result {
 600                Ok(response) => Ok(response),
 601                Err(err) => {
 602                    if err.code == acp::ErrorCode::AuthRequired {
 603                        return Err(anyhow!(acp::Error::auth_required()));
 604                    }
 605
 606                    if err.code != ErrorCode::InternalError {
 607                        anyhow::bail!(err)
 608                    }
 609
 610                    let Some(data) = &err.data else {
 611                        anyhow::bail!(err)
 612                    };
 613
 614                    // Temporary workaround until the following PR is generally available:
 615                    // https://github.com/google-gemini/gemini-cli/pull/6656
 616
 617                    #[derive(Deserialize)]
 618                    #[serde(deny_unknown_fields)]
 619                    struct ErrorDetails {
 620                        details: Box<str>,
 621                    }
 622
 623                    match serde_json::from_value(data.clone()) {
 624                        Ok(ErrorDetails { details }) => {
 625                            if suppress_abort_err
 626                                && (details.contains("This operation was aborted")
 627                                    || details.contains("The user aborted a request"))
 628                            {
 629                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
 630                            } else {
 631                                Err(anyhow!(details))
 632                            }
 633                        }
 634                        Err(_) => Err(anyhow!(err)),
 635                    }
 636                }
 637            }
 638        })
 639    }
 640
 641    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
 642        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
 643            session.suppress_abort_err = true;
 644        }
 645        let conn = self.connection.clone();
 646        let params = acp::CancelNotification::new(session_id.clone());
 647        cx.foreground_executor()
 648            .spawn(async move { conn.cancel(params).await })
 649            .detach();
 650    }
 651
 652    fn session_modes(
 653        &self,
 654        session_id: &acp::SessionId,
 655        _cx: &App,
 656    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
 657        let sessions = self.sessions.clone();
 658        let sessions_ref = sessions.borrow();
 659        let Some(session) = sessions_ref.get(session_id) else {
 660            return None;
 661        };
 662
 663        if let Some(modes) = session.session_modes.as_ref() {
 664            Some(Rc::new(AcpSessionModes {
 665                connection: self.connection.clone(),
 666                session_id: session_id.clone(),
 667                state: modes.clone(),
 668            }) as _)
 669        } else {
 670            None
 671        }
 672    }
 673
 674    fn model_selector(
 675        &self,
 676        session_id: &acp::SessionId,
 677    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
 678        let sessions = self.sessions.clone();
 679        let sessions_ref = sessions.borrow();
 680        let Some(session) = sessions_ref.get(session_id) else {
 681            return None;
 682        };
 683
 684        if let Some(models) = session.models.as_ref() {
 685            Some(Rc::new(AcpModelSelector::new(
 686                session_id.clone(),
 687                self.connection.clone(),
 688                models.clone(),
 689            )) as _)
 690        } else {
 691            None
 692        }
 693    }
 694
 695    fn session_config_options(
 696        &self,
 697        session_id: &acp::SessionId,
 698        _cx: &App,
 699    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
 700        let sessions = self.sessions.borrow();
 701        let session = sessions.get(session_id)?;
 702
 703        let config_opts = session.config_options.as_ref()?;
 704
 705        Some(Rc::new(AcpSessionConfigOptions {
 706            session_id: session_id.clone(),
 707            connection: self.connection.clone(),
 708            state: config_opts.config_options.clone(),
 709            watch_tx: config_opts.tx.clone(),
 710            watch_rx: config_opts.rx.clone(),
 711        }) as _)
 712    }
 713
 714    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 715        self
 716    }
 717}
 718
 719struct AcpSessionModes {
 720    session_id: acp::SessionId,
 721    connection: Rc<acp::ClientSideConnection>,
 722    state: Rc<RefCell<acp::SessionModeState>>,
 723}
 724
 725impl acp_thread::AgentSessionModes for AcpSessionModes {
 726    fn current_mode(&self) -> acp::SessionModeId {
 727        self.state.borrow().current_mode_id.clone()
 728    }
 729
 730    fn all_modes(&self) -> Vec<acp::SessionMode> {
 731        self.state.borrow().available_modes.clone()
 732    }
 733
 734    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
 735        let connection = self.connection.clone();
 736        let session_id = self.session_id.clone();
 737        let old_mode_id;
 738        {
 739            let mut state = self.state.borrow_mut();
 740            old_mode_id = state.current_mode_id.clone();
 741            state.current_mode_id = mode_id.clone();
 742        };
 743        let state = self.state.clone();
 744        cx.foreground_executor().spawn(async move {
 745            let result = connection
 746                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
 747                .await;
 748
 749            if result.is_err() {
 750                state.borrow_mut().current_mode_id = old_mode_id;
 751            }
 752
 753            result?;
 754
 755            Ok(())
 756        })
 757    }
 758}
 759
 760struct AcpModelSelector {
 761    session_id: acp::SessionId,
 762    connection: Rc<acp::ClientSideConnection>,
 763    state: Rc<RefCell<acp::SessionModelState>>,
 764}
 765
 766impl AcpModelSelector {
 767    fn new(
 768        session_id: acp::SessionId,
 769        connection: Rc<acp::ClientSideConnection>,
 770        state: Rc<RefCell<acp::SessionModelState>>,
 771    ) -> Self {
 772        Self {
 773            session_id,
 774            connection,
 775            state,
 776        }
 777    }
 778}
 779
 780impl acp_thread::AgentModelSelector for AcpModelSelector {
 781    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
 782        Task::ready(Ok(acp_thread::AgentModelList::Flat(
 783            self.state
 784                .borrow()
 785                .available_models
 786                .clone()
 787                .into_iter()
 788                .map(acp_thread::AgentModelInfo::from)
 789                .collect(),
 790        )))
 791    }
 792
 793    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
 794        let connection = self.connection.clone();
 795        let session_id = self.session_id.clone();
 796        let old_model_id;
 797        {
 798            let mut state = self.state.borrow_mut();
 799            old_model_id = state.current_model_id.clone();
 800            state.current_model_id = model_id.clone();
 801        };
 802        let state = self.state.clone();
 803        cx.foreground_executor().spawn(async move {
 804            let result = connection
 805                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
 806                .await;
 807
 808            if result.is_err() {
 809                state.borrow_mut().current_model_id = old_model_id;
 810            }
 811
 812            result?;
 813
 814            Ok(())
 815        })
 816    }
 817
 818    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
 819        let state = self.state.borrow();
 820        Task::ready(
 821            state
 822                .available_models
 823                .iter()
 824                .find(|m| m.model_id == state.current_model_id)
 825                .cloned()
 826                .map(acp_thread::AgentModelInfo::from)
 827                .ok_or_else(|| anyhow::anyhow!("Model not found")),
 828        )
 829    }
 830}
 831
 832struct AcpSessionConfigOptions {
 833    session_id: acp::SessionId,
 834    connection: Rc<acp::ClientSideConnection>,
 835    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 836    watch_tx: Rc<RefCell<watch::Sender<()>>>,
 837    watch_rx: watch::Receiver<()>,
 838}
 839
 840impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
 841    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
 842        self.state.borrow().clone()
 843    }
 844
 845    fn set_config_option(
 846        &self,
 847        config_id: acp::SessionConfigId,
 848        value: acp::SessionConfigValueId,
 849        cx: &mut App,
 850    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
 851        let connection = self.connection.clone();
 852        let session_id = self.session_id.clone();
 853        let state = self.state.clone();
 854
 855        let watch_tx = self.watch_tx.clone();
 856
 857        cx.foreground_executor().spawn(async move {
 858            let response = connection
 859                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
 860                    session_id, config_id, value,
 861                ))
 862                .await?;
 863
 864            *state.borrow_mut() = response.config_options.clone();
 865            watch_tx.borrow_mut().send(()).ok();
 866            Ok(response.config_options)
 867        })
 868    }
 869
 870    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
 871        Some(self.watch_rx.clone())
 872    }
 873}
 874
 875struct ClientDelegate {
 876    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 877    cx: AsyncApp,
 878}
 879
 880#[async_trait::async_trait(?Send)]
 881impl acp::Client for ClientDelegate {
 882    async fn request_permission(
 883        &self,
 884        arguments: acp::RequestPermissionRequest,
 885    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
 886        let respect_always_allow_setting;
 887        let thread;
 888        {
 889            let sessions_ref = self.sessions.borrow();
 890            let session = sessions_ref
 891                .get(&arguments.session_id)
 892                .context("Failed to get session")?;
 893            respect_always_allow_setting = session.session_modes.is_none();
 894            thread = session.thread.clone();
 895        }
 896
 897        let cx = &mut self.cx.clone();
 898
 899        let task = thread.update(cx, |thread, cx| {
 900            thread.request_tool_call_authorization(
 901                arguments.tool_call,
 902                arguments.options,
 903                respect_always_allow_setting,
 904                cx,
 905            )
 906        })??;
 907
 908        let outcome = task.await;
 909
 910        Ok(acp::RequestPermissionResponse::new(outcome))
 911    }
 912
 913    async fn write_text_file(
 914        &self,
 915        arguments: acp::WriteTextFileRequest,
 916    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
 917        let cx = &mut self.cx.clone();
 918        let task = self
 919            .session_thread(&arguments.session_id)?
 920            .update(cx, |thread, cx| {
 921                thread.write_text_file(arguments.path, arguments.content, cx)
 922            })?;
 923
 924        task.await?;
 925
 926        Ok(Default::default())
 927    }
 928
 929    async fn read_text_file(
 930        &self,
 931        arguments: acp::ReadTextFileRequest,
 932    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
 933        let task = self.session_thread(&arguments.session_id)?.update(
 934            &mut self.cx.clone(),
 935            |thread, cx| {
 936                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
 937            },
 938        )?;
 939
 940        let content = task.await?;
 941
 942        Ok(acp::ReadTextFileResponse::new(content))
 943    }
 944
 945    async fn session_notification(
 946        &self,
 947        notification: acp::SessionNotification,
 948    ) -> Result<(), acp::Error> {
 949        let sessions = self.sessions.borrow();
 950        let session = sessions
 951            .get(&notification.session_id)
 952            .context("Failed to get session")?;
 953
 954        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
 955            current_mode_id,
 956            ..
 957        }) = &notification.update
 958        {
 959            if let Some(session_modes) = &session.session_modes {
 960                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
 961            } else {
 962                log::error!(
 963                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
 964                );
 965            }
 966        }
 967
 968        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
 969            config_options,
 970            ..
 971        }) = &notification.update
 972        {
 973            if let Some(opts) = &session.config_options {
 974                *opts.config_options.borrow_mut() = config_options.clone();
 975                opts.tx.borrow_mut().send(()).ok();
 976            } else {
 977                log::error!(
 978                    "Got a `ConfigOptionUpdate` notification, but the agent didn't specify `config_options` during session setup."
 979                );
 980            }
 981        }
 982
 983        // Clone so we can inspect meta both before and after handing off to the thread
 984        let update_clone = notification.update.clone();
 985
 986        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
 987        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
 988            if let Some(meta) = &tc.meta {
 989                if let Some(terminal_info) = meta.get("terminal_info") {
 990                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
 991                    {
 992                        let terminal_id = acp::TerminalId::new(id_str);
 993                        let cwd = terminal_info
 994                            .get("cwd")
 995                            .and_then(|v| v.as_str().map(PathBuf::from));
 996
 997                        // Create a minimal display-only lower-level terminal and register it.
 998                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
 999                            let builder = TerminalBuilder::new_display_only(
1000                                CursorShape::default(),
1001                                AlternateScroll::On,
1002                                None,
1003                                0,
1004                            )?;
1005                            let lower = cx.new(|cx| builder.subscribe(cx));
1006                            thread.on_terminal_provider_event(
1007                                TerminalProviderEvent::Created {
1008                                    terminal_id,
1009                                    label: tc.title.clone(),
1010                                    cwd,
1011                                    output_byte_limit: None,
1012                                    terminal: lower,
1013                                },
1014                                cx,
1015                            );
1016                            anyhow::Ok(())
1017                        });
1018                    }
1019                }
1020            }
1021        }
1022
1023        // Forward the update to the acp_thread as usual.
1024        session.thread.update(&mut self.cx.clone(), |thread, cx| {
1025            thread.handle_session_update(notification.update.clone(), cx)
1026        })??;
1027
1028        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1029        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1030            if let Some(meta) = &tcu.meta {
1031                if let Some(term_out) = meta.get("terminal_output") {
1032                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1033                        let terminal_id = acp::TerminalId::new(id_str);
1034                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1035                            let data = s.as_bytes().to_vec();
1036                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1037                                thread.on_terminal_provider_event(
1038                                    TerminalProviderEvent::Output { terminal_id, data },
1039                                    cx,
1040                                );
1041                            });
1042                        }
1043                    }
1044                }
1045
1046                // terminal_exit
1047                if let Some(term_exit) = meta.get("terminal_exit") {
1048                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1049                        let terminal_id = acp::TerminalId::new(id_str);
1050                        let status = acp::TerminalExitStatus::new()
1051                            .exit_code(
1052                                term_exit
1053                                    .get("exit_code")
1054                                    .and_then(|v| v.as_u64())
1055                                    .map(|i| i as u32),
1056                            )
1057                            .signal(
1058                                term_exit
1059                                    .get("signal")
1060                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
1061                            );
1062
1063                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1064                            thread.on_terminal_provider_event(
1065                                TerminalProviderEvent::Exit {
1066                                    terminal_id,
1067                                    status,
1068                                },
1069                                cx,
1070                            );
1071                        });
1072                    }
1073                }
1074            }
1075        }
1076
1077        Ok(())
1078    }
1079
1080    async fn create_terminal(
1081        &self,
1082        args: acp::CreateTerminalRequest,
1083    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1084        let thread = self.session_thread(&args.session_id)?;
1085        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1086
1087        let terminal_entity = acp_thread::create_terminal_entity(
1088            args.command.clone(),
1089            &args.args,
1090            args.env
1091                .into_iter()
1092                .map(|env| (env.name, env.value))
1093                .collect(),
1094            args.cwd.clone(),
1095            &project,
1096            &mut self.cx.clone(),
1097        )
1098        .await?;
1099
1100        // Register with renderer
1101        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1102            thread.register_terminal_created(
1103                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1104                format!("{} {}", args.command, args.args.join(" ")),
1105                args.cwd.clone(),
1106                args.output_byte_limit,
1107                terminal_entity,
1108                cx,
1109            )
1110        })?;
1111        let terminal_id =
1112            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
1113        Ok(acp::CreateTerminalResponse::new(terminal_id))
1114    }
1115
1116    async fn kill_terminal_command(
1117        &self,
1118        args: acp::KillTerminalCommandRequest,
1119    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1120        self.session_thread(&args.session_id)?
1121            .update(&mut self.cx.clone(), |thread, cx| {
1122                thread.kill_terminal(args.terminal_id, cx)
1123            })??;
1124
1125        Ok(Default::default())
1126    }
1127
1128    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1129        Err(acp::Error::method_not_found())
1130    }
1131
1132    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1133        Err(acp::Error::method_not_found())
1134    }
1135
1136    async fn release_terminal(
1137        &self,
1138        args: acp::ReleaseTerminalRequest,
1139    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1140        self.session_thread(&args.session_id)?
1141            .update(&mut self.cx.clone(), |thread, cx| {
1142                thread.release_terminal(args.terminal_id, cx)
1143            })??;
1144
1145        Ok(Default::default())
1146    }
1147
1148    async fn terminal_output(
1149        &self,
1150        args: acp::TerminalOutputRequest,
1151    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1152        self.session_thread(&args.session_id)?
1153            .read_with(&mut self.cx.clone(), |thread, cx| {
1154                let out = thread
1155                    .terminal(args.terminal_id)?
1156                    .read(cx)
1157                    .current_output(cx);
1158
1159                Ok(out)
1160            })?
1161    }
1162
1163    async fn wait_for_terminal_exit(
1164        &self,
1165        args: acp::WaitForTerminalExitRequest,
1166    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1167        let exit_status = self
1168            .session_thread(&args.session_id)?
1169            .update(&mut self.cx.clone(), |thread, cx| {
1170                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1171            })??
1172            .await;
1173
1174        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1175    }
1176}
1177
1178impl ClientDelegate {
1179    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1180        let sessions = self.sessions.borrow();
1181        sessions
1182            .get(session_id)
1183            .context("Failed to get session")
1184            .map(|session| session.thread.clone())
1185    }
1186}