acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5
   6use action_log::ActionLog;
   7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   8use anyhow::anyhow;
   9use collections::HashMap;
  10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  11use futures::AsyncBufReadExt as _;
  12use futures::io::BufReader;
  13use project::agent_server_store::{AgentServerCommand, AgentServerStore};
  14use project::{AgentId, Project};
  15use remote::remote_client::Interactive;
  16use serde::Deserialize;
  17use settings::Settings as _;
  18use std::path::PathBuf;
  19use std::process::Stdio;
  20use std::rc::Rc;
  21use std::{any::Any, cell::RefCell};
  22use task::{ShellBuilder, SpawnInTerminal};
  23use thiserror::Error;
  24use util::ResultExt as _;
  25use util::path_list::PathList;
  26use util::process::Child;
  27
  28use anyhow::{Context as _, Result};
  29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  30
  31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  32use terminal::TerminalBuilder;
  33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  34
  35use crate::GEMINI_ID;
  36
/// Identifier of the synthetic auth method that `AcpConnection::stdio` advertises for the Gemini
/// agent in place of the auth methods reported by the agent itself.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  38
/// Error returned by `AcpConnection::stdio` when the agent's `initialize` response reports a
/// protocol version lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
  42
/// A connection to an external agent process that speaks ACP over its stdin/stdout.
///
/// Built by `AcpConnection::stdio`; the spawned process is killed when this value is dropped.
pub struct AcpConnection {
    /// Id of the agent this connection belongs to, as passed to `stdio`.
    id: AgentId,
    /// Name reported by the agent in its `initialize` response, or the agent id when the agent
    /// did not report one.
    telemetry_id: SharedString,
    /// Client side of the ACP connection, used to send requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Live sessions keyed by session id. The same map is shared with the client delegate and
    /// with the task that waits for the process to exit.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Auth methods from the `initialize` response, or the single terminal-login override when
    /// the agent is Gemini.
    auth_methods: Vec<acp::AuthMethod>,
    /// Weak handle to the agent server store supplied by the caller of `stdio`.
    // NOTE(review): not read anywhere in the visible part of this file — confirm where it is used.
    agent_server_store: WeakEntity<AgentServerStore>,
    /// Capabilities the agent reported in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on newly created sessions, when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on newly created sessions, when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option id -> value to apply to sessions, when the value is one the agent offers.
    default_config_options: HashMap<String, String>,
    /// The spawned agent process; killed in `Drop`.
    child: Child,
    /// Present only when the agent advertises the session-list capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Background task driving the connection's I/O future.
    // NOTE(review): presumably held because dropping a gpui `Task` cancels it — confirm.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that awaits the process exit status and reports `LoadError::Exited` to every
    /// session's thread.
    _wait_task: Task<Result<()>>,
    /// Background task that forwards each line of the agent's stderr to the log at `warn` level.
    _stderr_task: Task<Result<()>>,
}
  60
/// A session's config options bundled with a `()` watch channel.
// NOTE(review): the channel is presumably used to signal that `config_options` changed; the
// senders/receivers are not used in the visible part of this file — confirm against callers.
struct ConfigOptions {
    /// Shared, mutable list of the session's config options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sending half of the watch channel created in `ConfigOptions::new`.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiving half of the watch channel created in `ConfigOptions::new`.
    rx: watch::Receiver<()>,
}
  66
  67impl ConfigOptions {
  68    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  69        let (tx, rx) = watch::channel(());
  70        Self {
  71            config_options,
  72            tx: Rc::new(RefCell::new(tx)),
  73            rx,
  74        }
  75    }
  76}
  77
/// Per-session state tracked by an `AcpConnection`.
pub struct AcpSession {
    /// Weak handle to the thread that displays this session.
    thread: WeakEntity<AcpThread>,
    /// Initialized to `false` when a session is registered.
    // NOTE(review): where this flag is set/read is outside the visible part of the file.
    suppress_abort_err: bool,
    /// Model state derived from the agent's session response, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state derived from the agent's session response, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options derived from the agent's session response, if any.
    config_options: Option<ConfigOptions>,
}
  85
/// Session listing backed by an ACP connection, plus a channel for pushing list updates to
/// watchers.
pub struct AcpSessionList {
    /// Connection used to issue `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sending half of the unbounded update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; a clone is handed out by `watch`.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
  91
  92impl AcpSessionList {
  93    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
  94        let (tx, rx) = smol::channel::unbounded();
  95        Self {
  96            connection,
  97            updates_tx: tx,
  98            updates_rx: rx,
  99        }
 100    }
 101
 102    fn notify_update(&self) {
 103        self.updates_tx
 104            .try_send(acp_thread::SessionListUpdate::Refresh)
 105            .log_err();
 106    }
 107
 108    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 109        self.updates_tx
 110            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 111            .log_err();
 112    }
 113}
 114
 115impl AgentSessionList for AcpSessionList {
 116    fn list_sessions(
 117        &self,
 118        request: AgentSessionListRequest,
 119        cx: &mut App,
 120    ) -> Task<Result<AgentSessionListResponse>> {
 121        let conn = self.connection.clone();
 122        cx.foreground_executor().spawn(async move {
 123            let acp_request = acp::ListSessionsRequest::new()
 124                .cwd(request.cwd)
 125                .cursor(request.cursor);
 126            let response = conn.list_sessions(acp_request).await?;
 127            Ok(AgentSessionListResponse {
 128                sessions: response
 129                    .sessions
 130                    .into_iter()
 131                    .map(|s| AgentSessionInfo {
 132                        session_id: s.session_id,
 133                        work_dirs: Some(PathList::new(&[s.cwd])),
 134                        title: s.title.map(Into::into),
 135                        updated_at: s.updated_at.and_then(|date_str| {
 136                            chrono::DateTime::parse_from_rfc3339(&date_str)
 137                                .ok()
 138                                .map(|dt| dt.with_timezone(&chrono::Utc))
 139                        }),
 140                        created_at: None,
 141                        meta: s.meta,
 142                    })
 143                    .collect(),
 144                next_cursor: response.next_cursor,
 145                meta: response.meta,
 146            })
 147        })
 148    }
 149
 150    fn watch(
 151        &self,
 152        _cx: &mut App,
 153    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 154        Some(self.updates_rx.clone())
 155    }
 156
 157    fn notify_refresh(&self) {
 158        self.notify_update();
 159    }
 160
 161    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 162        self
 163    }
 164}
 165
 166pub async fn connect(
 167    agent_id: AgentId,
 168    project: Entity<Project>,
 169    command: AgentServerCommand,
 170    agent_server_store: WeakEntity<AgentServerStore>,
 171    default_mode: Option<acp::SessionModeId>,
 172    default_model: Option<acp::ModelId>,
 173    default_config_options: HashMap<String, String>,
 174    cx: &mut AsyncApp,
 175) -> Result<Rc<dyn AgentConnection>> {
 176    let conn = AcpConnection::stdio(
 177        agent_id,
 178        project,
 179        command.clone(),
 180        agent_server_store,
 181        default_mode,
 182        default_model,
 183        default_config_options,
 184        cx,
 185    )
 186    .await?;
 187    Ok(Rc::new(conn) as _)
 188}
 189
/// Lowest ACP protocol version accepted from an agent; `AcpConnection::stdio` fails with
/// `UnsupportedVersion` when the agent reports anything lower.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 191
 192impl AcpConnection {
    /// Returns the underlying client-side ACP connection.
    pub fn client_connection(&self) -> &Rc<acp::ClientSideConnection> {
        &self.connection
    }
 196
 197    pub async fn stdio(
 198        agent_id: AgentId,
 199        project: Entity<Project>,
 200        command: AgentServerCommand,
 201        agent_server_store: WeakEntity<AgentServerStore>,
 202        default_mode: Option<acp::SessionModeId>,
 203        default_model: Option<acp::ModelId>,
 204        default_config_options: HashMap<String, String>,
 205        cx: &mut AsyncApp,
 206    ) -> Result<Self> {
 207        let root_dir = project.read_with(cx, |project, cx| {
 208            project
 209                .default_path_list(cx)
 210                .ordered_paths()
 211                .next()
 212                .cloned()
 213        });
 214        let original_command = command.clone();
 215        let (path, args, env) = project
 216            .read_with(cx, |project, cx| {
 217                project.remote_client().and_then(|client| {
 218                    let template = client
 219                        .read(cx)
 220                        .build_command_with_options(
 221                            Some(command.path.display().to_string()),
 222                            &command.args,
 223                            &command.env.clone().into_iter().flatten().collect(),
 224                            root_dir.as_ref().map(|path| path.display().to_string()),
 225                            None,
 226                            Interactive::No,
 227                        )
 228                        .log_err()?;
 229                    Some((template.program, template.args, template.env))
 230                })
 231            })
 232            .unwrap_or_else(|| {
 233                (
 234                    command.path.display().to_string(),
 235                    command.args,
 236                    command.env.unwrap_or_default(),
 237                )
 238            });
 239
 240        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
 241        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
 242        let mut child = builder.build_std_command(Some(path.clone()), &args);
 243        child.envs(env.clone());
 244        if let Some(cwd) = project.read_with(cx, |project, _cx| {
 245            if project.is_local() {
 246                root_dir.as_ref()
 247            } else {
 248                None
 249            }
 250        }) {
 251            child.current_dir(cwd);
 252        }
 253        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
 254
 255        let stdout = child.stdout.take().context("Failed to take stdout")?;
 256        let stdin = child.stdin.take().context("Failed to take stdin")?;
 257        let stderr = child.stderr.take().context("Failed to take stderr")?;
 258        log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
 259        log::trace!("Spawned (pid: {})", child.id());
 260
 261        let sessions = Rc::new(RefCell::new(HashMap::default()));
 262
 263        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
 264            (
 265                release_channel::ReleaseChannel::try_global(cx)
 266                    .map(|release_channel| release_channel.display_name()),
 267                release_channel::AppVersion::global(cx).to_string(),
 268            )
 269        });
 270
 271        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
 272            Rc::new(RefCell::new(None));
 273
 274        let client = ClientDelegate {
 275            sessions: sessions.clone(),
 276            session_list: client_session_list.clone(),
 277            cx: cx.clone(),
 278        };
 279        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
 280            let foreground_executor = cx.foreground_executor().clone();
 281            move |fut| {
 282                foreground_executor.spawn(fut).detach();
 283            }
 284        });
 285
 286        let io_task = cx.background_spawn(io_task);
 287
 288        let stderr_task = cx.background_spawn(async move {
 289            let mut stderr = BufReader::new(stderr);
 290            let mut line = String::new();
 291            while let Ok(n) = stderr.read_line(&mut line).await
 292                && n > 0
 293            {
 294                log::warn!("agent stderr: {}", line.trim());
 295                line.clear();
 296            }
 297            Ok(())
 298        });
 299
 300        let wait_task = cx.spawn({
 301            let sessions = sessions.clone();
 302            let status_fut = child.status();
 303            async move |cx| {
 304                let status = status_fut.await?;
 305
 306                for session in sessions.borrow().values() {
 307                    session
 308                        .thread
 309                        .update(cx, |thread, cx| {
 310                            thread.emit_load_error(LoadError::Exited { status }, cx)
 311                        })
 312                        .ok();
 313                }
 314
 315                anyhow::Ok(())
 316            }
 317        });
 318
 319        let connection = Rc::new(connection);
 320
 321        let response = connection
 322            .initialize(
 323                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
 324                    .client_capabilities(
 325                        acp::ClientCapabilities::new()
 326                            .fs(acp::FileSystemCapabilities::new()
 327                                .read_text_file(true)
 328                                .write_text_file(true))
 329                            .terminal(true)
 330                            .auth(acp::AuthCapabilities::new().terminal(true))
 331                            // Experimental: Allow for rendering terminal output from the agents
 332                            .meta(acp::Meta::from_iter([
 333                                ("terminal_output".into(), true.into()),
 334                                ("terminal-auth".into(), true.into()),
 335                            ])),
 336                    )
 337                    .client_info(
 338                        acp::Implementation::new("zed", version)
 339                            .title(release_channel.map(ToOwned::to_owned)),
 340                    ),
 341            )
 342            .await?;
 343
 344        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
 345            return Err(UnsupportedVersion.into());
 346        }
 347
 348        let telemetry_id = response
 349            .agent_info
 350            // Use the one the agent provides if we have one
 351            .map(|info| info.name.into())
 352            // Otherwise, just use the name
 353            .unwrap_or_else(|| agent_id.0.clone());
 354
 355        let session_list = if response
 356            .agent_capabilities
 357            .session_capabilities
 358            .list
 359            .is_some()
 360        {
 361            let list = Rc::new(AcpSessionList::new(connection.clone()));
 362            *client_session_list.borrow_mut() = Some(list.clone());
 363            Some(list)
 364        } else {
 365            None
 366        };
 367
 368        // TODO: Remove this override once Google team releases their official auth methods
 369        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
 370            let mut gemini_args = original_command.args.clone();
 371            gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
 372            let value = serde_json::json!({
 373                "label": "gemini /auth",
 374                "command": original_command.path.to_string_lossy(),
 375                "args": gemini_args,
 376                "env": original_command.env.unwrap_or_default(),
 377            });
 378            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
 379            vec![acp::AuthMethod::Agent(
 380                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
 381                    .description("Login with your Google or Vertex AI account")
 382                    .meta(meta),
 383            )]
 384        } else {
 385            response.auth_methods
 386        };
 387        Ok(Self {
 388            id: agent_id,
 389            auth_methods,
 390            agent_server_store,
 391            connection,
 392            telemetry_id,
 393            sessions,
 394            agent_capabilities: response.agent_capabilities,
 395            default_mode,
 396            default_model,
 397            default_config_options,
 398            session_list,
 399            _io_task: io_task,
 400            _wait_task: wait_task,
 401            _stderr_task: stderr_task,
 402            child,
 403        })
 404    }
 405
    /// Returns the prompt capabilities the agent reported in its `initialize` response.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }
 409
 410    fn apply_default_config_options(
 411        &self,
 412        session_id: &acp::SessionId,
 413        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 414        cx: &mut AsyncApp,
 415    ) {
 416        let id = self.id.clone();
 417        let defaults_to_apply: Vec<_> = {
 418            let config_opts_ref = config_options.borrow();
 419            config_opts_ref
 420                .iter()
 421                .filter_map(|config_option| {
 422                    let default_value = self.default_config_options.get(&*config_option.id.0)?;
 423
 424                    let is_valid = match &config_option.kind {
 425                        acp::SessionConfigKind::Select(select) => match &select.options {
 426                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
 427                                .iter()
 428                                .any(|opt| &*opt.value.0 == default_value.as_str()),
 429                            acp::SessionConfigSelectOptions::Grouped(groups) => {
 430                                groups.iter().any(|g| {
 431                                    g.options
 432                                        .iter()
 433                                        .any(|opt| &*opt.value.0 == default_value.as_str())
 434                                })
 435                            }
 436                            _ => false,
 437                        },
 438                        _ => false,
 439                    };
 440
 441                    if is_valid {
 442                        let initial_value = match &config_option.kind {
 443                            acp::SessionConfigKind::Select(select) => {
 444                                Some(select.current_value.clone())
 445                            }
 446                            _ => None,
 447                        };
 448                        Some((
 449                            config_option.id.clone(),
 450                            default_value.clone(),
 451                            initial_value,
 452                        ))
 453                    } else {
 454                        log::warn!(
 455                            "`{}` is not a valid value for config option `{}` in {}",
 456                            default_value,
 457                            config_option.id.0,
 458                            id
 459                        );
 460                        None
 461                    }
 462                })
 463                .collect()
 464        };
 465
 466        for (config_id, default_value, initial_value) in defaults_to_apply {
 467            cx.spawn({
 468                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 469                let session_id = session_id.clone();
 470                let config_id_clone = config_id.clone();
 471                let config_opts = config_options.clone();
 472                let conn = self.connection.clone();
 473                async move |_| {
 474                    let result = conn
 475                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
 476                            session_id,
 477                            config_id_clone.clone(),
 478                            default_value_id,
 479                        ))
 480                        .await
 481                        .log_err();
 482
 483                    if result.is_none() {
 484                        if let Some(initial) = initial_value {
 485                            let mut opts = config_opts.borrow_mut();
 486                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 487                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 488                                    select.current_value = initial;
 489                                }
 490                            }
 491                        }
 492                    }
 493                }
 494            })
 495            .detach();
 496
 497            let mut opts = config_options.borrow_mut();
 498            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
 499                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 500                    select.current_value = acp::SessionConfigValueId::new(default_value);
 501                }
 502            }
 503        }
 504    }
 505}
 506
/// Kills the agent process when the connection goes away.
impl Drop for AcpConnection {
    fn drop(&mut self) {
        // A failure to kill is handed to `log_err` rather than propagated.
        self.child.kill().log_err();
    }
}
 512
/// Builds the id of the terminal login task for the given agent and auth method, in the form
/// `external-agent-<agent id>-<method id>-login`.
fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
}
 516
 517fn terminal_auth_task(
 518    command: &AgentServerCommand,
 519    agent_id: &AgentId,
 520    method: &acp::AuthMethodTerminal,
 521) -> SpawnInTerminal {
 522    acp_thread::build_terminal_auth_task(
 523        terminal_auth_task_id(agent_id, &method.id),
 524        method.name.clone(),
 525        command.path.to_string_lossy().into_owned(),
 526        command.args.clone(),
 527        command.env.clone().unwrap_or_default(),
 528    )
 529}
 530
 531/// Used to support the _meta method prior to stabilization
 532fn meta_terminal_auth_task(
 533    agent_id: &AgentId,
 534    method_id: &acp::AuthMethodId,
 535    method: &acp::AuthMethod,
 536) -> Option<SpawnInTerminal> {
 537    #[derive(Deserialize)]
 538    struct MetaTerminalAuth {
 539        label: String,
 540        command: String,
 541        #[serde(default)]
 542        args: Vec<String>,
 543        #[serde(default)]
 544        env: HashMap<String, String>,
 545    }
 546
 547    let meta = match method {
 548        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
 549        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
 550        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
 551        _ => None,
 552    }?;
 553    let terminal_auth =
 554        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
 555
 556    Some(acp_thread::build_terminal_auth_task(
 557        terminal_auth_task_id(agent_id, method_id),
 558        terminal_auth.label.clone(),
 559        terminal_auth.command,
 560        terminal_auth.args,
 561        terminal_auth.env,
 562    ))
 563}
 564
 565impl AgentConnection for AcpConnection {
    /// Returns a clone of the id this connection was created with.
    fn agent_id(&self) -> AgentId {
        self.id.clone()
    }
 569
    /// Returns a clone of the telemetry id stored on this connection.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
 573
    /// Creates a new session on the agent and returns the thread that represents it.
    ///
    /// Only the first path of `work_dirs` is sent to the agent as the session's working
    /// directory; an empty `work_dirs` yields an immediate error. After the agent responds, the
    /// connection's default mode, default model and default config options are applied when the
    /// agent offers them, and the session is registered in `self.sessions`.
    ///
    /// # Errors
    ///
    /// Fails when `work_dirs` is empty or when the agent's `new_session` request fails (the
    /// error is passed through `map_acp_error`).
    fn new_session(
        self: Rc<Self>,
        project: Entity<Project>,
        work_dirs: PathList,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };
        // Only used in the warnings logged below.
        let name = self.id.0.clone();
        let mcp_servers = mcp_servers_for_project(&project, cx);

        cx.spawn(async move |cx| {
            let response = self.connection
                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
                .await
                .map_err(map_acp_error)?;

            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);

            if let Some(default_mode) = self.default_mode.clone() {
                if let Some(modes) = modes.as_ref() {
                    let mut modes_ref = modes.borrow_mut();
                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);

                    if has_mode {
                        // Remembered so the local state can be rolled back if the agent rejects
                        // the change.
                        let initial_mode_id = modes_ref.current_mode_id.clone();

                        cx.spawn({
                            let default_mode = default_mode.clone();
                            let session_id = response.session_id.clone();
                            let modes = modes.clone();
                            let conn = self.connection.clone();
                            async move |_| {
                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
                                .await.log_err();

                                if result.is_none() {
                                    modes.borrow_mut().current_mode_id = initial_mode_id;
                                }
                            }
                        }).detach();

                        // Optimistic local update; the request above runs in a detached task.
                        modes_ref.current_mode_id = default_mode;
                    } else {
                        let available_modes = modes_ref
                            .available_modes
                            .iter()
                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
                        );
                    }
                }
            }

            if let Some(default_model) = self.default_model.clone() {
                if let Some(models) = models.as_ref() {
                    let mut models_ref = models.borrow_mut();
                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);

                    if has_model {
                        // Remembered so the local state can be rolled back if the agent rejects
                        // the change.
                        let initial_model_id = models_ref.current_model_id.clone();

                        cx.spawn({
                            let default_model = default_model.clone();
                            let session_id = response.session_id.clone();
                            let models = models.clone();
                            let conn = self.connection.clone();
                            async move |_| {
                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
                                .await.log_err();

                                if result.is_none() {
                                    models.borrow_mut().current_model_id = initial_model_id;
                                }
                            }
                        }).detach();

                        // Optimistic local update; the request above runs in a detached task.
                        models_ref.current_model_id = default_model;
                    } else {
                        let available_models = models_ref
                            .available_models
                            .iter()
                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
                        );
                    }
                }
            }

            if let Some(config_opts) = config_options.as_ref() {
                self.apply_default_config_options(&response.session_id, config_opts, cx);
            }

            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            let thread: Entity<AcpThread> = cx.new(|cx| {
                AcpThread::new(
                    None,
                    None,
                    Some(work_dirs),
                    self.clone(),
                    project,
                    action_log,
                    response.session_id.clone(),
                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                    cx,
                )
            });

            // Register the session under the id the agent assigned.
            self.sessions.borrow_mut().insert(
                response.session_id,
                AcpSession {
                    thread: thread.downgrade(),
                    suppress_abort_err: false,
                    session_modes: modes,
                    models,
                    config_options: config_options.map(ConfigOptions::new),
                },
            );

            Ok(thread)
        })
    }
 707
    /// Returns whether the agent reported the `load_session` capability.
    fn supports_load_session(&self) -> bool {
        self.agent_capabilities.load_session
    }
 711
    /// Returns whether the agent reported a `resume` entry in its session capabilities.
    fn supports_resume_session(&self) -> bool {
        self.agent_capabilities
            .session_capabilities
            .resume
            .is_some()
    }
 718
    /// Loads an existing session from the agent and returns the thread that represents it.
    ///
    /// Only the first path of `work_dirs` is sent to the agent as the working directory. The
    /// thread is created and the session registered in `self.sessions` before the request is
    /// sent; if the request fails, the registration is removed again. On success the session's
    /// modes, models and config options are filled in from the response, and default config
    /// options are applied.
    ///
    /// # Errors
    ///
    /// Fails when the agent does not advertise `load_session`, when `work_dirs` is empty, or
    /// when the agent's `load_session` request fails (the error is passed through
    /// `map_acp_error`).
    fn load_session(
        self: Rc<Self>,
        session_id: acp::SessionId,
        project: Entity<Project>,
        work_dirs: PathList,
        title: Option<SharedString>,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        if !self.agent_capabilities.load_session {
            return Task::ready(Err(anyhow!(LoadError::Other(
                "Loading sessions is not supported by this agent.".into()
            ))));
        }
        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };

        let mcp_servers = mcp_servers_for_project(&project, cx);
        let action_log = cx.new(|_| ActionLog::new(project.clone()));
        let thread: Entity<AcpThread> = cx.new(|cx| {
            AcpThread::new(
                None,
                title,
                Some(work_dirs.clone()),
                self.clone(),
                project,
                action_log,
                session_id.clone(),
                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                cx,
            )
        });

        // Registered before the request is sent.
        // NOTE(review): presumably so updates the agent sends while loading can already be
        // routed to the thread — confirm against the client delegate.
        self.sessions.borrow_mut().insert(
            session_id.clone(),
            AcpSession {
                thread: thread.downgrade(),
                suppress_abort_err: false,
                session_modes: None,
                models: None,
                config_options: None,
            },
        );

        cx.spawn(async move |cx| {
            let response = match self
                .connection
                .load_session(
                    acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
                )
                .await
            {
                Ok(response) => response,
                Err(err) => {
                    // Undo the registration made above so a failed load leaves no session behind.
                    self.sessions.borrow_mut().remove(&session_id);
                    return Err(map_acp_error(err));
                }
            };

            let (modes, models, config_options) =
                config_state(response.modes, response.models, response.config_options);

            if let Some(config_opts) = config_options.as_ref() {
                self.apply_default_config_options(&session_id, config_opts, cx);
            }

            // Fill in the state that was unknown when the session was registered.
            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
                session.session_modes = modes;
                session.models = models;
                session.config_options = config_options.map(ConfigOptions::new);
            }

            Ok(thread)
        })
    }
 795
 796    fn resume_session(
 797        self: Rc<Self>,
 798        session_id: acp::SessionId,
 799        project: Entity<Project>,
 800        work_dirs: PathList,
 801        title: Option<SharedString>,
 802        cx: &mut App,
 803    ) -> Task<Result<Entity<AcpThread>>> {
 804        if self
 805            .agent_capabilities
 806            .session_capabilities
 807            .resume
 808            .is_none()
 809        {
 810            return Task::ready(Err(anyhow!(LoadError::Other(
 811                "Resuming sessions is not supported by this agent.".into()
 812            ))));
 813        }
 814        // TODO: remove this once ACP supports multiple working directories
 815        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 816            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 817        };
 818
 819        let mcp_servers = mcp_servers_for_project(&project, cx);
 820        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 821        let thread: Entity<AcpThread> = cx.new(|cx| {
 822            AcpThread::new(
 823                None,
 824                title,
 825                Some(work_dirs),
 826                self.clone(),
 827                project,
 828                action_log,
 829                session_id.clone(),
 830                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 831                cx,
 832            )
 833        });
 834
 835        self.sessions.borrow_mut().insert(
 836            session_id.clone(),
 837            AcpSession {
 838                thread: thread.downgrade(),
 839                suppress_abort_err: false,
 840                session_modes: None,
 841                models: None,
 842                config_options: None,
 843            },
 844        );
 845
 846        cx.spawn(async move |cx| {
 847            let response = match self
 848                .connection
 849                .resume_session(
 850                    acp::ResumeSessionRequest::new(session_id.clone(), cwd)
 851                        .mcp_servers(mcp_servers),
 852                )
 853                .await
 854            {
 855                Ok(response) => response,
 856                Err(err) => {
 857                    self.sessions.borrow_mut().remove(&session_id);
 858                    return Err(map_acp_error(err));
 859                }
 860            };
 861
 862            let (modes, models, config_options) =
 863                config_state(response.modes, response.models, response.config_options);
 864
 865            if let Some(config_opts) = config_options.as_ref() {
 866                self.apply_default_config_options(&session_id, config_opts, cx);
 867            }
 868
 869            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 870                session.session_modes = modes;
 871                session.models = models;
 872                session.config_options = config_options.map(ConfigOptions::new);
 873            }
 874
 875            Ok(thread)
 876        })
 877    }
 878
 879    fn supports_close_session(&self) -> bool {
 880        self.agent_capabilities.session_capabilities.close.is_some()
 881    }
 882
 883    fn close_session(
 884        self: Rc<Self>,
 885        session_id: &acp::SessionId,
 886        cx: &mut App,
 887    ) -> Task<Result<()>> {
 888        if !self.supports_close_session() {
 889            return Task::ready(Err(anyhow!(LoadError::Other(
 890                "Closing sessions is not supported by this agent.".into()
 891            ))));
 892        }
 893
 894        let conn = self.connection.clone();
 895        let session_id = session_id.clone();
 896        cx.foreground_executor().spawn(async move {
 897            conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
 898                .await?;
 899            self.sessions.borrow_mut().remove(&session_id);
 900            Ok(())
 901        })
 902    }
 903
 904    fn auth_methods(&self) -> &[acp::AuthMethod] {
 905        &self.auth_methods
 906    }
 907
 908    fn terminal_auth_task(
 909        &self,
 910        method_id: &acp::AuthMethodId,
 911        cx: &App,
 912    ) -> Option<Task<Result<SpawnInTerminal>>> {
 913        let method = self
 914            .auth_methods
 915            .iter()
 916            .find(|method| method.id() == method_id)?;
 917
 918        match method {
 919            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
 920                let agent_id = self.id.clone();
 921                let terminal = terminal.clone();
 922                let store = self.agent_server_store.clone();
 923                Some(cx.spawn(async move |cx| {
 924                    let command = store
 925                        .update(cx, |store, cx| {
 926                            let agent = store
 927                                .get_external_agent(&agent_id)
 928                                .context("Agent server not found")?;
 929                            anyhow::Ok(agent.get_command(
 930                                terminal.args.clone(),
 931                                HashMap::from_iter(terminal.env.clone()),
 932                                &mut cx.to_async(),
 933                            ))
 934                        })?
 935                        .context("Failed to get agent command")?
 936                        .await?;
 937                    Ok(terminal_auth_task(&command, &agent_id, &terminal))
 938                }))
 939            }
 940            _ => meta_terminal_auth_task(&self.id, method_id, method)
 941                .map(|task| Task::ready(Ok(task))),
 942        }
 943    }
 944
 945    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
 946        let conn = self.connection.clone();
 947        cx.foreground_executor().spawn(async move {
 948            conn.authenticate(acp::AuthenticateRequest::new(method_id))
 949                .await?;
 950            Ok(())
 951        })
 952    }
 953
 954    fn prompt(
 955        &self,
 956        _id: Option<acp_thread::UserMessageId>,
 957        params: acp::PromptRequest,
 958        cx: &mut App,
 959    ) -> Task<Result<acp::PromptResponse>> {
 960        let conn = self.connection.clone();
 961        let sessions = self.sessions.clone();
 962        let session_id = params.session_id.clone();
 963        cx.foreground_executor().spawn(async move {
 964            let result = conn.prompt(params).await;
 965
 966            let mut suppress_abort_err = false;
 967
 968            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
 969                suppress_abort_err = session.suppress_abort_err;
 970                session.suppress_abort_err = false;
 971            }
 972
 973            match result {
 974                Ok(response) => Ok(response),
 975                Err(err) => {
 976                    if err.code == acp::ErrorCode::AuthRequired {
 977                        return Err(anyhow!(acp::Error::auth_required()));
 978                    }
 979
 980                    if err.code != ErrorCode::InternalError {
 981                        anyhow::bail!(err)
 982                    }
 983
 984                    let Some(data) = &err.data else {
 985                        anyhow::bail!(err)
 986                    };
 987
 988                    // Temporary workaround until the following PR is generally available:
 989                    // https://github.com/google-gemini/gemini-cli/pull/6656
 990
 991                    #[derive(Deserialize)]
 992                    #[serde(deny_unknown_fields)]
 993                    struct ErrorDetails {
 994                        details: Box<str>,
 995                    }
 996
 997                    match serde_json::from_value(data.clone()) {
 998                        Ok(ErrorDetails { details }) => {
 999                            if suppress_abort_err
1000                                && (details.contains("This operation was aborted")
1001                                    || details.contains("The user aborted a request"))
1002                            {
1003                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1004                            } else {
1005                                Err(anyhow!(details))
1006                            }
1007                        }
1008                        Err(_) => Err(anyhow!(err)),
1009                    }
1010                }
1011            }
1012        })
1013    }
1014
1015    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1016        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1017            session.suppress_abort_err = true;
1018        }
1019        let conn = self.connection.clone();
1020        let params = acp::CancelNotification::new(session_id.clone());
1021        cx.foreground_executor()
1022            .spawn(async move { conn.cancel(params).await })
1023            .detach();
1024    }
1025
1026    fn session_modes(
1027        &self,
1028        session_id: &acp::SessionId,
1029        _cx: &App,
1030    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1031        let sessions = self.sessions.clone();
1032        let sessions_ref = sessions.borrow();
1033        let Some(session) = sessions_ref.get(session_id) else {
1034            return None;
1035        };
1036
1037        if let Some(modes) = session.session_modes.as_ref() {
1038            Some(Rc::new(AcpSessionModes {
1039                connection: self.connection.clone(),
1040                session_id: session_id.clone(),
1041                state: modes.clone(),
1042            }) as _)
1043        } else {
1044            None
1045        }
1046    }
1047
1048    fn model_selector(
1049        &self,
1050        session_id: &acp::SessionId,
1051    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1052        let sessions = self.sessions.clone();
1053        let sessions_ref = sessions.borrow();
1054        let Some(session) = sessions_ref.get(session_id) else {
1055            return None;
1056        };
1057
1058        if let Some(models) = session.models.as_ref() {
1059            Some(Rc::new(AcpModelSelector::new(
1060                session_id.clone(),
1061                self.connection.clone(),
1062                models.clone(),
1063            )) as _)
1064        } else {
1065            None
1066        }
1067    }
1068
1069    fn session_config_options(
1070        &self,
1071        session_id: &acp::SessionId,
1072        _cx: &App,
1073    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1074        let sessions = self.sessions.borrow();
1075        let session = sessions.get(session_id)?;
1076
1077        let config_opts = session.config_options.as_ref()?;
1078
1079        Some(Rc::new(AcpSessionConfigOptions {
1080            session_id: session_id.clone(),
1081            connection: self.connection.clone(),
1082            state: config_opts.config_options.clone(),
1083            watch_tx: config_opts.tx.clone(),
1084            watch_rx: config_opts.rx.clone(),
1085        }) as _)
1086    }
1087
1088    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1089        self.session_list.clone().map(|s| s as _)
1090    }
1091
1092    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1093        self
1094    }
1095}
1096
1097fn map_acp_error(err: acp::Error) -> anyhow::Error {
1098    if err.code == acp::ErrorCode::AuthRequired {
1099        let mut error = AuthRequired::new();
1100
1101        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1102            error = error.with_description(err.message);
1103        }
1104
1105        anyhow!(error)
1106    } else {
1107        anyhow!(err)
1108    }
1109}
1110
#[cfg(test)]
mod tests {
    use super::*;

    /// Agent id shared by every test in this module.
    fn test_agent_id() -> AgentId {
        AgentId::new("test-agent")
    }

    /// Wraps `payload` in a meta map under the `terminal-auth` key.
    fn terminal_auth_meta(payload: serde_json::Value) -> acp::Meta {
        acp::Meta::from_iter([("terminal-auth".to_string(), payload)])
    }

    #[test]
    fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
        let prebuilt = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])),
        };
        let login = acp::AuthMethodTerminal::new("login", "Login");

        let spawn = terminal_auth_task(&prebuilt, &test_agent_id(), &login);

        let expected_env = HashMap::from_iter([
            ("BASE".into(), "1".into()),
            ("SHARED".into(), "override".into()),
            ("EXTRA".into(), "2".into()),
        ]);
        assert_eq!(spawn.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(spawn.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(spawn.env, expected_env);
        assert_eq!(spawn.label, "Login");
        assert_eq!(spawn.command_label, "Login");
    }

    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let meta = terminal_auth_meta(serde_json::json!({
            "label": "legacy /auth",
            "command": "legacy-agent",
            "args": ["auth", "--interactive"],
            "env": {
                "AUTH_MODE": "interactive",
            },
        }));
        let method =
            acp::AuthMethod::Agent(acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(meta));

        let spawn = meta_terminal_auth_task(&test_agent_id(), &method_id, &method)
            .expect("expected legacy terminal auth task");

        let expected_env = HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())]);
        assert_eq!(spawn.id.0, "external-agent-test-agent-legacy-login-login");
        assert_eq!(spawn.command.as_deref(), Some("legacy-agent"));
        assert_eq!(spawn.args, vec!["auth", "--interactive"]);
        assert_eq!(spawn.env, expected_env);
        assert_eq!(spawn.label, "legacy /auth");
    }

    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        // Only a label is supplied; the command is missing.
        let meta = terminal_auth_meta(serde_json::json!({
            "label": "legacy /auth",
        }));
        let method =
            acp::AuthMethod::Agent(acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(meta));

        let spawn = meta_terminal_auth_task(&test_agent_id(), &method_id, &method);

        assert!(spawn.is_none());
    }

    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let method_id = acp::AuthMethodId::new("login");
        let legacy_meta = terminal_auth_meta(serde_json::json!({
            "label": "legacy /auth",
            "command": "legacy-agent",
            "args": ["legacy-auth"],
            "env": {
                "AUTH_MODE": "legacy",
            },
        }));
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(legacy_meta),
        );

        let prebuilt = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "/auth".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])),
        };

        let acp::AuthMethod::Terminal(terminal) = &method else {
            unreachable!()
        };
        let spawn = terminal_auth_task(&prebuilt, &test_agent_id(), terminal);

        let expected_env = HashMap::from_iter([
            ("BASE".into(), "1".into()),
            ("AUTH_MODE".into(), "first-class".into()),
        ]);
        assert_eq!(spawn.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(spawn.args, vec!["--acp", "/auth"]);
        assert_eq!(spawn.env, expected_env);
        assert_eq!(spawn.label, "Login");
    }
}
1242
1243fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1244    let context_server_store = project.read(cx).context_server_store().read(cx);
1245    let is_local = project.read(cx).is_local();
1246    context_server_store
1247        .configured_server_ids()
1248        .iter()
1249        .filter_map(|id| {
1250            let configuration = context_server_store.configuration_for_server(id)?;
1251            match &*configuration {
1252                project::context_server_store::ContextServerConfiguration::Custom {
1253                    command,
1254                    remote,
1255                    ..
1256                }
1257                | project::context_server_store::ContextServerConfiguration::Extension {
1258                    command,
1259                    remote,
1260                    ..
1261                } if is_local || *remote => Some(acp::McpServer::Stdio(
1262                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
1263                        .args(command.args.clone())
1264                        .env(if let Some(env) = command.env.as_ref() {
1265                            env.iter()
1266                                .map(|(name, value)| acp::EnvVariable::new(name, value))
1267                                .collect()
1268                        } else {
1269                            vec![]
1270                        }),
1271                )),
1272                project::context_server_store::ContextServerConfiguration::Http {
1273                    url,
1274                    headers,
1275                    timeout: _,
1276                } => Some(acp::McpServer::Http(
1277                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1278                        headers
1279                            .iter()
1280                            .map(|(name, value)| acp::HttpHeader::new(name, value))
1281                            .collect(),
1282                    ),
1283                )),
1284                _ => None,
1285            }
1286        })
1287        .collect()
1288}
1289
1290fn config_state(
1291    modes: Option<acp::SessionModeState>,
1292    models: Option<acp::SessionModelState>,
1293    config_options: Option<Vec<acp::SessionConfigOption>>,
1294) -> (
1295    Option<Rc<RefCell<acp::SessionModeState>>>,
1296    Option<Rc<RefCell<acp::SessionModelState>>>,
1297    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1298) {
1299    if let Some(opts) = config_options {
1300        return (None, None, Some(Rc::new(RefCell::new(opts))));
1301    }
1302
1303    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1304    let models = models.map(|models| Rc::new(RefCell::new(models)));
1305    (modes, models, None)
1306}
1307
1308struct AcpSessionModes {
1309    session_id: acp::SessionId,
1310    connection: Rc<acp::ClientSideConnection>,
1311    state: Rc<RefCell<acp::SessionModeState>>,
1312}
1313
1314impl acp_thread::AgentSessionModes for AcpSessionModes {
1315    fn current_mode(&self) -> acp::SessionModeId {
1316        self.state.borrow().current_mode_id.clone()
1317    }
1318
1319    fn all_modes(&self) -> Vec<acp::SessionMode> {
1320        self.state.borrow().available_modes.clone()
1321    }
1322
1323    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1324        let connection = self.connection.clone();
1325        let session_id = self.session_id.clone();
1326        let old_mode_id;
1327        {
1328            let mut state = self.state.borrow_mut();
1329            old_mode_id = state.current_mode_id.clone();
1330            state.current_mode_id = mode_id.clone();
1331        };
1332        let state = self.state.clone();
1333        cx.foreground_executor().spawn(async move {
1334            let result = connection
1335                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1336                .await;
1337
1338            if result.is_err() {
1339                state.borrow_mut().current_mode_id = old_mode_id;
1340            }
1341
1342            result?;
1343
1344            Ok(())
1345        })
1346    }
1347}
1348
1349struct AcpModelSelector {
1350    session_id: acp::SessionId,
1351    connection: Rc<acp::ClientSideConnection>,
1352    state: Rc<RefCell<acp::SessionModelState>>,
1353}
1354
1355impl AcpModelSelector {
1356    fn new(
1357        session_id: acp::SessionId,
1358        connection: Rc<acp::ClientSideConnection>,
1359        state: Rc<RefCell<acp::SessionModelState>>,
1360    ) -> Self {
1361        Self {
1362            session_id,
1363            connection,
1364            state,
1365        }
1366    }
1367}
1368
1369impl acp_thread::AgentModelSelector for AcpModelSelector {
1370    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1371        Task::ready(Ok(acp_thread::AgentModelList::Flat(
1372            self.state
1373                .borrow()
1374                .available_models
1375                .clone()
1376                .into_iter()
1377                .map(acp_thread::AgentModelInfo::from)
1378                .collect(),
1379        )))
1380    }
1381
1382    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1383        let connection = self.connection.clone();
1384        let session_id = self.session_id.clone();
1385        let old_model_id;
1386        {
1387            let mut state = self.state.borrow_mut();
1388            old_model_id = state.current_model_id.clone();
1389            state.current_model_id = model_id.clone();
1390        };
1391        let state = self.state.clone();
1392        cx.foreground_executor().spawn(async move {
1393            let result = connection
1394                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1395                .await;
1396
1397            if result.is_err() {
1398                state.borrow_mut().current_model_id = old_model_id;
1399            }
1400
1401            result?;
1402
1403            Ok(())
1404        })
1405    }
1406
1407    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1408        let state = self.state.borrow();
1409        Task::ready(
1410            state
1411                .available_models
1412                .iter()
1413                .find(|m| m.model_id == state.current_model_id)
1414                .cloned()
1415                .map(acp_thread::AgentModelInfo::from)
1416                .ok_or_else(|| anyhow::anyhow!("Model not found")),
1417        )
1418    }
1419}
1420
1421struct AcpSessionConfigOptions {
1422    session_id: acp::SessionId,
1423    connection: Rc<acp::ClientSideConnection>,
1424    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1425    watch_tx: Rc<RefCell<watch::Sender<()>>>,
1426    watch_rx: watch::Receiver<()>,
1427}
1428
1429impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1430    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1431        self.state.borrow().clone()
1432    }
1433
1434    fn set_config_option(
1435        &self,
1436        config_id: acp::SessionConfigId,
1437        value: acp::SessionConfigValueId,
1438        cx: &mut App,
1439    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1440        let connection = self.connection.clone();
1441        let session_id = self.session_id.clone();
1442        let state = self.state.clone();
1443
1444        let watch_tx = self.watch_tx.clone();
1445
1446        cx.foreground_executor().spawn(async move {
1447            let response = connection
1448                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1449                    session_id, config_id, value,
1450                ))
1451                .await?;
1452
1453            *state.borrow_mut() = response.config_options.clone();
1454            watch_tx.borrow_mut().send(()).ok();
1455            Ok(response.config_options)
1456        })
1457    }
1458
1459    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1460        Some(self.watch_rx.clone())
1461    }
1462}
1463
1464struct ClientDelegate {
1465    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1466    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1467    cx: AsyncApp,
1468}
1469
1470#[async_trait::async_trait(?Send)]
1471impl acp::Client for ClientDelegate {
1472    async fn request_permission(
1473        &self,
1474        arguments: acp::RequestPermissionRequest,
1475    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1476        let thread;
1477        {
1478            let sessions_ref = self.sessions.borrow();
1479            let session = sessions_ref
1480                .get(&arguments.session_id)
1481                .context("Failed to get session")?;
1482            thread = session.thread.clone();
1483        }
1484
1485        let cx = &mut self.cx.clone();
1486
1487        let task = thread.update(cx, |thread, cx| {
1488            thread.request_tool_call_authorization(
1489                arguments.tool_call,
1490                acp_thread::PermissionOptions::Flat(arguments.options),
1491                cx,
1492            )
1493        })??;
1494
1495        let outcome = task.await;
1496
1497        Ok(acp::RequestPermissionResponse::new(outcome.into()))
1498    }
1499
1500    async fn write_text_file(
1501        &self,
1502        arguments: acp::WriteTextFileRequest,
1503    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1504        let cx = &mut self.cx.clone();
1505        let task = self
1506            .session_thread(&arguments.session_id)?
1507            .update(cx, |thread, cx| {
1508                thread.write_text_file(arguments.path, arguments.content, cx)
1509            })?;
1510
1511        task.await?;
1512
1513        Ok(Default::default())
1514    }
1515
1516    async fn read_text_file(
1517        &self,
1518        arguments: acp::ReadTextFileRequest,
1519    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1520        let task = self.session_thread(&arguments.session_id)?.update(
1521            &mut self.cx.clone(),
1522            |thread, cx| {
1523                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1524            },
1525        )?;
1526
1527        let content = task.await?;
1528
1529        Ok(acp::ReadTextFileResponse::new(content))
1530    }
1531
1532    async fn session_notification(
1533        &self,
1534        notification: acp::SessionNotification,
1535    ) -> Result<(), acp::Error> {
1536        let sessions = self.sessions.borrow();
1537        let session = sessions
1538            .get(&notification.session_id)
1539            .context("Failed to get session")?;
1540
1541        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1542            current_mode_id,
1543            ..
1544        }) = &notification.update
1545        {
1546            if let Some(session_modes) = &session.session_modes {
1547                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1548            }
1549        }
1550
1551        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1552            config_options,
1553            ..
1554        }) = &notification.update
1555        {
1556            if let Some(opts) = &session.config_options {
1557                *opts.config_options.borrow_mut() = config_options.clone();
1558                opts.tx.borrow_mut().send(()).ok();
1559            }
1560        }
1561
1562        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
1563            && let Some(session_list) = self.session_list.borrow().as_ref()
1564        {
1565            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1566        }
1567
1568        // Clone so we can inspect meta both before and after handing off to the thread
1569        let update_clone = notification.update.clone();
1570
1571        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1572        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1573            if let Some(meta) = &tc.meta {
1574                if let Some(terminal_info) = meta.get("terminal_info") {
1575                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1576                    {
1577                        let terminal_id = acp::TerminalId::new(id_str);
1578                        let cwd = terminal_info
1579                            .get("cwd")
1580                            .and_then(|v| v.as_str().map(PathBuf::from));
1581
1582                        // Create a minimal display-only lower-level terminal and register it.
1583                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1584                            let builder = TerminalBuilder::new_display_only(
1585                                CursorShape::default(),
1586                                AlternateScroll::On,
1587                                None,
1588                                0,
1589                                cx.background_executor(),
1590                                thread.project().read(cx).path_style(cx),
1591                            )?;
1592                            let lower = cx.new(|cx| builder.subscribe(cx));
1593                            thread.on_terminal_provider_event(
1594                                TerminalProviderEvent::Created {
1595                                    terminal_id,
1596                                    label: tc.title.clone(),
1597                                    cwd,
1598                                    output_byte_limit: None,
1599                                    terminal: lower,
1600                                },
1601                                cx,
1602                            );
1603                            anyhow::Ok(())
1604                        });
1605                    }
1606                }
1607            }
1608        }
1609
1610        // Forward the update to the acp_thread as usual.
1611        session.thread.update(&mut self.cx.clone(), |thread, cx| {
1612            thread.handle_session_update(notification.update.clone(), cx)
1613        })??;
1614
1615        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1616        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1617            if let Some(meta) = &tcu.meta {
1618                if let Some(term_out) = meta.get("terminal_output") {
1619                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1620                        let terminal_id = acp::TerminalId::new(id_str);
1621                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1622                            let data = s.as_bytes().to_vec();
1623                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1624                                thread.on_terminal_provider_event(
1625                                    TerminalProviderEvent::Output { terminal_id, data },
1626                                    cx,
1627                                );
1628                            });
1629                        }
1630                    }
1631                }
1632
1633                // terminal_exit
1634                if let Some(term_exit) = meta.get("terminal_exit") {
1635                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1636                        let terminal_id = acp::TerminalId::new(id_str);
1637                        let status = acp::TerminalExitStatus::new()
1638                            .exit_code(
1639                                term_exit
1640                                    .get("exit_code")
1641                                    .and_then(|v| v.as_u64())
1642                                    .map(|i| i as u32),
1643                            )
1644                            .signal(
1645                                term_exit
1646                                    .get("signal")
1647                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
1648                            );
1649
1650                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1651                            thread.on_terminal_provider_event(
1652                                TerminalProviderEvent::Exit {
1653                                    terminal_id,
1654                                    status,
1655                                },
1656                                cx,
1657                            );
1658                        });
1659                    }
1660                }
1661            }
1662        }
1663
1664        Ok(())
1665    }
1666
1667    async fn create_terminal(
1668        &self,
1669        args: acp::CreateTerminalRequest,
1670    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1671        let thread = self.session_thread(&args.session_id)?;
1672        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1673
1674        let terminal_entity = acp_thread::create_terminal_entity(
1675            args.command.clone(),
1676            &args.args,
1677            args.env
1678                .into_iter()
1679                .map(|env| (env.name, env.value))
1680                .collect(),
1681            args.cwd.clone(),
1682            &project,
1683            &mut self.cx.clone(),
1684        )
1685        .await?;
1686
1687        // Register with renderer
1688        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1689            thread.register_terminal_created(
1690                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1691                format!("{} {}", args.command, args.args.join(" ")),
1692                args.cwd.clone(),
1693                args.output_byte_limit,
1694                terminal_entity,
1695                cx,
1696            )
1697        })?;
1698        let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1699        Ok(acp::CreateTerminalResponse::new(terminal_id))
1700    }
1701
1702    async fn kill_terminal(
1703        &self,
1704        args: acp::KillTerminalRequest,
1705    ) -> Result<acp::KillTerminalResponse, acp::Error> {
1706        self.session_thread(&args.session_id)?
1707            .update(&mut self.cx.clone(), |thread, cx| {
1708                thread.kill_terminal(args.terminal_id, cx)
1709            })??;
1710
1711        Ok(Default::default())
1712    }
1713
1714    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1715        Err(acp::Error::method_not_found())
1716    }
1717
1718    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1719        Err(acp::Error::method_not_found())
1720    }
1721
1722    async fn release_terminal(
1723        &self,
1724        args: acp::ReleaseTerminalRequest,
1725    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1726        self.session_thread(&args.session_id)?
1727            .update(&mut self.cx.clone(), |thread, cx| {
1728                thread.release_terminal(args.terminal_id, cx)
1729            })??;
1730
1731        Ok(Default::default())
1732    }
1733
1734    async fn terminal_output(
1735        &self,
1736        args: acp::TerminalOutputRequest,
1737    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1738        self.session_thread(&args.session_id)?
1739            .read_with(&mut self.cx.clone(), |thread, cx| {
1740                let out = thread
1741                    .terminal(args.terminal_id)?
1742                    .read(cx)
1743                    .current_output(cx);
1744
1745                Ok(out)
1746            })?
1747    }
1748
1749    async fn wait_for_terminal_exit(
1750        &self,
1751        args: acp::WaitForTerminalExitRequest,
1752    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1753        let exit_status = self
1754            .session_thread(&args.session_id)?
1755            .update(&mut self.cx.clone(), |thread, cx| {
1756                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1757            })??
1758            .await;
1759
1760        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1761    }
1762}
1763
1764impl ClientDelegate {
1765    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1766        let sessions = self.sessions.borrow();
1767        sessions
1768            .get(session_id)
1769            .context("Failed to get session")
1770            .map(|session| session.thread.clone())
1771    }
1772}