acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::AcpConnectionRegistry;
   6use action_log::ActionLog;
   7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   8use anyhow::anyhow;
   9use collections::HashMap;
  10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  11use futures::AsyncBufReadExt as _;
  12use futures::io::BufReader;
  13use project::agent_server_store::AgentServerCommand;
  14use project::{AgentId, Project};
  15use serde::Deserialize;
  16use settings::Settings as _;
  17use task::{ShellBuilder, SpawnInTerminal};
  18use util::ResultExt as _;
  19use util::path_list::PathList;
  20use util::process::Child;
  21
  22use std::path::PathBuf;
  23use std::process::Stdio;
  24use std::rc::Rc;
  25use std::{any::Any, cell::RefCell};
  26use thiserror::Error;
  27
  28use anyhow::{Context as _, Result};
  29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  30
  31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  32use terminal::TerminalBuilder;
  33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  34
  35use crate::GEMINI_ID;
  36
/// Id of the synthetic "Login" auth method that `AcpConnection::stdio` advertises for the
/// Gemini agent in place of whatever auth methods the agent itself reports.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  38
/// Error returned by `AcpConnection::stdio` when the protocol version reported by the agent
/// is lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
  42
/// A connection to an external agent server that is spawned as a child process and spoken to
/// over its stdin/stdout via `acp::ClientSideConnection`.
pub struct AcpConnection {
    /// Identifier of the agent; returned by `agent_id()`.
    id: AgentId,
    /// Human-readable agent name, used as the thread title when no explicit title is given.
    display_name: SharedString,
    /// Name reported by the agent in its `initialize` response, falling back to the agent id.
    telemetry_id: SharedString,
    /// Client side of the ACP connection to the child process.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions known to this connection, keyed by session id. The map is shared with the
    /// client delegate and with the task that waits for the child to exit.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Auth methods from the `initialize` response (replaced by a terminal login for Gemini).
    auth_methods: Vec<acp::AuthMethod>,
    /// Command the agent server was spawned with.
    command: AgentServerCommand,
    /// Capabilities the agent reported in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on newly created sessions, when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on newly created sessions, when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option id -> value overrides applied to sessions by
    /// `apply_default_config_options`.
    default_config_options: HashMap<String, String>,
    /// The spawned agent process; killed when the connection is dropped.
    child: Child,
    /// Present only when the agent advertises the session `list` capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Background task running the I/O future returned by `acp::ClientSideConnection::new`.
    _io_task: Task<Result<(), acp::Error>>,
    /// Waits for the child to exit and then reports `LoadError::Exited` to every session thread.
    _wait_task: Task<Result<()>>,
    /// Forwards each line of the child's stderr to the log at `warn` level.
    _stderr_task: Task<Result<()>>,
}
  61
/// Per-session config options together with a unit `watch` channel.
// NOTE(review): the channel is presumably used to signal observers when the options
// change; no sender/receiver usage is visible in this part of the file — confirm.
struct ConfigOptions {
    /// Shared, mutable list of the session's config options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sending half of the `watch` channel created in `ConfigOptions::new`.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiving half of the `watch` channel created in `ConfigOptions::new`.
    rx: watch::Receiver<()>,
}
  67
  68impl ConfigOptions {
  69    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  70        let (tx, rx) = watch::channel(());
  71        Self {
  72            config_options,
  73            tx: Rc::new(RefCell::new(tx)),
  74            rx,
  75        }
  76    }
  77}
  78
/// State tracked by an `AcpConnection` for a single agent session.
pub struct AcpSession {
    /// Weak handle to the thread entity backing this session.
    thread: WeakEntity<AcpThread>,
    // NOTE(review): presumably suppresses the error produced by a cancelled prompt; it is
    // only ever initialized to `false` in the visible code — confirm against its reader.
    suppress_abort_err: bool,
    /// Model state reported by the agent, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state reported by the agent, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options reported by the agent, if any.
    config_options: Option<ConfigOptions>,
}
  86
/// Session listing backed by an ACP connection, plus a channel for pushing list updates
/// to whoever called `watch`.
pub struct AcpSessionList {
    /// Connection used to issue `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sending half of the unbounded update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; cloned out by `watch`.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
  92
  93impl AcpSessionList {
  94    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
  95        let (tx, rx) = smol::channel::unbounded();
  96        Self {
  97            connection,
  98            updates_tx: tx,
  99            updates_rx: rx,
 100        }
 101    }
 102
 103    fn notify_update(&self) {
 104        self.updates_tx
 105            .try_send(acp_thread::SessionListUpdate::Refresh)
 106            .log_err();
 107    }
 108
 109    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 110        self.updates_tx
 111            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 112            .log_err();
 113    }
 114}
 115
 116impl AgentSessionList for AcpSessionList {
 117    fn list_sessions(
 118        &self,
 119        request: AgentSessionListRequest,
 120        cx: &mut App,
 121    ) -> Task<Result<AgentSessionListResponse>> {
 122        let conn = self.connection.clone();
 123        cx.foreground_executor().spawn(async move {
 124            let acp_request = acp::ListSessionsRequest::new()
 125                .cwd(request.cwd)
 126                .cursor(request.cursor);
 127            let response = conn.list_sessions(acp_request).await?;
 128            Ok(AgentSessionListResponse {
 129                sessions: response
 130                    .sessions
 131                    .into_iter()
 132                    .map(|s| AgentSessionInfo {
 133                        session_id: s.session_id,
 134                        work_dirs: Some(PathList::new(&[s.cwd])),
 135                        title: s.title.map(Into::into),
 136                        updated_at: s.updated_at.and_then(|date_str| {
 137                            chrono::DateTime::parse_from_rfc3339(&date_str)
 138                                .ok()
 139                                .map(|dt| dt.with_timezone(&chrono::Utc))
 140                        }),
 141                        created_at: None,
 142                        meta: s.meta,
 143                    })
 144                    .collect(),
 145                next_cursor: response.next_cursor,
 146                meta: response.meta,
 147            })
 148        })
 149    }
 150
 151    fn watch(
 152        &self,
 153        _cx: &mut App,
 154    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 155        Some(self.updates_rx.clone())
 156    }
 157
 158    fn notify_refresh(&self) {
 159        self.notify_update();
 160    }
 161
 162    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 163        self
 164    }
 165}
 166
 167pub async fn connect(
 168    agent_id: AgentId,
 169    project: Entity<Project>,
 170    display_name: SharedString,
 171    command: AgentServerCommand,
 172    default_mode: Option<acp::SessionModeId>,
 173    default_model: Option<acp::ModelId>,
 174    default_config_options: HashMap<String, String>,
 175    cx: &mut AsyncApp,
 176) -> Result<Rc<dyn AgentConnection>> {
 177    let conn = AcpConnection::stdio(
 178        agent_id,
 179        project,
 180        display_name,
 181        command.clone(),
 182        default_mode,
 183        default_model,
 184        default_config_options,
 185        cx,
 186    )
 187    .await?;
 188    Ok(Rc::new(conn) as _)
 189}
 190
/// Lowest protocol version accepted from an agent; `AcpConnection::stdio` fails with
/// `UnsupportedVersion` when the agent reports anything lower.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 192
 193impl AcpConnection {
 194    pub async fn stdio(
 195        agent_id: AgentId,
 196        project: Entity<Project>,
 197        display_name: SharedString,
 198        command: AgentServerCommand,
 199        default_mode: Option<acp::SessionModeId>,
 200        default_model: Option<acp::ModelId>,
 201        default_config_options: HashMap<String, String>,
 202        cx: &mut AsyncApp,
 203    ) -> Result<Self> {
 204        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
 205        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
 206        let mut child =
 207            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
 208        child.envs(command.env.iter().flatten());
 209        if let Some(cwd) = project.update(cx, |project, cx| {
 210            project
 211                .default_path_list(cx)
 212                .ordered_paths()
 213                .next()
 214                .cloned()
 215        }) {
 216            child.current_dir(cwd);
 217        }
 218        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
 219
 220        let stdout = child.stdout.take().context("Failed to take stdout")?;
 221        let stdin = child.stdin.take().context("Failed to take stdin")?;
 222        let stderr = child.stderr.take().context("Failed to take stderr")?;
 223        log::debug!(
 224            "Spawning external agent server: {:?}, {:?}",
 225            command.path,
 226            command.args
 227        );
 228        log::trace!("Spawned (pid: {})", child.id());
 229
 230        let sessions = Rc::new(RefCell::new(HashMap::default()));
 231
 232        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
 233            (
 234                release_channel::ReleaseChannel::try_global(cx)
 235                    .map(|release_channel| release_channel.display_name()),
 236                release_channel::AppVersion::global(cx).to_string(),
 237            )
 238        });
 239
 240        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
 241            Rc::new(RefCell::new(None));
 242
 243        let client = ClientDelegate {
 244            sessions: sessions.clone(),
 245            session_list: client_session_list.clone(),
 246            cx: cx.clone(),
 247        };
 248        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
 249            let foreground_executor = cx.foreground_executor().clone();
 250            move |fut| {
 251                foreground_executor.spawn(fut).detach();
 252            }
 253        });
 254
 255        let io_task = cx.background_spawn(io_task);
 256
 257        let stderr_task = cx.background_spawn(async move {
 258            let mut stderr = BufReader::new(stderr);
 259            let mut line = String::new();
 260            while let Ok(n) = stderr.read_line(&mut line).await
 261                && n > 0
 262            {
 263                log::warn!("agent stderr: {}", line.trim());
 264                line.clear();
 265            }
 266            Ok(())
 267        });
 268
 269        let wait_task = cx.spawn({
 270            let sessions = sessions.clone();
 271            let status_fut = child.status();
 272            async move |cx| {
 273                let status = status_fut.await?;
 274
 275                for session in sessions.borrow().values() {
 276                    session
 277                        .thread
 278                        .update(cx, |thread, cx| {
 279                            thread.emit_load_error(LoadError::Exited { status }, cx)
 280                        })
 281                        .ok();
 282                }
 283
 284                anyhow::Ok(())
 285            }
 286        });
 287
 288        let connection = Rc::new(connection);
 289
 290        cx.update(|cx| {
 291            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
 292                registry.set_active_connection(agent_id.clone(), &connection, cx)
 293            });
 294        });
 295
 296        let response = connection
 297            .initialize(
 298                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
 299                    .client_capabilities(
 300                        acp::ClientCapabilities::new()
 301                            .fs(acp::FileSystemCapabilities::new()
 302                                .read_text_file(true)
 303                                .write_text_file(true))
 304                            .terminal(true)
 305                            .auth(acp::AuthCapabilities::new().terminal(true))
 306                            // Experimental: Allow for rendering terminal output from the agents
 307                            .meta(acp::Meta::from_iter([
 308                                ("terminal_output".into(), true.into()),
 309                                ("terminal-auth".into(), true.into()),
 310                            ])),
 311                    )
 312                    .client_info(
 313                        acp::Implementation::new("zed", version)
 314                            .title(release_channel.map(ToOwned::to_owned)),
 315                    ),
 316            )
 317            .await?;
 318
 319        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
 320            return Err(UnsupportedVersion.into());
 321        }
 322
 323        let telemetry_id = response
 324            .agent_info
 325            // Use the one the agent provides if we have one
 326            .map(|info| info.name.into())
 327            // Otherwise, just use the name
 328            .unwrap_or_else(|| agent_id.0.to_string().into());
 329
 330        let session_list = if response
 331            .agent_capabilities
 332            .session_capabilities
 333            .list
 334            .is_some()
 335        {
 336            let list = Rc::new(AcpSessionList::new(connection.clone()));
 337            *client_session_list.borrow_mut() = Some(list.clone());
 338            Some(list)
 339        } else {
 340            None
 341        };
 342
 343        // TODO: Remove this override once Google team releases their official auth methods
 344        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
 345            let mut args = command.args.clone();
 346            args.retain(|a| a != "--experimental-acp" && a != "--acp");
 347            let value = serde_json::json!({
 348                "label": "gemini /auth",
 349                "command": command.path.to_string_lossy().into_owned(),
 350                "args": args,
 351                "env": command.env.clone().unwrap_or_default(),
 352            });
 353            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
 354            vec![acp::AuthMethod::Agent(
 355                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
 356                    .description("Login with your Google or Vertex AI account")
 357                    .meta(meta),
 358            )]
 359        } else {
 360            response.auth_methods
 361        };
 362        Ok(Self {
 363            id: agent_id,
 364            auth_methods,
 365            command,
 366            connection,
 367            display_name,
 368            telemetry_id,
 369            sessions,
 370            agent_capabilities: response.agent_capabilities,
 371            default_mode,
 372            default_model,
 373            default_config_options,
 374            session_list,
 375            _io_task: io_task,
 376            _wait_task: wait_task,
 377            _stderr_task: stderr_task,
 378            child,
 379        })
 380    }
 381
 382    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 383        &self.agent_capabilities.prompt_capabilities
 384    }
 385
 386    fn apply_default_config_options(
 387        &self,
 388        session_id: &acp::SessionId,
 389        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 390        cx: &mut AsyncApp,
 391    ) {
 392        let id = self.id.clone();
 393        let defaults_to_apply: Vec<_> = {
 394            let config_opts_ref = config_options.borrow();
 395            config_opts_ref
 396                .iter()
 397                .filter_map(|config_option| {
 398                    let default_value = self.default_config_options.get(&*config_option.id.0)?;
 399
 400                    let is_valid = match &config_option.kind {
 401                        acp::SessionConfigKind::Select(select) => match &select.options {
 402                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
 403                                .iter()
 404                                .any(|opt| &*opt.value.0 == default_value.as_str()),
 405                            acp::SessionConfigSelectOptions::Grouped(groups) => {
 406                                groups.iter().any(|g| {
 407                                    g.options
 408                                        .iter()
 409                                        .any(|opt| &*opt.value.0 == default_value.as_str())
 410                                })
 411                            }
 412                            _ => false,
 413                        },
 414                        _ => false,
 415                    };
 416
 417                    if is_valid {
 418                        let initial_value = match &config_option.kind {
 419                            acp::SessionConfigKind::Select(select) => {
 420                                Some(select.current_value.clone())
 421                            }
 422                            _ => None,
 423                        };
 424                        Some((
 425                            config_option.id.clone(),
 426                            default_value.clone(),
 427                            initial_value,
 428                        ))
 429                    } else {
 430                        log::warn!(
 431                            "`{}` is not a valid value for config option `{}` in {}",
 432                            default_value,
 433                            config_option.id.0,
 434                            id
 435                        );
 436                        None
 437                    }
 438                })
 439                .collect()
 440        };
 441
 442        for (config_id, default_value, initial_value) in defaults_to_apply {
 443            cx.spawn({
 444                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 445                let session_id = session_id.clone();
 446                let config_id_clone = config_id.clone();
 447                let config_opts = config_options.clone();
 448                let conn = self.connection.clone();
 449                async move |_| {
 450                    let result = conn
 451                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
 452                            session_id,
 453                            config_id_clone.clone(),
 454                            default_value_id,
 455                        ))
 456                        .await
 457                        .log_err();
 458
 459                    if result.is_none() {
 460                        if let Some(initial) = initial_value {
 461                            let mut opts = config_opts.borrow_mut();
 462                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 463                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 464                                    select.current_value = initial;
 465                                }
 466                            }
 467                        }
 468                    }
 469                }
 470            })
 471            .detach();
 472
 473            let mut opts = config_options.borrow_mut();
 474            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
 475                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 476                    select.current_value = acp::SessionConfigValueId::new(default_value);
 477                }
 478            }
 479        }
 480    }
 481}
 482
 483impl Drop for AcpConnection {
 484    fn drop(&mut self) {
 485        self.child.kill().log_err();
 486    }
 487}
 488
 489fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
 490    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
 491}
 492
 493fn terminal_auth_task(
 494    command: &AgentServerCommand,
 495    agent_id: &AgentId,
 496    method: &acp::AuthMethodTerminal,
 497) -> SpawnInTerminal {
 498    let mut args = command.args.clone();
 499    args.extend(method.args.clone());
 500
 501    let mut env = command.env.clone().unwrap_or_default();
 502    env.extend(method.env.clone());
 503
 504    acp_thread::build_terminal_auth_task(
 505        terminal_auth_task_id(agent_id, &method.id),
 506        method.name.clone(),
 507        command.path.to_string_lossy().into_owned(),
 508        args,
 509        env,
 510    )
 511}
 512
 513/// Used to support the _meta method prior to stabilization
 514fn meta_terminal_auth_task(
 515    agent_id: &AgentId,
 516    method_id: &acp::AuthMethodId,
 517    method: &acp::AuthMethod,
 518) -> Option<SpawnInTerminal> {
 519    #[derive(Deserialize)]
 520    struct MetaTerminalAuth {
 521        label: String,
 522        command: String,
 523        #[serde(default)]
 524        args: Vec<String>,
 525        #[serde(default)]
 526        env: HashMap<String, String>,
 527    }
 528
 529    let meta = match method {
 530        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
 531        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
 532        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
 533        _ => None,
 534    }?;
 535    let terminal_auth =
 536        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
 537
 538    Some(acp_thread::build_terminal_auth_task(
 539        terminal_auth_task_id(agent_id, method_id),
 540        terminal_auth.label.clone(),
 541        terminal_auth.command,
 542        terminal_auth.args,
 543        terminal_auth.env,
 544    ))
 545}
 546
 547impl AgentConnection for AcpConnection {
 548    fn agent_id(&self) -> AgentId {
 549        self.id.clone()
 550    }
 551
 552    fn telemetry_id(&self) -> SharedString {
 553        self.telemetry_id.clone()
 554    }
 555
 556    fn new_session(
 557        self: Rc<Self>,
 558        project: Entity<Project>,
 559        work_dirs: PathList,
 560        cx: &mut App,
 561    ) -> Task<Result<Entity<AcpThread>>> {
 562        // TODO: remove this once ACP supports multiple working directories
 563        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 564            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 565        };
 566        let name = self.id.0.clone();
 567        let mcp_servers = mcp_servers_for_project(&project, cx);
 568
 569        cx.spawn(async move |cx| {
 570            let response = self.connection
 571                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
 572                .await
 573                .map_err(map_acp_error)?;
 574
 575            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
 576
 577            if let Some(default_mode) = self.default_mode.clone() {
 578                if let Some(modes) = modes.as_ref() {
 579                    let mut modes_ref = modes.borrow_mut();
 580                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
 581
 582                    if has_mode {
 583                        let initial_mode_id = modes_ref.current_mode_id.clone();
 584
 585                        cx.spawn({
 586                            let default_mode = default_mode.clone();
 587                            let session_id = response.session_id.clone();
 588                            let modes = modes.clone();
 589                            let conn = self.connection.clone();
 590                            async move |_| {
 591                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
 592                                .await.log_err();
 593
 594                                if result.is_none() {
 595                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 596                                }
 597                            }
 598                        }).detach();
 599
 600                        modes_ref.current_mode_id = default_mode;
 601                    } else {
 602                        let available_modes = modes_ref
 603                            .available_modes
 604                            .iter()
 605                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 606                            .collect::<Vec<_>>()
 607                            .join("\n");
 608
 609                        log::warn!(
 610                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 611                        );
 612                    }
 613                }
 614            }
 615
 616            if let Some(default_model) = self.default_model.clone() {
 617                if let Some(models) = models.as_ref() {
 618                    let mut models_ref = models.borrow_mut();
 619                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
 620
 621                    if has_model {
 622                        let initial_model_id = models_ref.current_model_id.clone();
 623
 624                        cx.spawn({
 625                            let default_model = default_model.clone();
 626                            let session_id = response.session_id.clone();
 627                            let models = models.clone();
 628                            let conn = self.connection.clone();
 629                            async move |_| {
 630                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
 631                                .await.log_err();
 632
 633                                if result.is_none() {
 634                                    models.borrow_mut().current_model_id = initial_model_id;
 635                                }
 636                            }
 637                        }).detach();
 638
 639                        models_ref.current_model_id = default_model;
 640                    } else {
 641                        let available_models = models_ref
 642                            .available_models
 643                            .iter()
 644                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 645                            .collect::<Vec<_>>()
 646                            .join("\n");
 647
 648                        log::warn!(
 649                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 650                        );
 651                    }
 652                }
 653            }
 654
 655            if let Some(config_opts) = config_options.as_ref() {
 656                self.apply_default_config_options(&response.session_id, config_opts, cx);
 657            }
 658
 659            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 660            let thread: Entity<AcpThread> = cx.new(|cx| {
 661                AcpThread::new(
 662                    None,
 663                    self.display_name.clone(),
 664                    Some(work_dirs),
 665                    self.clone(),
 666                    project,
 667                    action_log,
 668                    response.session_id.clone(),
 669                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 670                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 671                    cx,
 672                )
 673            });
 674
 675            self.sessions.borrow_mut().insert(
 676                response.session_id,
 677                AcpSession {
 678                    thread: thread.downgrade(),
 679                    suppress_abort_err: false,
 680                    session_modes: modes,
 681                    models,
 682                    config_options: config_options.map(ConfigOptions::new),
 683                },
 684            );
 685
 686            Ok(thread)
 687        })
 688    }
 689
 690    fn supports_load_session(&self) -> bool {
 691        self.agent_capabilities.load_session
 692    }
 693
 694    fn supports_resume_session(&self) -> bool {
 695        self.agent_capabilities
 696            .session_capabilities
 697            .resume
 698            .is_some()
 699    }
 700
 701    fn load_session(
 702        self: Rc<Self>,
 703        session_id: acp::SessionId,
 704        project: Entity<Project>,
 705        work_dirs: PathList,
 706        title: Option<SharedString>,
 707        cx: &mut App,
 708    ) -> Task<Result<Entity<AcpThread>>> {
 709        if !self.agent_capabilities.load_session {
 710            return Task::ready(Err(anyhow!(LoadError::Other(
 711                "Loading sessions is not supported by this agent.".into()
 712            ))));
 713        }
 714        // TODO: remove this once ACP supports multiple working directories
 715        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 716            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 717        };
 718
 719        let mcp_servers = mcp_servers_for_project(&project, cx);
 720        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 721        let title = title.unwrap_or_else(|| self.display_name.clone());
 722        let thread: Entity<AcpThread> = cx.new(|cx| {
 723            AcpThread::new(
 724                None,
 725                title,
 726                Some(work_dirs.clone()),
 727                self.clone(),
 728                project,
 729                action_log,
 730                session_id.clone(),
 731                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 732                cx,
 733            )
 734        });
 735
 736        self.sessions.borrow_mut().insert(
 737            session_id.clone(),
 738            AcpSession {
 739                thread: thread.downgrade(),
 740                suppress_abort_err: false,
 741                session_modes: None,
 742                models: None,
 743                config_options: None,
 744            },
 745        );
 746
 747        cx.spawn(async move |cx| {
 748            let response = match self
 749                .connection
 750                .load_session(
 751                    acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
 752                )
 753                .await
 754            {
 755                Ok(response) => response,
 756                Err(err) => {
 757                    self.sessions.borrow_mut().remove(&session_id);
 758                    return Err(map_acp_error(err));
 759                }
 760            };
 761
 762            let (modes, models, config_options) =
 763                config_state(response.modes, response.models, response.config_options);
 764
 765            if let Some(config_opts) = config_options.as_ref() {
 766                self.apply_default_config_options(&session_id, config_opts, cx);
 767            }
 768
 769            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 770                session.session_modes = modes;
 771                session.models = models;
 772                session.config_options = config_options.map(ConfigOptions::new);
 773            }
 774
 775            Ok(thread)
 776        })
 777    }
 778
 779    fn resume_session(
 780        self: Rc<Self>,
 781        session_id: acp::SessionId,
 782        project: Entity<Project>,
 783        work_dirs: PathList,
 784        title: Option<SharedString>,
 785        cx: &mut App,
 786    ) -> Task<Result<Entity<AcpThread>>> {
 787        if self
 788            .agent_capabilities
 789            .session_capabilities
 790            .resume
 791            .is_none()
 792        {
 793            return Task::ready(Err(anyhow!(LoadError::Other(
 794                "Resuming sessions is not supported by this agent.".into()
 795            ))));
 796        }
 797        // TODO: remove this once ACP supports multiple working directories
 798        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 799            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 800        };
 801
 802        let mcp_servers = mcp_servers_for_project(&project, cx);
 803        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 804        let title = title.unwrap_or_else(|| self.display_name.clone());
 805        let thread: Entity<AcpThread> = cx.new(|cx| {
 806            AcpThread::new(
 807                None,
 808                title,
 809                Some(work_dirs),
 810                self.clone(),
 811                project,
 812                action_log,
 813                session_id.clone(),
 814                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 815                cx,
 816            )
 817        });
 818
 819        self.sessions.borrow_mut().insert(
 820            session_id.clone(),
 821            AcpSession {
 822                thread: thread.downgrade(),
 823                suppress_abort_err: false,
 824                session_modes: None,
 825                models: None,
 826                config_options: None,
 827            },
 828        );
 829
 830        cx.spawn(async move |cx| {
 831            let response = match self
 832                .connection
 833                .resume_session(
 834                    acp::ResumeSessionRequest::new(session_id.clone(), cwd)
 835                        .mcp_servers(mcp_servers),
 836                )
 837                .await
 838            {
 839                Ok(response) => response,
 840                Err(err) => {
 841                    self.sessions.borrow_mut().remove(&session_id);
 842                    return Err(map_acp_error(err));
 843                }
 844            };
 845
 846            let (modes, models, config_options) =
 847                config_state(response.modes, response.models, response.config_options);
 848
 849            if let Some(config_opts) = config_options.as_ref() {
 850                self.apply_default_config_options(&session_id, config_opts, cx);
 851            }
 852
 853            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 854                session.session_modes = modes;
 855                session.models = models;
 856                session.config_options = config_options.map(ConfigOptions::new);
 857            }
 858
 859            Ok(thread)
 860        })
 861    }
 862
 863    fn supports_close_session(&self) -> bool {
 864        self.agent_capabilities.session_capabilities.close.is_some()
 865    }
 866
 867    fn close_session(
 868        self: Rc<Self>,
 869        session_id: &acp::SessionId,
 870        cx: &mut App,
 871    ) -> Task<Result<()>> {
 872        if !self.supports_close_session() {
 873            return Task::ready(Err(anyhow!(LoadError::Other(
 874                "Closing sessions is not supported by this agent.".into()
 875            ))));
 876        }
 877
 878        let conn = self.connection.clone();
 879        let session_id = session_id.clone();
 880        cx.foreground_executor().spawn(async move {
 881            conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
 882                .await?;
 883            self.sessions.borrow_mut().remove(&session_id);
 884            Ok(())
 885        })
 886    }
 887
 888    fn auth_methods(&self) -> &[acp::AuthMethod] {
 889        &self.auth_methods
 890    }
 891
 892    fn terminal_auth_task(
 893        &self,
 894        method_id: &acp::AuthMethodId,
 895        cx: &App,
 896    ) -> Option<SpawnInTerminal> {
 897        let method = self
 898            .auth_methods
 899            .iter()
 900            .find(|method| method.id() == method_id)?;
 901
 902        match method {
 903            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
 904                Some(terminal_auth_task(&self.command, &self.id, terminal))
 905            }
 906            _ => meta_terminal_auth_task(&self.id, method_id, method),
 907        }
 908    }
 909
 910    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
 911        let conn = self.connection.clone();
 912        cx.foreground_executor().spawn(async move {
 913            conn.authenticate(acp::AuthenticateRequest::new(method_id))
 914                .await?;
 915            Ok(())
 916        })
 917    }
 918
 919    fn prompt(
 920        &self,
 921        _id: Option<acp_thread::UserMessageId>,
 922        params: acp::PromptRequest,
 923        cx: &mut App,
 924    ) -> Task<Result<acp::PromptResponse>> {
 925        let conn = self.connection.clone();
 926        let sessions = self.sessions.clone();
 927        let session_id = params.session_id.clone();
 928        cx.foreground_executor().spawn(async move {
 929            let result = conn.prompt(params).await;
 930
 931            let mut suppress_abort_err = false;
 932
 933            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
 934                suppress_abort_err = session.suppress_abort_err;
 935                session.suppress_abort_err = false;
 936            }
 937
 938            match result {
 939                Ok(response) => Ok(response),
 940                Err(err) => {
 941                    if err.code == acp::ErrorCode::AuthRequired {
 942                        return Err(anyhow!(acp::Error::auth_required()));
 943                    }
 944
 945                    if err.code != ErrorCode::InternalError {
 946                        anyhow::bail!(err)
 947                    }
 948
 949                    let Some(data) = &err.data else {
 950                        anyhow::bail!(err)
 951                    };
 952
 953                    // Temporary workaround until the following PR is generally available:
 954                    // https://github.com/google-gemini/gemini-cli/pull/6656
 955
 956                    #[derive(Deserialize)]
 957                    #[serde(deny_unknown_fields)]
 958                    struct ErrorDetails {
 959                        details: Box<str>,
 960                    }
 961
 962                    match serde_json::from_value(data.clone()) {
 963                        Ok(ErrorDetails { details }) => {
 964                            if suppress_abort_err
 965                                && (details.contains("This operation was aborted")
 966                                    || details.contains("The user aborted a request"))
 967                            {
 968                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
 969                            } else {
 970                                Err(anyhow!(details))
 971                            }
 972                        }
 973                        Err(_) => Err(anyhow!(err)),
 974                    }
 975                }
 976            }
 977        })
 978    }
 979
 980    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
 981        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
 982            session.suppress_abort_err = true;
 983        }
 984        let conn = self.connection.clone();
 985        let params = acp::CancelNotification::new(session_id.clone());
 986        cx.foreground_executor()
 987            .spawn(async move { conn.cancel(params).await })
 988            .detach();
 989    }
 990
 991    fn session_modes(
 992        &self,
 993        session_id: &acp::SessionId,
 994        _cx: &App,
 995    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
 996        let sessions = self.sessions.clone();
 997        let sessions_ref = sessions.borrow();
 998        let Some(session) = sessions_ref.get(session_id) else {
 999            return None;
1000        };
1001
1002        if let Some(modes) = session.session_modes.as_ref() {
1003            Some(Rc::new(AcpSessionModes {
1004                connection: self.connection.clone(),
1005                session_id: session_id.clone(),
1006                state: modes.clone(),
1007            }) as _)
1008        } else {
1009            None
1010        }
1011    }
1012
1013    fn model_selector(
1014        &self,
1015        session_id: &acp::SessionId,
1016    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1017        let sessions = self.sessions.clone();
1018        let sessions_ref = sessions.borrow();
1019        let Some(session) = sessions_ref.get(session_id) else {
1020            return None;
1021        };
1022
1023        if let Some(models) = session.models.as_ref() {
1024            Some(Rc::new(AcpModelSelector::new(
1025                session_id.clone(),
1026                self.connection.clone(),
1027                models.clone(),
1028            )) as _)
1029        } else {
1030            None
1031        }
1032    }
1033
1034    fn session_config_options(
1035        &self,
1036        session_id: &acp::SessionId,
1037        _cx: &App,
1038    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1039        let sessions = self.sessions.borrow();
1040        let session = sessions.get(session_id)?;
1041
1042        let config_opts = session.config_options.as_ref()?;
1043
1044        Some(Rc::new(AcpSessionConfigOptions {
1045            session_id: session_id.clone(),
1046            connection: self.connection.clone(),
1047            state: config_opts.config_options.clone(),
1048            watch_tx: config_opts.tx.clone(),
1049            watch_rx: config_opts.rx.clone(),
1050        }) as _)
1051    }
1052
1053    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1054        self.session_list.clone().map(|s| s as _)
1055    }
1056
    /// Converts this connection into an `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
1060}
1061
1062fn map_acp_error(err: acp::Error) -> anyhow::Error {
1063    if err.code == acp::ErrorCode::AuthRequired {
1064        let mut error = AuthRequired::new();
1065
1066        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1067            error = error.with_description(err.message);
1068        }
1069
1070        anyhow!(error)
1071    } else {
1072        anyhow!(err)
1073    }
1074}
1075
// Unit tests for the two ways a terminal authentication task is built:
// `terminal_auth_task` (first-class terminal auth methods) and
// `meta_terminal_auth_task` (legacy `terminal-auth` entry in the method meta).
#[cfg(test)]
mod tests {
    use super::*;

    // A first-class terminal auth method runs the agent's own command: the
    // method's args are appended after the command's args, and the method's env
    // is merged over the command's env (the method wins for `SHARED`).
    #[test]
    fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "base".into()),
            ])),
        };
        let method = acp::AuthMethodTerminal::new("login", "Login")
            .args(vec!["/auth".into()])
            .env(std::collections::HashMap::from_iter([
                ("EXTRA".into(), "2".into()),
                ("SHARED".into(), "override".into()),
            ]));

        let terminal_auth_task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);

        assert_eq!(
            terminal_auth_task.command.as_deref(),
            Some("/path/to/agent")
        );
        assert_eq!(terminal_auth_task.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(
            terminal_auth_task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(terminal_auth_task.label, "Login");
        assert_eq!(terminal_auth_task.command_label, "Login");
    }

    // The legacy `terminal-auth` meta entry supplies label, command, args and
    // env of the task directly.
    // NOTE(review): the test name mentions retrying a session, but nothing
    // below asserts anything about a retry.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
                "terminal-auth".to_string(),
                serde_json::json!({
                    "label": "legacy /auth",
                    "command": "legacy-agent",
                    "args": ["auth", "--interactive"],
                    "env": {
                        "AUTH_MODE": "interactive",
                    },
                }),
            )])),
        );

        let terminal_auth_task =
            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
                .expect("expected legacy terminal auth task");

        assert_eq!(
            terminal_auth_task.id.0,
            "external-agent-test-agent-legacy-login-login"
        );
        assert_eq!(terminal_auth_task.command.as_deref(), Some("legacy-agent"));
        assert_eq!(terminal_auth_task.args, vec!["auth", "--interactive"]);
        assert_eq!(
            terminal_auth_task.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(terminal_auth_task.label, "legacy /auth");
    }

    // A `terminal-auth` meta entry that only carries a label yields no task
    // (presumably because the other fields, such as `command`, are required).
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
                "terminal-auth".to_string(),
                serde_json::json!({
                    "label": "legacy /auth",
                }),
            )])),
        );

        assert!(
            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
        );
    }

    // When a terminal auth method also carries legacy `terminal-auth` meta, the
    // task built by `terminal_auth_task` uses the first-class command, args and
    // env and none of the legacy values.
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let method_id = acp::AuthMethodId::new("login");
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(acp::Meta::from_iter([(
                    "terminal-auth".to_string(),
                    serde_json::json!({
                        "label": "legacy /auth",
                        "command": "legacy-agent",
                        "args": ["legacy-auth"],
                        "env": {
                            "AUTH_MODE": "legacy",
                        },
                    }),
                )])),
        );

        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into()],
            env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
        };

        let terminal_auth_task = match &method {
            acp::AuthMethod::Terminal(terminal) => {
                terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
            }
            _ => unreachable!(),
        };

        assert_eq!(
            terminal_auth_task.command.as_deref(),
            Some("/path/to/agent")
        );
        assert_eq!(terminal_auth_task.args, vec!["--acp", "/auth"]);
        assert_eq!(
            terminal_auth_task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(terminal_auth_task.label, "Login");
    }
}
1218
1219fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1220    let context_server_store = project.read(cx).context_server_store().read(cx);
1221    let is_local = project.read(cx).is_local();
1222    context_server_store
1223        .configured_server_ids()
1224        .iter()
1225        .filter_map(|id| {
1226            let configuration = context_server_store.configuration_for_server(id)?;
1227            match &*configuration {
1228                project::context_server_store::ContextServerConfiguration::Custom {
1229                    command,
1230                    remote,
1231                    ..
1232                }
1233                | project::context_server_store::ContextServerConfiguration::Extension {
1234                    command,
1235                    remote,
1236                    ..
1237                } if is_local || *remote => Some(acp::McpServer::Stdio(
1238                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
1239                        .args(command.args.clone())
1240                        .env(if let Some(env) = command.env.as_ref() {
1241                            env.iter()
1242                                .map(|(name, value)| acp::EnvVariable::new(name, value))
1243                                .collect()
1244                        } else {
1245                            vec![]
1246                        }),
1247                )),
1248                project::context_server_store::ContextServerConfiguration::Http {
1249                    url,
1250                    headers,
1251                    timeout: _,
1252                } => Some(acp::McpServer::Http(
1253                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1254                        headers
1255                            .iter()
1256                            .map(|(name, value)| acp::HttpHeader::new(name, value))
1257                            .collect(),
1258                    ),
1259                )),
1260                _ => None,
1261            }
1262        })
1263        .collect()
1264}
1265
1266fn config_state(
1267    modes: Option<acp::SessionModeState>,
1268    models: Option<acp::SessionModelState>,
1269    config_options: Option<Vec<acp::SessionConfigOption>>,
1270) -> (
1271    Option<Rc<RefCell<acp::SessionModeState>>>,
1272    Option<Rc<RefCell<acp::SessionModelState>>>,
1273    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1274) {
1275    if let Some(opts) = config_options {
1276        return (None, None, Some(Rc::new(RefCell::new(opts))));
1277    }
1278
1279    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1280    let models = models.map(|models| Rc::new(RefCell::new(models)));
1281    (modes, models, None)
1282}
1283
/// Handle for reading and changing the mode of one agent session.
///
/// Built by `AcpConnection::session_modes` and exposed through
/// `acp_thread::AgentSessionModes`.
struct AcpSessionModes {
    /// Session whose mode this handle reads and changes.
    session_id: acp::SessionId,
    /// Connection used to send mode changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state, shared with the session entry it was cloned from.
    state: Rc<RefCell<acp::SessionModeState>>,
}
1289
1290impl acp_thread::AgentSessionModes for AcpSessionModes {
1291    fn current_mode(&self) -> acp::SessionModeId {
1292        self.state.borrow().current_mode_id.clone()
1293    }
1294
1295    fn all_modes(&self) -> Vec<acp::SessionMode> {
1296        self.state.borrow().available_modes.clone()
1297    }
1298
1299    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1300        let connection = self.connection.clone();
1301        let session_id = self.session_id.clone();
1302        let old_mode_id;
1303        {
1304            let mut state = self.state.borrow_mut();
1305            old_mode_id = state.current_mode_id.clone();
1306            state.current_mode_id = mode_id.clone();
1307        };
1308        let state = self.state.clone();
1309        cx.foreground_executor().spawn(async move {
1310            let result = connection
1311                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1312                .await;
1313
1314            if result.is_err() {
1315                state.borrow_mut().current_mode_id = old_mode_id;
1316            }
1317
1318            result?;
1319
1320            Ok(())
1321        })
1322    }
1323}
1324
/// Handle for listing and selecting the model of one agent session.
///
/// Built by `AcpConnection::model_selector` and exposed through
/// `acp_thread::AgentModelSelector`.
struct AcpModelSelector {
    /// Session whose model this selector reads and changes.
    session_id: acp::SessionId,
    /// Connection used to send model changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state, shared with the session entry it was cloned from.
    state: Rc<RefCell<acp::SessionModelState>>,
}
1330
1331impl AcpModelSelector {
1332    fn new(
1333        session_id: acp::SessionId,
1334        connection: Rc<acp::ClientSideConnection>,
1335        state: Rc<RefCell<acp::SessionModelState>>,
1336    ) -> Self {
1337        Self {
1338            session_id,
1339            connection,
1340            state,
1341        }
1342    }
1343}
1344
1345impl acp_thread::AgentModelSelector for AcpModelSelector {
1346    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1347        Task::ready(Ok(acp_thread::AgentModelList::Flat(
1348            self.state
1349                .borrow()
1350                .available_models
1351                .clone()
1352                .into_iter()
1353                .map(acp_thread::AgentModelInfo::from)
1354                .collect(),
1355        )))
1356    }
1357
1358    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1359        let connection = self.connection.clone();
1360        let session_id = self.session_id.clone();
1361        let old_model_id;
1362        {
1363            let mut state = self.state.borrow_mut();
1364            old_model_id = state.current_model_id.clone();
1365            state.current_model_id = model_id.clone();
1366        };
1367        let state = self.state.clone();
1368        cx.foreground_executor().spawn(async move {
1369            let result = connection
1370                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1371                .await;
1372
1373            if result.is_err() {
1374                state.borrow_mut().current_model_id = old_model_id;
1375            }
1376
1377            result?;
1378
1379            Ok(())
1380        })
1381    }
1382
1383    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1384        let state = self.state.borrow();
1385        Task::ready(
1386            state
1387                .available_models
1388                .iter()
1389                .find(|m| m.model_id == state.current_model_id)
1390                .cloned()
1391                .map(acp_thread::AgentModelInfo::from)
1392                .ok_or_else(|| anyhow::anyhow!("Model not found")),
1393        )
1394    }
1395}
1396
/// Handle for reading and changing the config options of one agent session.
///
/// Built by `AcpConnection::session_config_options` and exposed through
/// `acp_thread::AgentSessionConfigOptions`.
struct AcpSessionConfigOptions {
    /// Session whose config options this handle reads and changes.
    session_id: acp::SessionId,
    /// Connection used to send option changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Current option list, shared with the session entry it was cloned from.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender that is signalled after the option list has been replaced.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver handed out by `watch` so callers can observe those signals.
    watch_rx: watch::Receiver<()>,
}
1404
1405impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1406    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1407        self.state.borrow().clone()
1408    }
1409
1410    fn set_config_option(
1411        &self,
1412        config_id: acp::SessionConfigId,
1413        value: acp::SessionConfigValueId,
1414        cx: &mut App,
1415    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1416        let connection = self.connection.clone();
1417        let session_id = self.session_id.clone();
1418        let state = self.state.clone();
1419
1420        let watch_tx = self.watch_tx.clone();
1421
1422        cx.foreground_executor().spawn(async move {
1423            let response = connection
1424                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1425                    session_id, config_id, value,
1426                ))
1427                .await?;
1428
1429            *state.borrow_mut() = response.config_options.clone();
1430            watch_tx.borrow_mut().send(()).ok();
1431            Ok(response.config_options)
1432        })
1433    }
1434
1435    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1436        Some(self.watch_rx.clone())
1437    }
1438}
1439
/// State behind the `acp::Client` implementation, which serves the requests and
/// notifications sent by the agent.
struct ClientDelegate {
    /// Known sessions by id; used to find the thread a request belongs to.
    /// NOTE(review): presumably the same map that `AcpConnection` holds — confirm
    /// where the delegate is constructed.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Session list that receives session-info updates, when one is set.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// App handle that is cloned whenever a thread has to be read or updated.
    cx: AsyncApp,
}
1445
1446#[async_trait::async_trait(?Send)]
1447impl acp::Client for ClientDelegate {
1448    async fn request_permission(
1449        &self,
1450        arguments: acp::RequestPermissionRequest,
1451    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1452        let thread;
1453        {
1454            let sessions_ref = self.sessions.borrow();
1455            let session = sessions_ref
1456                .get(&arguments.session_id)
1457                .context("Failed to get session")?;
1458            thread = session.thread.clone();
1459        }
1460
1461        let cx = &mut self.cx.clone();
1462
1463        let task = thread.update(cx, |thread, cx| {
1464            thread.request_tool_call_authorization(
1465                arguments.tool_call,
1466                acp_thread::PermissionOptions::Flat(arguments.options),
1467                cx,
1468            )
1469        })??;
1470
1471        let outcome = task.await;
1472
1473        Ok(acp::RequestPermissionResponse::new(outcome.into()))
1474    }
1475
1476    async fn write_text_file(
1477        &self,
1478        arguments: acp::WriteTextFileRequest,
1479    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1480        let cx = &mut self.cx.clone();
1481        let task = self
1482            .session_thread(&arguments.session_id)?
1483            .update(cx, |thread, cx| {
1484                thread.write_text_file(arguments.path, arguments.content, cx)
1485            })?;
1486
1487        task.await?;
1488
1489        Ok(Default::default())
1490    }
1491
1492    async fn read_text_file(
1493        &self,
1494        arguments: acp::ReadTextFileRequest,
1495    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1496        let task = self.session_thread(&arguments.session_id)?.update(
1497            &mut self.cx.clone(),
1498            |thread, cx| {
1499                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1500            },
1501        )?;
1502
1503        let content = task.await?;
1504
1505        Ok(acp::ReadTextFileResponse::new(content))
1506    }
1507
1508    async fn session_notification(
1509        &self,
1510        notification: acp::SessionNotification,
1511    ) -> Result<(), acp::Error> {
1512        let sessions = self.sessions.borrow();
1513        let session = sessions
1514            .get(&notification.session_id)
1515            .context("Failed to get session")?;
1516
1517        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1518            current_mode_id,
1519            ..
1520        }) = &notification.update
1521        {
1522            if let Some(session_modes) = &session.session_modes {
1523                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1524            }
1525        }
1526
1527        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1528            config_options,
1529            ..
1530        }) = &notification.update
1531        {
1532            if let Some(opts) = &session.config_options {
1533                *opts.config_options.borrow_mut() = config_options.clone();
1534                opts.tx.borrow_mut().send(()).ok();
1535            }
1536        }
1537
1538        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
1539            && let Some(session_list) = self.session_list.borrow().as_ref()
1540        {
1541            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1542        }
1543
1544        // Clone so we can inspect meta both before and after handing off to the thread
1545        let update_clone = notification.update.clone();
1546
1547        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1548        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1549            if let Some(meta) = &tc.meta {
1550                if let Some(terminal_info) = meta.get("terminal_info") {
1551                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1552                    {
1553                        let terminal_id = acp::TerminalId::new(id_str);
1554                        let cwd = terminal_info
1555                            .get("cwd")
1556                            .and_then(|v| v.as_str().map(PathBuf::from));
1557
1558                        // Create a minimal display-only lower-level terminal and register it.
1559                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1560                            let builder = TerminalBuilder::new_display_only(
1561                                CursorShape::default(),
1562                                AlternateScroll::On,
1563                                None,
1564                                0,
1565                                cx.background_executor(),
1566                                thread.project().read(cx).path_style(cx),
1567                            )?;
1568                            let lower = cx.new(|cx| builder.subscribe(cx));
1569                            thread.on_terminal_provider_event(
1570                                TerminalProviderEvent::Created {
1571                                    terminal_id,
1572                                    label: tc.title.clone(),
1573                                    cwd,
1574                                    output_byte_limit: None,
1575                                    terminal: lower,
1576                                },
1577                                cx,
1578                            );
1579                            anyhow::Ok(())
1580                        });
1581                    }
1582                }
1583            }
1584        }
1585
1586        // Forward the update to the acp_thread as usual.
1587        session.thread.update(&mut self.cx.clone(), |thread, cx| {
1588            thread.handle_session_update(notification.update.clone(), cx)
1589        })??;
1590
1591        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1592        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1593            if let Some(meta) = &tcu.meta {
1594                if let Some(term_out) = meta.get("terminal_output") {
1595                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1596                        let terminal_id = acp::TerminalId::new(id_str);
1597                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1598                            let data = s.as_bytes().to_vec();
1599                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1600                                thread.on_terminal_provider_event(
1601                                    TerminalProviderEvent::Output { terminal_id, data },
1602                                    cx,
1603                                );
1604                            });
1605                        }
1606                    }
1607                }
1608
1609                // terminal_exit
1610                if let Some(term_exit) = meta.get("terminal_exit") {
1611                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1612                        let terminal_id = acp::TerminalId::new(id_str);
1613                        let status = acp::TerminalExitStatus::new()
1614                            .exit_code(
1615                                term_exit
1616                                    .get("exit_code")
1617                                    .and_then(|v| v.as_u64())
1618                                    .map(|i| i as u32),
1619                            )
1620                            .signal(
1621                                term_exit
1622                                    .get("signal")
1623                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
1624                            );
1625
1626                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1627                            thread.on_terminal_provider_event(
1628                                TerminalProviderEvent::Exit {
1629                                    terminal_id,
1630                                    status,
1631                                },
1632                                cx,
1633                            );
1634                        });
1635                    }
1636                }
1637            }
1638        }
1639
1640        Ok(())
1641    }
1642
1643    async fn create_terminal(
1644        &self,
1645        args: acp::CreateTerminalRequest,
1646    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1647        let thread = self.session_thread(&args.session_id)?;
1648        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1649
1650        let terminal_entity = acp_thread::create_terminal_entity(
1651            args.command.clone(),
1652            &args.args,
1653            args.env
1654                .into_iter()
1655                .map(|env| (env.name, env.value))
1656                .collect(),
1657            args.cwd.clone(),
1658            &project,
1659            &mut self.cx.clone(),
1660        )
1661        .await?;
1662
1663        // Register with renderer
1664        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1665            thread.register_terminal_created(
1666                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1667                format!("{} {}", args.command, args.args.join(" ")),
1668                args.cwd.clone(),
1669                args.output_byte_limit,
1670                terminal_entity,
1671                cx,
1672            )
1673        })?;
1674        let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1675        Ok(acp::CreateTerminalResponse::new(terminal_id))
1676    }
1677
1678    async fn kill_terminal(
1679        &self,
1680        args: acp::KillTerminalRequest,
1681    ) -> Result<acp::KillTerminalResponse, acp::Error> {
1682        self.session_thread(&args.session_id)?
1683            .update(&mut self.cx.clone(), |thread, cx| {
1684                thread.kill_terminal(args.terminal_id, cx)
1685            })??;
1686
1687        Ok(Default::default())
1688    }
1689
1690    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1691        Err(acp::Error::method_not_found())
1692    }
1693
1694    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1695        Err(acp::Error::method_not_found())
1696    }
1697
1698    async fn release_terminal(
1699        &self,
1700        args: acp::ReleaseTerminalRequest,
1701    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1702        self.session_thread(&args.session_id)?
1703            .update(&mut self.cx.clone(), |thread, cx| {
1704                thread.release_terminal(args.terminal_id, cx)
1705            })??;
1706
1707        Ok(Default::default())
1708    }
1709
1710    async fn terminal_output(
1711        &self,
1712        args: acp::TerminalOutputRequest,
1713    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1714        self.session_thread(&args.session_id)?
1715            .read_with(&mut self.cx.clone(), |thread, cx| {
1716                let out = thread
1717                    .terminal(args.terminal_id)?
1718                    .read(cx)
1719                    .current_output(cx);
1720
1721                Ok(out)
1722            })?
1723    }
1724
1725    async fn wait_for_terminal_exit(
1726        &self,
1727        args: acp::WaitForTerminalExitRequest,
1728    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1729        let exit_status = self
1730            .session_thread(&args.session_id)?
1731            .update(&mut self.cx.clone(), |thread, cx| {
1732                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1733            })??
1734            .await;
1735
1736        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1737    }
1738}
1739
1740impl ClientDelegate {
1741    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1742        let sessions = self.sessions.borrow();
1743        sessions
1744            .get(session_id)
1745            .context("Failed to get session")
1746            .map(|session| session.thread.clone())
1747    }
1748}