acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::AcpConnectionRegistry;
   6use action_log::ActionLog;
   7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   8use anyhow::anyhow;
   9use collections::HashMap;
  10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  11use futures::AsyncBufReadExt as _;
  12use futures::io::BufReader;
  13use project::Project;
  14use project::agent_server_store::AgentServerCommand;
  15use serde::Deserialize;
  16use settings::Settings as _;
  17use task::ShellBuilder;
  18use util::ResultExt as _;
  19use util::process::Child;
  20
  21use std::path::PathBuf;
  22use std::process::Stdio;
  23use std::{any::Any, cell::RefCell};
  24use std::{path::Path, rc::Rc};
  25use thiserror::Error;
  26
  27use anyhow::{Context as _, Result};
  28use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  29
  30use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  31use terminal::TerminalBuilder;
  32use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  33
/// Error returned by `AcpConnection::stdio` when the agent's `initialize`
/// response reports a protocol version lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
  37
/// A connection to an external agent server process that is driven over the
/// process's stdin/stdout (see `AcpConnection::stdio`).
pub struct AcpConnection {
    /// Name given by the caller of `stdio`; used for new threads and in log messages.
    server_name: SharedString,
    /// The name from the agent's `initialize` response when it provides one,
    /// otherwise a copy of `server_name`.
    telemetry_id: SharedString,
    /// Client side of the protocol connection, shared with per-session helpers.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions created by `new_thread`, keyed by session id. The same map is
    /// shared with the client delegate and the process-exit watcher.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods reported in the `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities reported in the `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on every new session, if the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on every new session, if the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option id -> value to apply on every new session that exposes
    /// config options.
    default_config_options: HashMap<String, String>,
    /// Directory passed to `stdio`; also the child's working directory unless
    /// the connection was created with `is_remote`.
    root_dir: PathBuf,
    /// The spawned agent server process; killed when the connection is dropped.
    child: Child,
    /// Present only when the agent advertises the session `list` capability.
    session_list: Option<Rc<AcpSessionList>>,
    // The tasks below are stored but never read; presumably they are kept so
    // that they are not dropped while the connection is alive — TODO confirm
    // against gpui's `Task` drop semantics.
    /// Drives the connection's I/O future on the background executor.
    _io_task: Task<Result<(), acp::Error>>,
    /// Waits for the child to exit and reports `LoadError::Exited` to every
    /// session's thread.
    _wait_task: Task<Result<()>>,
    /// Forwards the child's stderr, line by line, to the log at `warn` level.
    _stderr_task: Task<Result<()>>,
}
  55
  56struct ConfigOptions {
  57    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
  58    tx: Rc<RefCell<watch::Sender<()>>>,
  59    rx: watch::Receiver<()>,
  60}
  61
  62impl ConfigOptions {
  63    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  64        let (tx, rx) = watch::channel(());
  65        Self {
  66            config_options,
  67            tx: Rc::new(RefCell::new(tx)),
  68            rx,
  69        }
  70    }
  71}
  72
/// Per-session bookkeeping kept by `AcpConnection` for every thread it created.
pub struct AcpSession {
    /// Weak handle to the thread that owns this session.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel`; read and reset by the next `prompt` completion, where it
    /// allows an "aborted" internal error to be reported as a cancelled stop.
    suppress_abort_err: bool,
    /// Legacy model state; `None` when the agent sent none or when config
    /// options are used for this session instead.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Legacy mode state; `None` when the agent sent none or when config
    /// options are used for this session instead.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config option state; `Some` only when the feature flag is enabled and
    /// the agent returned config options for the session.
    config_options: Option<ConfigOptions>,
}
  80
  81pub struct AcpSessionList {
  82    connection: Rc<acp::ClientSideConnection>,
  83    updates_tx: Rc<RefCell<watch::Sender<()>>>,
  84    updates_rx: watch::Receiver<()>,
  85}
  86
  87impl AcpSessionList {
  88    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
  89        let (tx, rx) = watch::channel(());
  90        Self {
  91            connection,
  92            updates_tx: Rc::new(RefCell::new(tx)),
  93            updates_rx: rx,
  94        }
  95    }
  96
  97    fn notify_update(&self) {
  98        self.updates_tx.borrow_mut().send(()).ok();
  99    }
 100}
 101
 102impl AgentSessionList for AcpSessionList {
 103    fn list_sessions(
 104        &self,
 105        request: AgentSessionListRequest,
 106        cx: &mut App,
 107    ) -> Task<Result<AgentSessionListResponse>> {
 108        let conn = self.connection.clone();
 109        cx.foreground_executor().spawn(async move {
 110            let acp_request = acp::ListSessionsRequest::new()
 111                .cwd(request.cwd)
 112                .cursor(request.cursor);
 113            let response = conn.list_sessions(acp_request).await?;
 114            Ok(AgentSessionListResponse {
 115                sessions: response
 116                    .sessions
 117                    .into_iter()
 118                    .map(|s| AgentSessionInfo {
 119                        session_id: s.session_id,
 120                        cwd: Some(s.cwd),
 121                        title: s.title.map(Into::into),
 122                        updated_at: s.updated_at.and_then(|date_str| {
 123                            chrono::DateTime::parse_from_rfc3339(&date_str)
 124                                .ok()
 125                                .map(|dt| dt.with_timezone(&chrono::Utc))
 126                        }),
 127                        meta: s.meta,
 128                    })
 129                    .collect(),
 130                next_cursor: response.next_cursor,
 131                meta: response.meta,
 132            })
 133        })
 134    }
 135
 136    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
 137        Some(self.updates_rx.clone())
 138    }
 139
 140    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 141        self
 142    }
 143}
 144
 145pub async fn connect(
 146    server_name: SharedString,
 147    command: AgentServerCommand,
 148    root_dir: &Path,
 149    default_mode: Option<acp::SessionModeId>,
 150    default_model: Option<acp::ModelId>,
 151    default_config_options: HashMap<String, String>,
 152    is_remote: bool,
 153    cx: &mut AsyncApp,
 154) -> Result<Rc<dyn AgentConnection>> {
 155    let conn = AcpConnection::stdio(
 156        server_name,
 157        command.clone(),
 158        root_dir,
 159        default_mode,
 160        default_model,
 161        default_config_options,
 162        is_remote,
 163        cx,
 164    )
 165    .await?;
 166    Ok(Rc::new(conn) as _)
 167}
 168
/// Lowest protocol version accepted from an agent; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the agent reports anything lower.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 170
 171impl AcpConnection {
    /// Spawns the agent server described by `command` and performs the
    /// `initialize` handshake with it over the child's stdin/stdout.
    ///
    /// The command is wrapped in the user's configured terminal shell (run
    /// non-interactively). The child's working directory is set to `root_dir`
    /// unless `is_remote` is true. The `default_*` arguments are stored and
    /// applied later by `new_thread`.
    ///
    /// # Errors
    ///
    /// Fails if the process cannot be spawned, if any of its standard streams
    /// cannot be taken, if the `initialize` request fails, or with
    /// `UnsupportedVersion` if the agent reports a protocol version below
    /// `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        server_name: SharedString,
        command: AgentServerCommand,
        root_dir: &Path,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        is_remote: bool,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        // Build the command line through the shell configured in the terminal settings.
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child =
            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
        child.envs(command.env.iter().flatten());
        // NOTE(review): presumably `root_dir` is not a valid local path for
        // remote projects, hence no working directory is set — confirm.
        if !is_remote {
            child.current_dir(root_dir);
        }
        // All three standard streams are piped so they can be taken below.
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        // Note: despite the wording, this is logged after the process was spawned.
        log::debug!(
            "Spawning external agent server: {:?}, {:?}",
            command.path,
            command.args
        );
        log::trace!("Spawned (pid: {})", child.id());

        // Session map shared between the connection, the client delegate and
        // the exit watcher below.
        let sessions = Rc::new(RefCell::new(HashMap::default()));

        // Release channel name and app version are sent to the agent as client info.
        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        let client = ClientDelegate {
            sessions: sessions.clone(),
            cx: cx.clone(),
        };
        // Futures handed to the spawn callback by the connection are run
        // detached on the foreground executor.
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Forward the agent's stderr to our log, one line at a time. The loop
        // ends on EOF (zero bytes read) or on the first read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // When the process exits, tell every session's thread about it.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        // The thread may already be gone; that is not an error here.
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        // Register this connection as the active one in the global registry.
        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(server_name.clone(), &connection, cx)
            });
        });

        // Handshake: advertise file-system and terminal support plus client info.
        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapability::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| server_name.clone());

        // Session listing is only offered when the agent advertises the capability.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            Some(Rc::new(AcpSessionList::new(connection.clone())))
        } else {
            None
        };

        Ok(Self {
            auth_methods: response.auth_methods,
            root_dir: root_dir.to_owned(),
            connection,
            server_name,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }
 326
 327    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 328        &self.agent_capabilities.prompt_capabilities
 329    }
 330
 331    pub fn root_dir(&self) -> &Path {
 332        &self.root_dir
 333    }
 334}
 335
 336impl Drop for AcpConnection {
 337    fn drop(&mut self) {
 338        self.child.kill().log_err();
 339    }
 340}
 341
 342impl AgentConnection for AcpConnection {
 343    fn telemetry_id(&self) -> SharedString {
 344        self.telemetry_id.clone()
 345    }
 346
    /// Creates a new agent session rooted at `cwd` and wraps it in an `AcpThread`.
    ///
    /// The project's configured context (MCP) servers are forwarded to the
    /// agent. Once the session exists, the connection's default mode, model and
    /// config option values are applied to it where the agent offers them. The
    /// session is then registered in `sessions` and session-list watchers are
    /// notified.
    ///
    /// # Errors
    ///
    /// Resolves to an `AuthRequired` error when the agent answers with the
    /// auth-required error code, and to the agent's error otherwise.
    fn new_thread(
        self: Rc<Self>,
        project: Entity<Project>,
        cwd: &Path,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        // Clone everything the async task needs; it cannot borrow from `self` or `cx`.
        let name = self.server_name.clone();
        let conn = self.connection.clone();
        let sessions = self.sessions.clone();
        let default_mode = self.default_mode.clone();
        let default_model = self.default_model.clone();
        let default_config_options = self.default_config_options.clone();
        let cwd = cwd.to_path_buf();
        let context_server_store = project.read(cx).context_server_store().read(cx);
        let is_local = project.read(cx).is_local();
        // Translate configured context servers into the protocol's MCP server
        // descriptions. Servers without a configuration are skipped.
        let mcp_servers = context_server_store
            .configured_server_ids()
            .iter()
            .filter_map(|id| {
                let configuration = context_server_store.configuration_for_server(id)?;
                match &*configuration {
                    // Command-based servers are only forwarded for local
                    // projects or when the server is flagged `remote`.
                    project::context_server_store::ContextServerConfiguration::Custom {
                        command,
                        remote,
                        ..
                    }
                    | project::context_server_store::ContextServerConfiguration::Extension {
                        command,
                        remote,
                        ..
                    } if is_local || *remote => Some(acp::McpServer::Stdio(
                        acp::McpServerStdio::new(id.0.to_string(), &command.path)
                            .args(command.args.clone())
                            .env(if let Some(env) = command.env.as_ref() {
                                env.iter()
                                    .map(|(name, value)| acp::EnvVariable::new(name, value))
                                    .collect()
                            } else {
                                vec![]
                            }),
                    )),
                    // HTTP servers are always forwarded; the configured timeout is not sent.
                    project::context_server_store::ContextServerConfiguration::Http {
                        url,
                        headers,
                        timeout: _,
                    } => Some(acp::McpServer::Http(
                        acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
                            headers
                                .iter()
                                .map(|(name, value)| acp::HttpHeader::new(name, value))
                                .collect(),
                        ),
                    )),
                    _ => None,
                }
            })
            .collect();

        cx.spawn(async move |cx| {
            let response = conn
                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
                .await
                .map_err(|err| {
                    if err.code == acp::ErrorCode::AuthRequired {
                        let mut error = AuthRequired::new();

                        // Only keep the agent's message when it says more than
                        // the error code's own text.
                        if err.message != acp::ErrorCode::AuthRequired.to_string() {
                            error = error.with_description(err.message);
                        }

                        anyhow!(error)
                    } else {
                        anyhow!(err)
                    }
                })?;

            let use_config_options = cx.update(|cx| cx.has_flag::<AcpBetaFeatureFlag>());

            // Config options take precedence over legacy modes/models
            let (modes, models, config_options) = if use_config_options && let Some(opts) = response.config_options {
                (
                    None,
                    None,
                    Some(Rc::new(RefCell::new(opts))),
                )
            } else {
                // Fall back to legacy modes/models
                let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
                let models = response.models.map(|models| Rc::new(RefCell::new(models)));
                (modes, models, None)
            };

            // Apply the default mode optimistically: the local state is updated
            // right away, the request is sent from a detached task, and the
            // previous mode is restored if that request fails.
            if let Some(default_mode) = default_mode {
                if let Some(modes) = modes.as_ref() {
                    let mut modes_ref = modes.borrow_mut();
                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);

                    if has_mode {
                        let initial_mode_id = modes_ref.current_mode_id.clone();

                        // NOTE(review): the detached task calls `modes.borrow_mut()`
                        // on failure; this relies on the task not running while
                        // `modes_ref` is still borrowed — confirm.
                        cx.spawn({
                            let default_mode = default_mode.clone();
                            let session_id = response.session_id.clone();
                            let modes = modes.clone();
                            let conn = conn.clone();
                            async move |_| {
                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
                                .await.log_err();

                                if result.is_none() {
                                    modes.borrow_mut().current_mode_id = initial_mode_id;
                                }
                            }
                        }).detach();

                        modes_ref.current_mode_id = default_mode;
                    } else {
                        let available_modes = modes_ref
                            .available_modes
                            .iter()
                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
                        );
                    }
                } else {
                    log::warn!(
                        "`{name}` does not support modes, but `default_mode` was set in settings.",
                    );
                }
            }

            // Same optimistic update-then-roll-back scheme for the default model.
            if let Some(default_model) = default_model {
                if let Some(models) = models.as_ref() {
                    let mut models_ref = models.borrow_mut();
                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);

                    if has_model {
                        let initial_model_id = models_ref.current_model_id.clone();

                        cx.spawn({
                            let default_model = default_model.clone();
                            let session_id = response.session_id.clone();
                            let models = models.clone();
                            let conn = conn.clone();
                            async move |_| {
                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
                                .await.log_err();

                                if result.is_none() {
                                    models.borrow_mut().current_model_id = initial_model_id;
                                }
                            }
                        }).detach();

                        models_ref.current_model_id = default_model;
                    } else {
                        let available_models = models_ref
                            .available_models
                            .iter()
                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
                        );
                    }
                } else {
                    log::warn!(
                        "`{name}` does not support model selection, but `default_model` was set in settings.",
                    );
                }
            }

            if let Some(config_opts) = config_options.as_ref() {
                // First pass (shared borrow only): collect, for every option
                // that has a configured default, the id, the default value and
                // the value to restore on failure. Only `Select` options whose
                // choices contain the default are accepted.
                let defaults_to_apply: Vec<_> = {
                    let config_opts_ref = config_opts.borrow();
                    config_opts_ref
                        .iter()
                        .filter_map(|config_option| {
                            let default_value = default_config_options.get(&*config_option.id.0)?;

                            let is_valid = match &config_option.kind {
                                acp::SessionConfigKind::Select(select) => match &select.options {
                                    acp::SessionConfigSelectOptions::Ungrouped(options) => {
                                        options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
                                    }
                                    acp::SessionConfigSelectOptions::Grouped(groups) => groups
                                        .iter()
                                        .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
                                    _ => false,
                                },
                                _ => false,
                            };

                            if is_valid {
                                let initial_value = match &config_option.kind {
                                    acp::SessionConfigKind::Select(select) => {
                                        Some(select.current_value.clone())
                                    }
                                    _ => None,
                                };
                                Some((config_option.id.clone(), default_value.clone(), initial_value))
                            } else {
                                log::warn!(
                                    "`{}` is not a valid value for config option `{}` in {}",
                                    default_value,
                                    config_option.id.0,
                                    name
                                );
                                None
                            }
                        })
                        .collect()
                };

                // Second pass: send each default from a detached task and
                // update the local state optimistically, restoring the initial
                // value if the request fails.
                for (config_id, default_value, initial_value) in defaults_to_apply {
                    cx.spawn({
                        let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
                        let session_id = response.session_id.clone();
                        let config_id_clone = config_id.clone();
                        let config_opts = config_opts.clone();
                        let conn = conn.clone();
                        async move |_| {
                            let result = conn
                                .set_session_config_option(
                                    acp::SetSessionConfigOptionRequest::new(
                                        session_id,
                                        config_id_clone.clone(),
                                        default_value_id,
                                    ),
                                )
                                .await
                                .log_err();

                            if result.is_none() {
                                if let Some(initial) = initial_value {
                                    let mut opts = config_opts.borrow_mut();
                                    if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
                                        if let acp::SessionConfigKind::Select(select) =
                                            &mut opt.kind
                                        {
                                            select.current_value = initial;
                                        }
                                    }
                                }
                            }
                        }
                    })
                    .detach();

                    let mut opts = config_opts.borrow_mut();
                    if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
                        if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
                            select.current_value = acp::SessionConfigValueId::new(default_value);
                        }
                    }
                }
            }

            let session_id = response.session_id;
            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            let thread: Entity<AcpThread> = cx.new(|cx| {
                AcpThread::new(
                    self.server_name.clone(),
                    self.clone(),
                    project,
                    action_log,
                    session_id.clone(),
                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                    cx,
                )
            });


            // Register the session; only a weak handle to the thread is stored.
            let session = AcpSession {
                thread: thread.downgrade(),
                suppress_abort_err: false,
                session_modes: modes,
                models,
                config_options: config_options.map(|opts| ConfigOptions::new(opts))
            };
            sessions.borrow_mut().insert(session_id, session);

            // Let session-list watchers know a session was added.
            if let Some(session_list) = &self.session_list {
                session_list.notify_update();
            }

            Ok(thread)
        })
    }
 643
 644    fn auth_methods(&self) -> &[acp::AuthMethod] {
 645        &self.auth_methods
 646    }
 647
 648    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
 649        let conn = self.connection.clone();
 650        cx.foreground_executor().spawn(async move {
 651            conn.authenticate(acp::AuthenticateRequest::new(method_id))
 652                .await?;
 653            Ok(())
 654        })
 655    }
 656
 657    fn prompt(
 658        &self,
 659        _id: Option<acp_thread::UserMessageId>,
 660        params: acp::PromptRequest,
 661        cx: &mut App,
 662    ) -> Task<Result<acp::PromptResponse>> {
 663        let conn = self.connection.clone();
 664        let sessions = self.sessions.clone();
 665        let session_id = params.session_id.clone();
 666        cx.foreground_executor().spawn(async move {
 667            let result = conn.prompt(params).await;
 668
 669            let mut suppress_abort_err = false;
 670
 671            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
 672                suppress_abort_err = session.suppress_abort_err;
 673                session.suppress_abort_err = false;
 674            }
 675
 676            match result {
 677                Ok(response) => Ok(response),
 678                Err(err) => {
 679                    if err.code == acp::ErrorCode::AuthRequired {
 680                        return Err(anyhow!(acp::Error::auth_required()));
 681                    }
 682
 683                    if err.code != ErrorCode::InternalError {
 684                        anyhow::bail!(err)
 685                    }
 686
 687                    let Some(data) = &err.data else {
 688                        anyhow::bail!(err)
 689                    };
 690
 691                    // Temporary workaround until the following PR is generally available:
 692                    // https://github.com/google-gemini/gemini-cli/pull/6656
 693
 694                    #[derive(Deserialize)]
 695                    #[serde(deny_unknown_fields)]
 696                    struct ErrorDetails {
 697                        details: Box<str>,
 698                    }
 699
 700                    match serde_json::from_value(data.clone()) {
 701                        Ok(ErrorDetails { details }) => {
 702                            if suppress_abort_err
 703                                && (details.contains("This operation was aborted")
 704                                    || details.contains("The user aborted a request"))
 705                            {
 706                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
 707                            } else {
 708                                Err(anyhow!(details))
 709                            }
 710                        }
 711                        Err(_) => Err(anyhow!(err)),
 712                    }
 713                }
 714            }
 715        })
 716    }
 717
 718    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
 719        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
 720            session.suppress_abort_err = true;
 721        }
 722        let conn = self.connection.clone();
 723        let params = acp::CancelNotification::new(session_id.clone());
 724        cx.foreground_executor()
 725            .spawn(async move { conn.cancel(params).await })
 726            .detach();
 727    }
 728
 729    fn session_modes(
 730        &self,
 731        session_id: &acp::SessionId,
 732        _cx: &App,
 733    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
 734        let sessions = self.sessions.clone();
 735        let sessions_ref = sessions.borrow();
 736        let Some(session) = sessions_ref.get(session_id) else {
 737            return None;
 738        };
 739
 740        if let Some(modes) = session.session_modes.as_ref() {
 741            Some(Rc::new(AcpSessionModes {
 742                connection: self.connection.clone(),
 743                session_id: session_id.clone(),
 744                state: modes.clone(),
 745            }) as _)
 746        } else {
 747            None
 748        }
 749    }
 750
 751    fn model_selector(
 752        &self,
 753        session_id: &acp::SessionId,
 754    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
 755        let sessions = self.sessions.clone();
 756        let sessions_ref = sessions.borrow();
 757        let Some(session) = sessions_ref.get(session_id) else {
 758            return None;
 759        };
 760
 761        if let Some(models) = session.models.as_ref() {
 762            Some(Rc::new(AcpModelSelector::new(
 763                session_id.clone(),
 764                self.connection.clone(),
 765                models.clone(),
 766            )) as _)
 767        } else {
 768            None
 769        }
 770    }
 771
 772    fn session_config_options(
 773        &self,
 774        session_id: &acp::SessionId,
 775        _cx: &App,
 776    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
 777        let sessions = self.sessions.borrow();
 778        let session = sessions.get(session_id)?;
 779
 780        let config_opts = session.config_options.as_ref()?;
 781
 782        Some(Rc::new(AcpSessionConfigOptions {
 783            session_id: session_id.clone(),
 784            connection: self.connection.clone(),
 785            state: config_opts.config_options.clone(),
 786            watch_tx: config_opts.tx.clone(),
 787            watch_rx: config_opts.rx.clone(),
 788        }) as _)
 789    }
 790
 791    fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
 792        if cx.has_flag::<AcpBetaFeatureFlag>() {
 793            self.session_list.clone().map(|s| s as _)
 794        } else {
 795            None
 796        }
 797    }
 798
    /// Upcasts this connection to `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
 802}
 803
/// Handle for reading and changing the mode of one ACP session.
struct AcpSessionModes {
    /// Session whose mode this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_mode` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared mode state: the current mode id and the available modes.
    state: Rc<RefCell<acp::SessionModeState>>,
}
 809
 810impl acp_thread::AgentSessionModes for AcpSessionModes {
 811    fn current_mode(&self) -> acp::SessionModeId {
 812        self.state.borrow().current_mode_id.clone()
 813    }
 814
 815    fn all_modes(&self) -> Vec<acp::SessionMode> {
 816        self.state.borrow().available_modes.clone()
 817    }
 818
 819    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
 820        let connection = self.connection.clone();
 821        let session_id = self.session_id.clone();
 822        let old_mode_id;
 823        {
 824            let mut state = self.state.borrow_mut();
 825            old_mode_id = state.current_mode_id.clone();
 826            state.current_mode_id = mode_id.clone();
 827        };
 828        let state = self.state.clone();
 829        cx.foreground_executor().spawn(async move {
 830            let result = connection
 831                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
 832                .await;
 833
 834            if result.is_err() {
 835                state.borrow_mut().current_mode_id = old_mode_id;
 836            }
 837
 838            result?;
 839
 840            Ok(())
 841        })
 842    }
 843}
 844
/// Handle for listing and selecting the model of one ACP session.
struct AcpModelSelector {
    /// Session whose model this selector controls.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_model` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared model state: the current model id and the available models.
    state: Rc<RefCell<acp::SessionModelState>>,
}
 850
impl AcpModelSelector {
    /// Builds a selector for `session_id` that talks to the agent over `connection`
    /// and reads/writes the given shared model `state`.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
 864
 865impl acp_thread::AgentModelSelector for AcpModelSelector {
 866    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
 867        Task::ready(Ok(acp_thread::AgentModelList::Flat(
 868            self.state
 869                .borrow()
 870                .available_models
 871                .clone()
 872                .into_iter()
 873                .map(acp_thread::AgentModelInfo::from)
 874                .collect(),
 875        )))
 876    }
 877
 878    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
 879        let connection = self.connection.clone();
 880        let session_id = self.session_id.clone();
 881        let old_model_id;
 882        {
 883            let mut state = self.state.borrow_mut();
 884            old_model_id = state.current_model_id.clone();
 885            state.current_model_id = model_id.clone();
 886        };
 887        let state = self.state.clone();
 888        cx.foreground_executor().spawn(async move {
 889            let result = connection
 890                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
 891                .await;
 892
 893            if result.is_err() {
 894                state.borrow_mut().current_model_id = old_model_id;
 895            }
 896
 897            result?;
 898
 899            Ok(())
 900        })
 901    }
 902
 903    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
 904        let state = self.state.borrow();
 905        Task::ready(
 906            state
 907                .available_models
 908                .iter()
 909                .find(|m| m.model_id == state.current_model_id)
 910                .cloned()
 911                .map(acp_thread::AgentModelInfo::from)
 912                .ok_or_else(|| anyhow::anyhow!("Model not found")),
 913        )
 914    }
 915}
 916
/// Handle for reading and changing the config options of one ACP session.
struct AcpSessionConfigOptions {
    /// Session whose config options this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_config_option` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared, most recently known list of config options.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender signalled after `state` has been replaced.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver cloned out to callers of `watch`.
    watch_rx: watch::Receiver<()>,
}
 924
 925impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
 926    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
 927        self.state.borrow().clone()
 928    }
 929
 930    fn set_config_option(
 931        &self,
 932        config_id: acp::SessionConfigId,
 933        value: acp::SessionConfigValueId,
 934        cx: &mut App,
 935    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
 936        let connection = self.connection.clone();
 937        let session_id = self.session_id.clone();
 938        let state = self.state.clone();
 939
 940        let watch_tx = self.watch_tx.clone();
 941
 942        cx.foreground_executor().spawn(async move {
 943            let response = connection
 944                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
 945                    session_id, config_id, value,
 946                ))
 947                .await?;
 948
 949            *state.borrow_mut() = response.config_options.clone();
 950            watch_tx.borrow_mut().send(()).ok();
 951            Ok(response.config_options)
 952        })
 953    }
 954
 955    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
 956        Some(self.watch_rx.clone())
 957    }
 958}
 959
/// `acp::Client` implementation that routes agent requests to the thread of the
/// session they name.
struct ClientDelegate {
    /// Sessions keyed by id; used to look up the thread and per-session state.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Async app context, cloned whenever a thread needs to be read or updated.
    cx: AsyncApp,
}
 964
 965#[async_trait::async_trait(?Send)]
 966impl acp::Client for ClientDelegate {
 967    async fn request_permission(
 968        &self,
 969        arguments: acp::RequestPermissionRequest,
 970    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
 971        let respect_always_allow_setting;
 972        let thread;
 973        {
 974            let sessions_ref = self.sessions.borrow();
 975            let session = sessions_ref
 976                .get(&arguments.session_id)
 977                .context("Failed to get session")?;
 978            respect_always_allow_setting = session.session_modes.is_none();
 979            thread = session.thread.clone();
 980        }
 981
 982        let cx = &mut self.cx.clone();
 983
 984        let task = thread.update(cx, |thread, cx| {
 985            thread.request_tool_call_authorization(
 986                arguments.tool_call,
 987                arguments.options,
 988                respect_always_allow_setting,
 989                cx,
 990            )
 991        })??;
 992
 993        let outcome = task.await;
 994
 995        Ok(acp::RequestPermissionResponse::new(outcome))
 996    }
 997
 998    async fn write_text_file(
 999        &self,
1000        arguments: acp::WriteTextFileRequest,
1001    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1002        let cx = &mut self.cx.clone();
1003        let task = self
1004            .session_thread(&arguments.session_id)?
1005            .update(cx, |thread, cx| {
1006                thread.write_text_file(arguments.path, arguments.content, cx)
1007            })?;
1008
1009        task.await?;
1010
1011        Ok(Default::default())
1012    }
1013
1014    async fn read_text_file(
1015        &self,
1016        arguments: acp::ReadTextFileRequest,
1017    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1018        let task = self.session_thread(&arguments.session_id)?.update(
1019            &mut self.cx.clone(),
1020            |thread, cx| {
1021                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1022            },
1023        )?;
1024
1025        let content = task.await?;
1026
1027        Ok(acp::ReadTextFileResponse::new(content))
1028    }
1029
1030    async fn session_notification(
1031        &self,
1032        notification: acp::SessionNotification,
1033    ) -> Result<(), acp::Error> {
1034        let sessions = self.sessions.borrow();
1035        let session = sessions
1036            .get(&notification.session_id)
1037            .context("Failed to get session")?;
1038
1039        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1040            current_mode_id,
1041            ..
1042        }) = &notification.update
1043        {
1044            if let Some(session_modes) = &session.session_modes {
1045                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1046            } else {
1047                log::error!(
1048                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
1049                );
1050            }
1051        }
1052
1053        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1054            config_options,
1055            ..
1056        }) = &notification.update
1057        {
1058            if let Some(opts) = &session.config_options {
1059                *opts.config_options.borrow_mut() = config_options.clone();
1060                opts.tx.borrow_mut().send(()).ok();
1061            } else {
1062                log::error!(
1063                    "Got a `ConfigOptionUpdate` notification, but the agent didn't specify `config_options` during session setup."
1064                );
1065            }
1066        }
1067
1068        // Clone so we can inspect meta both before and after handing off to the thread
1069        let update_clone = notification.update.clone();
1070
1071        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1072        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1073            if let Some(meta) = &tc.meta {
1074                if let Some(terminal_info) = meta.get("terminal_info") {
1075                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1076                    {
1077                        let terminal_id = acp::TerminalId::new(id_str);
1078                        let cwd = terminal_info
1079                            .get("cwd")
1080                            .and_then(|v| v.as_str().map(PathBuf::from));
1081
1082                        // Create a minimal display-only lower-level terminal and register it.
1083                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1084                            let builder = TerminalBuilder::new_display_only(
1085                                CursorShape::default(),
1086                                AlternateScroll::On,
1087                                None,
1088                                0,
1089                            )?;
1090                            let lower = cx.new(|cx| builder.subscribe(cx));
1091                            thread.on_terminal_provider_event(
1092                                TerminalProviderEvent::Created {
1093                                    terminal_id,
1094                                    label: tc.title.clone(),
1095                                    cwd,
1096                                    output_byte_limit: None,
1097                                    terminal: lower,
1098                                },
1099                                cx,
1100                            );
1101                            anyhow::Ok(())
1102                        });
1103                    }
1104                }
1105            }
1106        }
1107
1108        // Forward the update to the acp_thread as usual.
1109        session.thread.update(&mut self.cx.clone(), |thread, cx| {
1110            thread.handle_session_update(notification.update.clone(), cx)
1111        })??;
1112
1113        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1114        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1115            if let Some(meta) = &tcu.meta {
1116                if let Some(term_out) = meta.get("terminal_output") {
1117                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1118                        let terminal_id = acp::TerminalId::new(id_str);
1119                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1120                            let data = s.as_bytes().to_vec();
1121                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1122                                thread.on_terminal_provider_event(
1123                                    TerminalProviderEvent::Output { terminal_id, data },
1124                                    cx,
1125                                );
1126                            });
1127                        }
1128                    }
1129                }
1130
1131                // terminal_exit
1132                if let Some(term_exit) = meta.get("terminal_exit") {
1133                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1134                        let terminal_id = acp::TerminalId::new(id_str);
1135                        let status = acp::TerminalExitStatus::new()
1136                            .exit_code(
1137                                term_exit
1138                                    .get("exit_code")
1139                                    .and_then(|v| v.as_u64())
1140                                    .map(|i| i as u32),
1141                            )
1142                            .signal(
1143                                term_exit
1144                                    .get("signal")
1145                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
1146                            );
1147
1148                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1149                            thread.on_terminal_provider_event(
1150                                TerminalProviderEvent::Exit {
1151                                    terminal_id,
1152                                    status,
1153                                },
1154                                cx,
1155                            );
1156                        });
1157                    }
1158                }
1159            }
1160        }
1161
1162        Ok(())
1163    }
1164
1165    async fn create_terminal(
1166        &self,
1167        args: acp::CreateTerminalRequest,
1168    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1169        let thread = self.session_thread(&args.session_id)?;
1170        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1171
1172        let terminal_entity = acp_thread::create_terminal_entity(
1173            args.command.clone(),
1174            &args.args,
1175            args.env
1176                .into_iter()
1177                .map(|env| (env.name, env.value))
1178                .collect(),
1179            args.cwd.clone(),
1180            &project,
1181            &mut self.cx.clone(),
1182        )
1183        .await?;
1184
1185        // Register with renderer
1186        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1187            thread.register_terminal_created(
1188                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1189                format!("{} {}", args.command, args.args.join(" ")),
1190                args.cwd.clone(),
1191                args.output_byte_limit,
1192                terminal_entity,
1193                cx,
1194            )
1195        })?;
1196        let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1197        Ok(acp::CreateTerminalResponse::new(terminal_id))
1198    }
1199
1200    async fn kill_terminal_command(
1201        &self,
1202        args: acp::KillTerminalCommandRequest,
1203    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1204        self.session_thread(&args.session_id)?
1205            .update(&mut self.cx.clone(), |thread, cx| {
1206                thread.kill_terminal(args.terminal_id, cx)
1207            })??;
1208
1209        Ok(Default::default())
1210    }
1211
    /// Extension methods are not handled: every request is answered with
    /// `method_not_found`.
    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
        Err(acp::Error::method_not_found())
    }
1215
    /// Extension notifications are not handled: every notification yields
    /// `method_not_found`.
    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
        Err(acp::Error::method_not_found())
    }
1219
1220    async fn release_terminal(
1221        &self,
1222        args: acp::ReleaseTerminalRequest,
1223    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1224        self.session_thread(&args.session_id)?
1225            .update(&mut self.cx.clone(), |thread, cx| {
1226                thread.release_terminal(args.terminal_id, cx)
1227            })??;
1228
1229        Ok(Default::default())
1230    }
1231
1232    async fn terminal_output(
1233        &self,
1234        args: acp::TerminalOutputRequest,
1235    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1236        self.session_thread(&args.session_id)?
1237            .read_with(&mut self.cx.clone(), |thread, cx| {
1238                let out = thread
1239                    .terminal(args.terminal_id)?
1240                    .read(cx)
1241                    .current_output(cx);
1242
1243                Ok(out)
1244            })?
1245    }
1246
1247    async fn wait_for_terminal_exit(
1248        &self,
1249        args: acp::WaitForTerminalExitRequest,
1250    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1251        let exit_status = self
1252            .session_thread(&args.session_id)?
1253            .update(&mut self.cx.clone(), |thread, cx| {
1254                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1255            })??
1256            .await;
1257
1258        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1259    }
1260}
1261
1262impl ClientDelegate {
1263    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1264        let sessions = self.sessions.borrow();
1265        sessions
1266            .get(session_id)
1267            .context("Failed to get session")
1268            .map(|session| session.thread.clone())
1269    }
1270}