acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::AcpConnectionRegistry;
   6use action_log::ActionLog;
   7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   8use anyhow::anyhow;
   9use collections::HashMap;
  10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  11use futures::AsyncBufReadExt as _;
  12use futures::io::BufReader;
  13use project::agent_server_store::AgentServerCommand;
  14use project::{AgentId, Project};
  15use serde::Deserialize;
  16use settings::Settings as _;
  17use task::{ShellBuilder, SpawnInTerminal};
  18use util::ResultExt as _;
  19use util::path_list::PathList;
  20use util::process::Child;
  21
  22use std::path::PathBuf;
  23use std::process::Stdio;
  24use std::rc::Rc;
  25use std::{any::Any, cell::RefCell};
  26use thiserror::Error;
  27
  28use anyhow::{Context as _, Result};
  29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  30
  31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  32use terminal::TerminalBuilder;
  33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  34
  35use crate::GEMINI_ID;
  36
  37pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  38
  39#[derive(Debug, Error)]
  40#[error("Unsupported version")]
  41pub struct UnsupportedVersion;
  42
  43pub struct AcpConnection {
  44    id: AgentId,
  45    telemetry_id: SharedString,
  46    connection: Rc<acp::ClientSideConnection>,
  47    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
  48    auth_methods: Vec<acp::AuthMethod>,
  49    command: AgentServerCommand,
  50    agent_capabilities: acp::AgentCapabilities,
  51    default_mode: Option<acp::SessionModeId>,
  52    default_model: Option<acp::ModelId>,
  53    default_config_options: HashMap<String, String>,
  54    child: Child,
  55    session_list: Option<Rc<AcpSessionList>>,
  56    _io_task: Task<Result<(), acp::Error>>,
  57    _wait_task: Task<Result<()>>,
  58    _stderr_task: Task<Result<()>>,
  59}
  60
  61struct ConfigOptions {
  62    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
  63    tx: Rc<RefCell<watch::Sender<()>>>,
  64    rx: watch::Receiver<()>,
  65}
  66
  67impl ConfigOptions {
  68    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  69        let (tx, rx) = watch::channel(());
  70        Self {
  71            config_options,
  72            tx: Rc::new(RefCell::new(tx)),
  73            rx,
  74        }
  75    }
  76}
  77
  78pub struct AcpSession {
  79    thread: WeakEntity<AcpThread>,
  80    suppress_abort_err: bool,
  81    models: Option<Rc<RefCell<acp::SessionModelState>>>,
  82    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
  83    config_options: Option<ConfigOptions>,
  84}
  85
  86pub struct AcpSessionList {
  87    connection: Rc<acp::ClientSideConnection>,
  88    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
  89    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
  90}
  91
  92impl AcpSessionList {
  93    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
  94        let (tx, rx) = smol::channel::unbounded();
  95        Self {
  96            connection,
  97            updates_tx: tx,
  98            updates_rx: rx,
  99        }
 100    }
 101
 102    fn notify_update(&self) {
 103        self.updates_tx
 104            .try_send(acp_thread::SessionListUpdate::Refresh)
 105            .log_err();
 106    }
 107
 108    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 109        self.updates_tx
 110            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 111            .log_err();
 112    }
 113}
 114
 115impl AgentSessionList for AcpSessionList {
 116    fn list_sessions(
 117        &self,
 118        request: AgentSessionListRequest,
 119        cx: &mut App,
 120    ) -> Task<Result<AgentSessionListResponse>> {
 121        let conn = self.connection.clone();
 122        cx.foreground_executor().spawn(async move {
 123            let acp_request = acp::ListSessionsRequest::new()
 124                .cwd(request.cwd)
 125                .cursor(request.cursor);
 126            let response = conn.list_sessions(acp_request).await?;
 127            Ok(AgentSessionListResponse {
 128                sessions: response
 129                    .sessions
 130                    .into_iter()
 131                    .map(|s| AgentSessionInfo {
 132                        session_id: s.session_id,
 133                        work_dirs: Some(PathList::new(&[s.cwd])),
 134                        title: s.title.map(Into::into),
 135                        updated_at: s.updated_at.and_then(|date_str| {
 136                            chrono::DateTime::parse_from_rfc3339(&date_str)
 137                                .ok()
 138                                .map(|dt| dt.with_timezone(&chrono::Utc))
 139                        }),
 140                        created_at: None,
 141                        meta: s.meta,
 142                    })
 143                    .collect(),
 144                next_cursor: response.next_cursor,
 145                meta: response.meta,
 146            })
 147        })
 148    }
 149
 150    fn watch(
 151        &self,
 152        _cx: &mut App,
 153    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 154        Some(self.updates_rx.clone())
 155    }
 156
 157    fn notify_refresh(&self) {
 158        self.notify_update();
 159    }
 160
 161    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 162        self
 163    }
 164}
 165
 166pub async fn connect(
 167    agent_id: AgentId,
 168    project: Entity<Project>,
 169    command: AgentServerCommand,
 170    default_mode: Option<acp::SessionModeId>,
 171    default_model: Option<acp::ModelId>,
 172    default_config_options: HashMap<String, String>,
 173    cx: &mut AsyncApp,
 174) -> Result<Rc<dyn AgentConnection>> {
 175    let conn = AcpConnection::stdio(
 176        agent_id,
 177        project,
 178        command.clone(),
 179        default_mode,
 180        default_model,
 181        default_config_options,
 182        cx,
 183    )
 184    .await?;
 185    Ok(Rc::new(conn) as _)
 186}
 187
 188const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 189
 190impl AcpConnection {
 191    pub async fn stdio(
 192        agent_id: AgentId,
 193        project: Entity<Project>,
 194        command: AgentServerCommand,
 195        default_mode: Option<acp::SessionModeId>,
 196        default_model: Option<acp::ModelId>,
 197        default_config_options: HashMap<String, String>,
 198        cx: &mut AsyncApp,
 199    ) -> Result<Self> {
 200        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
 201        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
 202        let mut child =
 203            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
 204        child.envs(command.env.iter().flatten());
 205        if let Some(cwd) = project.update(cx, |project, cx| {
 206            if project.is_local() {
 207                project
 208                    .default_path_list(cx)
 209                    .ordered_paths()
 210                    .next()
 211                    .cloned()
 212            } else {
 213                None
 214            }
 215        }) {
 216            child.current_dir(cwd);
 217        }
 218        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
 219
 220        let stdout = child.stdout.take().context("Failed to take stdout")?;
 221        let stdin = child.stdin.take().context("Failed to take stdin")?;
 222        let stderr = child.stderr.take().context("Failed to take stderr")?;
 223        log::debug!(
 224            "Spawning external agent server: {:?}, {:?}",
 225            command.path,
 226            command.args
 227        );
 228        log::trace!("Spawned (pid: {})", child.id());
 229
 230        let sessions = Rc::new(RefCell::new(HashMap::default()));
 231
 232        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
 233            (
 234                release_channel::ReleaseChannel::try_global(cx)
 235                    .map(|release_channel| release_channel.display_name()),
 236                release_channel::AppVersion::global(cx).to_string(),
 237            )
 238        });
 239
 240        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
 241            Rc::new(RefCell::new(None));
 242
 243        let client = ClientDelegate {
 244            sessions: sessions.clone(),
 245            session_list: client_session_list.clone(),
 246            cx: cx.clone(),
 247        };
 248        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
 249            let foreground_executor = cx.foreground_executor().clone();
 250            move |fut| {
 251                foreground_executor.spawn(fut).detach();
 252            }
 253        });
 254
 255        let io_task = cx.background_spawn(io_task);
 256
 257        let stderr_task = cx.background_spawn(async move {
 258            let mut stderr = BufReader::new(stderr);
 259            let mut line = String::new();
 260            while let Ok(n) = stderr.read_line(&mut line).await
 261                && n > 0
 262            {
 263                log::warn!("agent stderr: {}", line.trim());
 264                line.clear();
 265            }
 266            Ok(())
 267        });
 268
 269        let wait_task = cx.spawn({
 270            let sessions = sessions.clone();
 271            let status_fut = child.status();
 272            async move |cx| {
 273                let status = status_fut.await?;
 274
 275                for session in sessions.borrow().values() {
 276                    session
 277                        .thread
 278                        .update(cx, |thread, cx| {
 279                            thread.emit_load_error(LoadError::Exited { status }, cx)
 280                        })
 281                        .ok();
 282                }
 283
 284                anyhow::Ok(())
 285            }
 286        });
 287
 288        let connection = Rc::new(connection);
 289
 290        cx.update(|cx| {
 291            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
 292                registry.set_active_connection(agent_id.clone(), &connection, cx)
 293            });
 294        });
 295
 296        let response = connection
 297            .initialize(
 298                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
 299                    .client_capabilities(
 300                        acp::ClientCapabilities::new()
 301                            .fs(acp::FileSystemCapabilities::new()
 302                                .read_text_file(true)
 303                                .write_text_file(true))
 304                            .terminal(true)
 305                            .auth(acp::AuthCapabilities::new().terminal(true))
 306                            // Experimental: Allow for rendering terminal output from the agents
 307                            .meta(acp::Meta::from_iter([
 308                                ("terminal_output".into(), true.into()),
 309                                ("terminal-auth".into(), true.into()),
 310                            ])),
 311                    )
 312                    .client_info(
 313                        acp::Implementation::new("zed", version)
 314                            .title(release_channel.map(ToOwned::to_owned)),
 315                    ),
 316            )
 317            .await?;
 318
 319        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
 320            return Err(UnsupportedVersion.into());
 321        }
 322
 323        let telemetry_id = response
 324            .agent_info
 325            // Use the one the agent provides if we have one
 326            .map(|info| info.name.into())
 327            // Otherwise, just use the name
 328            .unwrap_or_else(|| agent_id.0.clone());
 329
 330        let session_list = if response
 331            .agent_capabilities
 332            .session_capabilities
 333            .list
 334            .is_some()
 335        {
 336            let list = Rc::new(AcpSessionList::new(connection.clone()));
 337            *client_session_list.borrow_mut() = Some(list.clone());
 338            Some(list)
 339        } else {
 340            None
 341        };
 342
 343        // TODO: Remove this override once Google team releases their official auth methods
 344        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
 345            let mut args = command.args.clone();
 346            args.retain(|a| a != "--experimental-acp" && a != "--acp");
 347            let value = serde_json::json!({
 348                "label": "gemini /auth",
 349                "command": command.path.to_string_lossy().into_owned(),
 350                "args": args,
 351                "env": command.env.clone().unwrap_or_default(),
 352            });
 353            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
 354            vec![acp::AuthMethod::Agent(
 355                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
 356                    .description("Login with your Google or Vertex AI account")
 357                    .meta(meta),
 358            )]
 359        } else {
 360            response.auth_methods
 361        };
 362        Ok(Self {
 363            id: agent_id,
 364            auth_methods,
 365            command,
 366            connection,
 367            telemetry_id,
 368            sessions,
 369            agent_capabilities: response.agent_capabilities,
 370            default_mode,
 371            default_model,
 372            default_config_options,
 373            session_list,
 374            _io_task: io_task,
 375            _wait_task: wait_task,
 376            _stderr_task: stderr_task,
 377            child,
 378        })
 379    }
 380
 381    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 382        &self.agent_capabilities.prompt_capabilities
 383    }
 384
 385    fn apply_default_config_options(
 386        &self,
 387        session_id: &acp::SessionId,
 388        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 389        cx: &mut AsyncApp,
 390    ) {
 391        let id = self.id.clone();
 392        let defaults_to_apply: Vec<_> = {
 393            let config_opts_ref = config_options.borrow();
 394            config_opts_ref
 395                .iter()
 396                .filter_map(|config_option| {
 397                    let default_value = self.default_config_options.get(&*config_option.id.0)?;
 398
 399                    let is_valid = match &config_option.kind {
 400                        acp::SessionConfigKind::Select(select) => match &select.options {
 401                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
 402                                .iter()
 403                                .any(|opt| &*opt.value.0 == default_value.as_str()),
 404                            acp::SessionConfigSelectOptions::Grouped(groups) => {
 405                                groups.iter().any(|g| {
 406                                    g.options
 407                                        .iter()
 408                                        .any(|opt| &*opt.value.0 == default_value.as_str())
 409                                })
 410                            }
 411                            _ => false,
 412                        },
 413                        _ => false,
 414                    };
 415
 416                    if is_valid {
 417                        let initial_value = match &config_option.kind {
 418                            acp::SessionConfigKind::Select(select) => {
 419                                Some(select.current_value.clone())
 420                            }
 421                            _ => None,
 422                        };
 423                        Some((
 424                            config_option.id.clone(),
 425                            default_value.clone(),
 426                            initial_value,
 427                        ))
 428                    } else {
 429                        log::warn!(
 430                            "`{}` is not a valid value for config option `{}` in {}",
 431                            default_value,
 432                            config_option.id.0,
 433                            id
 434                        );
 435                        None
 436                    }
 437                })
 438                .collect()
 439        };
 440
 441        for (config_id, default_value, initial_value) in defaults_to_apply {
 442            cx.spawn({
 443                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 444                let session_id = session_id.clone();
 445                let config_id_clone = config_id.clone();
 446                let config_opts = config_options.clone();
 447                let conn = self.connection.clone();
 448                async move |_| {
 449                    let result = conn
 450                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
 451                            session_id,
 452                            config_id_clone.clone(),
 453                            default_value_id,
 454                        ))
 455                        .await
 456                        .log_err();
 457
 458                    if result.is_none() {
 459                        if let Some(initial) = initial_value {
 460                            let mut opts = config_opts.borrow_mut();
 461                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 462                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 463                                    select.current_value = initial;
 464                                }
 465                            }
 466                        }
 467                    }
 468                }
 469            })
 470            .detach();
 471
 472            let mut opts = config_options.borrow_mut();
 473            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
 474                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 475                    select.current_value = acp::SessionConfigValueId::new(default_value);
 476                }
 477            }
 478        }
 479    }
 480}
 481
 482impl Drop for AcpConnection {
 483    fn drop(&mut self) {
 484        self.child.kill().log_err();
 485    }
 486}
 487
 488fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
 489    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
 490}
 491
 492fn terminal_auth_task(
 493    command: &AgentServerCommand,
 494    agent_id: &AgentId,
 495    method: &acp::AuthMethodTerminal,
 496) -> SpawnInTerminal {
 497    let mut args = command.args.clone();
 498    args.extend(method.args.clone());
 499
 500    let mut env = command.env.clone().unwrap_or_default();
 501    env.extend(method.env.clone());
 502
 503    acp_thread::build_terminal_auth_task(
 504        terminal_auth_task_id(agent_id, &method.id),
 505        method.name.clone(),
 506        command.path.to_string_lossy().into_owned(),
 507        args,
 508        env,
 509    )
 510}
 511
 512/// Used to support the _meta method prior to stabilization
 513fn meta_terminal_auth_task(
 514    agent_id: &AgentId,
 515    method_id: &acp::AuthMethodId,
 516    method: &acp::AuthMethod,
 517) -> Option<SpawnInTerminal> {
 518    #[derive(Deserialize)]
 519    struct MetaTerminalAuth {
 520        label: String,
 521        command: String,
 522        #[serde(default)]
 523        args: Vec<String>,
 524        #[serde(default)]
 525        env: HashMap<String, String>,
 526    }
 527
 528    let meta = match method {
 529        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
 530        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
 531        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
 532        _ => None,
 533    }?;
 534    let terminal_auth =
 535        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
 536
 537    Some(acp_thread::build_terminal_auth_task(
 538        terminal_auth_task_id(agent_id, method_id),
 539        terminal_auth.label.clone(),
 540        terminal_auth.command,
 541        terminal_auth.args,
 542        terminal_auth.env,
 543    ))
 544}
 545
 546impl AgentConnection for AcpConnection {
 547    fn agent_id(&self) -> AgentId {
 548        self.id.clone()
 549    }
 550
 551    fn telemetry_id(&self) -> SharedString {
 552        self.telemetry_id.clone()
 553    }
 554
 555    fn new_session(
 556        self: Rc<Self>,
 557        project: Entity<Project>,
 558        work_dirs: PathList,
 559        cx: &mut App,
 560    ) -> Task<Result<Entity<AcpThread>>> {
 561        // TODO: remove this once ACP supports multiple working directories
 562        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 563            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 564        };
 565        let name = self.id.0.clone();
 566        let mcp_servers = mcp_servers_for_project(&project, cx);
 567
 568        cx.spawn(async move |cx| {
 569            let response = self.connection
 570                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
 571                .await
 572                .map_err(map_acp_error)?;
 573
 574            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
 575
 576            if let Some(default_mode) = self.default_mode.clone() {
 577                if let Some(modes) = modes.as_ref() {
 578                    let mut modes_ref = modes.borrow_mut();
 579                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
 580
 581                    if has_mode {
 582                        let initial_mode_id = modes_ref.current_mode_id.clone();
 583
 584                        cx.spawn({
 585                            let default_mode = default_mode.clone();
 586                            let session_id = response.session_id.clone();
 587                            let modes = modes.clone();
 588                            let conn = self.connection.clone();
 589                            async move |_| {
 590                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
 591                                .await.log_err();
 592
 593                                if result.is_none() {
 594                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 595                                }
 596                            }
 597                        }).detach();
 598
 599                        modes_ref.current_mode_id = default_mode;
 600                    } else {
 601                        let available_modes = modes_ref
 602                            .available_modes
 603                            .iter()
 604                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 605                            .collect::<Vec<_>>()
 606                            .join("\n");
 607
 608                        log::warn!(
 609                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 610                        );
 611                    }
 612                }
 613            }
 614
 615            if let Some(default_model) = self.default_model.clone() {
 616                if let Some(models) = models.as_ref() {
 617                    let mut models_ref = models.borrow_mut();
 618                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
 619
 620                    if has_model {
 621                        let initial_model_id = models_ref.current_model_id.clone();
 622
 623                        cx.spawn({
 624                            let default_model = default_model.clone();
 625                            let session_id = response.session_id.clone();
 626                            let models = models.clone();
 627                            let conn = self.connection.clone();
 628                            async move |_| {
 629                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
 630                                .await.log_err();
 631
 632                                if result.is_none() {
 633                                    models.borrow_mut().current_model_id = initial_model_id;
 634                                }
 635                            }
 636                        }).detach();
 637
 638                        models_ref.current_model_id = default_model;
 639                    } else {
 640                        let available_models = models_ref
 641                            .available_models
 642                            .iter()
 643                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 644                            .collect::<Vec<_>>()
 645                            .join("\n");
 646
 647                        log::warn!(
 648                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 649                        );
 650                    }
 651                }
 652            }
 653
 654            if let Some(config_opts) = config_options.as_ref() {
 655                self.apply_default_config_options(&response.session_id, config_opts, cx);
 656            }
 657
 658            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 659            let thread: Entity<AcpThread> = cx.new(|cx| {
 660                AcpThread::new(
 661                    None,
 662                    None,
 663                    Some(work_dirs),
 664                    self.clone(),
 665                    project,
 666                    action_log,
 667                    response.session_id.clone(),
 668                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 669                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 670                    cx,
 671                )
 672            });
 673
 674            self.sessions.borrow_mut().insert(
 675                response.session_id,
 676                AcpSession {
 677                    thread: thread.downgrade(),
 678                    suppress_abort_err: false,
 679                    session_modes: modes,
 680                    models,
 681                    config_options: config_options.map(ConfigOptions::new),
 682                },
 683            );
 684
 685            Ok(thread)
 686        })
 687    }
 688
 689    fn supports_load_session(&self) -> bool {
 690        self.agent_capabilities.load_session
 691    }
 692
 693    fn supports_resume_session(&self) -> bool {
 694        self.agent_capabilities
 695            .session_capabilities
 696            .resume
 697            .is_some()
 698    }
 699
 700    fn load_session(
 701        self: Rc<Self>,
 702        session_id: acp::SessionId,
 703        project: Entity<Project>,
 704        work_dirs: PathList,
 705        title: Option<SharedString>,
 706        cx: &mut App,
 707    ) -> Task<Result<Entity<AcpThread>>> {
 708        if !self.agent_capabilities.load_session {
 709            return Task::ready(Err(anyhow!(LoadError::Other(
 710                "Loading sessions is not supported by this agent.".into()
 711            ))));
 712        }
 713        // TODO: remove this once ACP supports multiple working directories
 714        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 715            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 716        };
 717
 718        let mcp_servers = mcp_servers_for_project(&project, cx);
 719        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 720        let thread: Entity<AcpThread> = cx.new(|cx| {
 721            AcpThread::new(
 722                None,
 723                title,
 724                Some(work_dirs.clone()),
 725                self.clone(),
 726                project,
 727                action_log,
 728                session_id.clone(),
 729                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 730                cx,
 731            )
 732        });
 733
 734        self.sessions.borrow_mut().insert(
 735            session_id.clone(),
 736            AcpSession {
 737                thread: thread.downgrade(),
 738                suppress_abort_err: false,
 739                session_modes: None,
 740                models: None,
 741                config_options: None,
 742            },
 743        );
 744
 745        cx.spawn(async move |cx| {
 746            let response = match self
 747                .connection
 748                .load_session(
 749                    acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
 750                )
 751                .await
 752            {
 753                Ok(response) => response,
 754                Err(err) => {
 755                    self.sessions.borrow_mut().remove(&session_id);
 756                    return Err(map_acp_error(err));
 757                }
 758            };
 759
 760            let (modes, models, config_options) =
 761                config_state(response.modes, response.models, response.config_options);
 762
 763            if let Some(config_opts) = config_options.as_ref() {
 764                self.apply_default_config_options(&session_id, config_opts, cx);
 765            }
 766
 767            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 768                session.session_modes = modes;
 769                session.models = models;
 770                session.config_options = config_options.map(ConfigOptions::new);
 771            }
 772
 773            Ok(thread)
 774        })
 775    }
 776
 777    fn resume_session(
 778        self: Rc<Self>,
 779        session_id: acp::SessionId,
 780        project: Entity<Project>,
 781        work_dirs: PathList,
 782        title: Option<SharedString>,
 783        cx: &mut App,
 784    ) -> Task<Result<Entity<AcpThread>>> {
 785        if self
 786            .agent_capabilities
 787            .session_capabilities
 788            .resume
 789            .is_none()
 790        {
 791            return Task::ready(Err(anyhow!(LoadError::Other(
 792                "Resuming sessions is not supported by this agent.".into()
 793            ))));
 794        }
 795        // TODO: remove this once ACP supports multiple working directories
 796        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 797            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 798        };
 799
 800        let mcp_servers = mcp_servers_for_project(&project, cx);
 801        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 802        let thread: Entity<AcpThread> = cx.new(|cx| {
 803            AcpThread::new(
 804                None,
 805                title,
 806                Some(work_dirs),
 807                self.clone(),
 808                project,
 809                action_log,
 810                session_id.clone(),
 811                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 812                cx,
 813            )
 814        });
 815
 816        self.sessions.borrow_mut().insert(
 817            session_id.clone(),
 818            AcpSession {
 819                thread: thread.downgrade(),
 820                suppress_abort_err: false,
 821                session_modes: None,
 822                models: None,
 823                config_options: None,
 824            },
 825        );
 826
 827        cx.spawn(async move |cx| {
 828            let response = match self
 829                .connection
 830                .resume_session(
 831                    acp::ResumeSessionRequest::new(session_id.clone(), cwd)
 832                        .mcp_servers(mcp_servers),
 833                )
 834                .await
 835            {
 836                Ok(response) => response,
 837                Err(err) => {
 838                    self.sessions.borrow_mut().remove(&session_id);
 839                    return Err(map_acp_error(err));
 840                }
 841            };
 842
 843            let (modes, models, config_options) =
 844                config_state(response.modes, response.models, response.config_options);
 845
 846            if let Some(config_opts) = config_options.as_ref() {
 847                self.apply_default_config_options(&session_id, config_opts, cx);
 848            }
 849
 850            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 851                session.session_modes = modes;
 852                session.models = models;
 853                session.config_options = config_options.map(ConfigOptions::new);
 854            }
 855
 856            Ok(thread)
 857        })
 858    }
 859
 860    fn supports_close_session(&self) -> bool {
 861        self.agent_capabilities.session_capabilities.close.is_some()
 862    }
 863
 864    fn close_session(
 865        self: Rc<Self>,
 866        session_id: &acp::SessionId,
 867        cx: &mut App,
 868    ) -> Task<Result<()>> {
 869        if !self.supports_close_session() {
 870            return Task::ready(Err(anyhow!(LoadError::Other(
 871                "Closing sessions is not supported by this agent.".into()
 872            ))));
 873        }
 874
 875        let conn = self.connection.clone();
 876        let session_id = session_id.clone();
 877        cx.foreground_executor().spawn(async move {
 878            conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
 879                .await?;
 880            self.sessions.borrow_mut().remove(&session_id);
 881            Ok(())
 882        })
 883    }
 884
 885    fn auth_methods(&self) -> &[acp::AuthMethod] {
 886        &self.auth_methods
 887    }
 888
 889    fn terminal_auth_task(
 890        &self,
 891        method_id: &acp::AuthMethodId,
 892        cx: &App,
 893    ) -> Option<SpawnInTerminal> {
 894        let method = self
 895            .auth_methods
 896            .iter()
 897            .find(|method| method.id() == method_id)?;
 898
 899        match method {
 900            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
 901                Some(terminal_auth_task(&self.command, &self.id, terminal))
 902            }
 903            _ => meta_terminal_auth_task(&self.id, method_id, method),
 904        }
 905    }
 906
 907    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
 908        let conn = self.connection.clone();
 909        cx.foreground_executor().spawn(async move {
 910            conn.authenticate(acp::AuthenticateRequest::new(method_id))
 911                .await?;
 912            Ok(())
 913        })
 914    }
 915
 916    fn prompt(
 917        &self,
 918        _id: Option<acp_thread::UserMessageId>,
 919        params: acp::PromptRequest,
 920        cx: &mut App,
 921    ) -> Task<Result<acp::PromptResponse>> {
 922        let conn = self.connection.clone();
 923        let sessions = self.sessions.clone();
 924        let session_id = params.session_id.clone();
 925        cx.foreground_executor().spawn(async move {
 926            let result = conn.prompt(params).await;
 927
 928            let mut suppress_abort_err = false;
 929
 930            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
 931                suppress_abort_err = session.suppress_abort_err;
 932                session.suppress_abort_err = false;
 933            }
 934
 935            match result {
 936                Ok(response) => Ok(response),
 937                Err(err) => {
 938                    if err.code == acp::ErrorCode::AuthRequired {
 939                        return Err(anyhow!(acp::Error::auth_required()));
 940                    }
 941
 942                    if err.code != ErrorCode::InternalError {
 943                        anyhow::bail!(err)
 944                    }
 945
 946                    let Some(data) = &err.data else {
 947                        anyhow::bail!(err)
 948                    };
 949
 950                    // Temporary workaround until the following PR is generally available:
 951                    // https://github.com/google-gemini/gemini-cli/pull/6656
 952
 953                    #[derive(Deserialize)]
 954                    #[serde(deny_unknown_fields)]
 955                    struct ErrorDetails {
 956                        details: Box<str>,
 957                    }
 958
 959                    match serde_json::from_value(data.clone()) {
 960                        Ok(ErrorDetails { details }) => {
 961                            if suppress_abort_err
 962                                && (details.contains("This operation was aborted")
 963                                    || details.contains("The user aborted a request"))
 964                            {
 965                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
 966                            } else {
 967                                Err(anyhow!(details))
 968                            }
 969                        }
 970                        Err(_) => Err(anyhow!(err)),
 971                    }
 972                }
 973            }
 974        })
 975    }
 976
 977    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
 978        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
 979            session.suppress_abort_err = true;
 980        }
 981        let conn = self.connection.clone();
 982        let params = acp::CancelNotification::new(session_id.clone());
 983        cx.foreground_executor()
 984            .spawn(async move { conn.cancel(params).await })
 985            .detach();
 986    }
 987
 988    fn session_modes(
 989        &self,
 990        session_id: &acp::SessionId,
 991        _cx: &App,
 992    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
 993        let sessions = self.sessions.clone();
 994        let sessions_ref = sessions.borrow();
 995        let Some(session) = sessions_ref.get(session_id) else {
 996            return None;
 997        };
 998
 999        if let Some(modes) = session.session_modes.as_ref() {
1000            Some(Rc::new(AcpSessionModes {
1001                connection: self.connection.clone(),
1002                session_id: session_id.clone(),
1003                state: modes.clone(),
1004            }) as _)
1005        } else {
1006            None
1007        }
1008    }
1009
1010    fn model_selector(
1011        &self,
1012        session_id: &acp::SessionId,
1013    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1014        let sessions = self.sessions.clone();
1015        let sessions_ref = sessions.borrow();
1016        let Some(session) = sessions_ref.get(session_id) else {
1017            return None;
1018        };
1019
1020        if let Some(models) = session.models.as_ref() {
1021            Some(Rc::new(AcpModelSelector::new(
1022                session_id.clone(),
1023                self.connection.clone(),
1024                models.clone(),
1025            )) as _)
1026        } else {
1027            None
1028        }
1029    }
1030
1031    fn session_config_options(
1032        &self,
1033        session_id: &acp::SessionId,
1034        _cx: &App,
1035    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1036        let sessions = self.sessions.borrow();
1037        let session = sessions.get(session_id)?;
1038
1039        let config_opts = session.config_options.as_ref()?;
1040
1041        Some(Rc::new(AcpSessionConfigOptions {
1042            session_id: session_id.clone(),
1043            connection: self.connection.clone(),
1044            state: config_opts.config_options.clone(),
1045            watch_tx: config_opts.tx.clone(),
1046            watch_rx: config_opts.rx.clone(),
1047        }) as _)
1048    }
1049
1050    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1051        self.session_list.clone().map(|s| s as _)
1052    }
1053
1054    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1055        self
1056    }
1057}
1058
1059fn map_acp_error(err: acp::Error) -> anyhow::Error {
1060    if err.code == acp::ErrorCode::AuthRequired {
1061        let mut error = AuthRequired::new();
1062
1063        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1064            error = error.with_description(err.message);
1065        }
1066
1067        anyhow!(error)
1068    } else {
1069        anyhow!(err)
1070    }
1071}
1072
1073#[cfg(test)]
1074mod tests {
1075    use super::*;
1076
1077    #[test]
1078    fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
1079        let command = AgentServerCommand {
1080            path: "/path/to/agent".into(),
1081            args: vec!["--acp".into(), "--verbose".into()],
1082            env: Some(HashMap::from_iter([
1083                ("BASE".into(), "1".into()),
1084                ("SHARED".into(), "base".into()),
1085            ])),
1086        };
1087        let method = acp::AuthMethodTerminal::new("login", "Login")
1088            .args(vec!["/auth".into()])
1089            .env(std::collections::HashMap::from_iter([
1090                ("EXTRA".into(), "2".into()),
1091                ("SHARED".into(), "override".into()),
1092            ]));
1093
1094        let terminal_auth_task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
1095
1096        assert_eq!(
1097            terminal_auth_task.command.as_deref(),
1098            Some("/path/to/agent")
1099        );
1100        assert_eq!(terminal_auth_task.args, vec!["--acp", "--verbose", "/auth"]);
1101        assert_eq!(
1102            terminal_auth_task.env,
1103            HashMap::from_iter([
1104                ("BASE".into(), "1".into()),
1105                ("SHARED".into(), "override".into()),
1106                ("EXTRA".into(), "2".into()),
1107            ])
1108        );
1109        assert_eq!(terminal_auth_task.label, "Login");
1110        assert_eq!(terminal_auth_task.command_label, "Login");
1111    }
1112
1113    #[test]
1114    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
1115        let method_id = acp::AuthMethodId::new("legacy-login");
1116        let method = acp::AuthMethod::Agent(
1117            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1118                "terminal-auth".to_string(),
1119                serde_json::json!({
1120                    "label": "legacy /auth",
1121                    "command": "legacy-agent",
1122                    "args": ["auth", "--interactive"],
1123                    "env": {
1124                        "AUTH_MODE": "interactive",
1125                    },
1126                }),
1127            )])),
1128        );
1129
1130        let terminal_auth_task =
1131            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
1132                .expect("expected legacy terminal auth task");
1133
1134        assert_eq!(
1135            terminal_auth_task.id.0,
1136            "external-agent-test-agent-legacy-login-login"
1137        );
1138        assert_eq!(terminal_auth_task.command.as_deref(), Some("legacy-agent"));
1139        assert_eq!(terminal_auth_task.args, vec!["auth", "--interactive"]);
1140        assert_eq!(
1141            terminal_auth_task.env,
1142            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
1143        );
1144        assert_eq!(terminal_auth_task.label, "legacy /auth");
1145    }
1146
1147    #[test]
1148    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
1149        let method_id = acp::AuthMethodId::new("legacy-login");
1150        let method = acp::AuthMethod::Agent(
1151            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1152                "terminal-auth".to_string(),
1153                serde_json::json!({
1154                    "label": "legacy /auth",
1155                }),
1156            )])),
1157        );
1158
1159        assert!(
1160            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
1161        );
1162    }
1163
1164    #[test]
1165    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
1166        let method_id = acp::AuthMethodId::new("login");
1167        let method = acp::AuthMethod::Terminal(
1168            acp::AuthMethodTerminal::new(method_id, "Login")
1169                .args(vec!["/auth".into()])
1170                .env(std::collections::HashMap::from_iter([(
1171                    "AUTH_MODE".into(),
1172                    "first-class".into(),
1173                )]))
1174                .meta(acp::Meta::from_iter([(
1175                    "terminal-auth".to_string(),
1176                    serde_json::json!({
1177                        "label": "legacy /auth",
1178                        "command": "legacy-agent",
1179                        "args": ["legacy-auth"],
1180                        "env": {
1181                            "AUTH_MODE": "legacy",
1182                        },
1183                    }),
1184                )])),
1185        );
1186
1187        let command = AgentServerCommand {
1188            path: "/path/to/agent".into(),
1189            args: vec!["--acp".into()],
1190            env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
1191        };
1192
1193        let terminal_auth_task = match &method {
1194            acp::AuthMethod::Terminal(terminal) => {
1195                terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
1196            }
1197            _ => unreachable!(),
1198        };
1199
1200        assert_eq!(
1201            terminal_auth_task.command.as_deref(),
1202            Some("/path/to/agent")
1203        );
1204        assert_eq!(terminal_auth_task.args, vec!["--acp", "/auth"]);
1205        assert_eq!(
1206            terminal_auth_task.env,
1207            HashMap::from_iter([
1208                ("BASE".into(), "1".into()),
1209                ("AUTH_MODE".into(), "first-class".into()),
1210            ])
1211        );
1212        assert_eq!(terminal_auth_task.label, "Login");
1213    }
1214}
1215
1216fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1217    let context_server_store = project.read(cx).context_server_store().read(cx);
1218    let is_local = project.read(cx).is_local();
1219    context_server_store
1220        .configured_server_ids()
1221        .iter()
1222        .filter_map(|id| {
1223            let configuration = context_server_store.configuration_for_server(id)?;
1224            match &*configuration {
1225                project::context_server_store::ContextServerConfiguration::Custom {
1226                    command,
1227                    remote,
1228                    ..
1229                }
1230                | project::context_server_store::ContextServerConfiguration::Extension {
1231                    command,
1232                    remote,
1233                    ..
1234                } if is_local || *remote => Some(acp::McpServer::Stdio(
1235                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
1236                        .args(command.args.clone())
1237                        .env(if let Some(env) = command.env.as_ref() {
1238                            env.iter()
1239                                .map(|(name, value)| acp::EnvVariable::new(name, value))
1240                                .collect()
1241                        } else {
1242                            vec![]
1243                        }),
1244                )),
1245                project::context_server_store::ContextServerConfiguration::Http {
1246                    url,
1247                    headers,
1248                    timeout: _,
1249                } => Some(acp::McpServer::Http(
1250                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1251                        headers
1252                            .iter()
1253                            .map(|(name, value)| acp::HttpHeader::new(name, value))
1254                            .collect(),
1255                    ),
1256                )),
1257                _ => None,
1258            }
1259        })
1260        .collect()
1261}
1262
1263fn config_state(
1264    modes: Option<acp::SessionModeState>,
1265    models: Option<acp::SessionModelState>,
1266    config_options: Option<Vec<acp::SessionConfigOption>>,
1267) -> (
1268    Option<Rc<RefCell<acp::SessionModeState>>>,
1269    Option<Rc<RefCell<acp::SessionModelState>>>,
1270    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1271) {
1272    if let Some(opts) = config_options {
1273        return (None, None, Some(Rc::new(RefCell::new(opts))));
1274    }
1275
1276    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1277    let models = models.map(|models| Rc::new(RefCell::new(models)));
1278    (modes, models, None)
1279}
1280
1281struct AcpSessionModes {
1282    session_id: acp::SessionId,
1283    connection: Rc<acp::ClientSideConnection>,
1284    state: Rc<RefCell<acp::SessionModeState>>,
1285}
1286
1287impl acp_thread::AgentSessionModes for AcpSessionModes {
1288    fn current_mode(&self) -> acp::SessionModeId {
1289        self.state.borrow().current_mode_id.clone()
1290    }
1291
1292    fn all_modes(&self) -> Vec<acp::SessionMode> {
1293        self.state.borrow().available_modes.clone()
1294    }
1295
1296    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1297        let connection = self.connection.clone();
1298        let session_id = self.session_id.clone();
1299        let old_mode_id;
1300        {
1301            let mut state = self.state.borrow_mut();
1302            old_mode_id = state.current_mode_id.clone();
1303            state.current_mode_id = mode_id.clone();
1304        };
1305        let state = self.state.clone();
1306        cx.foreground_executor().spawn(async move {
1307            let result = connection
1308                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1309                .await;
1310
1311            if result.is_err() {
1312                state.borrow_mut().current_mode_id = old_mode_id;
1313            }
1314
1315            result?;
1316
1317            Ok(())
1318        })
1319    }
1320}
1321
1322struct AcpModelSelector {
1323    session_id: acp::SessionId,
1324    connection: Rc<acp::ClientSideConnection>,
1325    state: Rc<RefCell<acp::SessionModelState>>,
1326}
1327
1328impl AcpModelSelector {
1329    fn new(
1330        session_id: acp::SessionId,
1331        connection: Rc<acp::ClientSideConnection>,
1332        state: Rc<RefCell<acp::SessionModelState>>,
1333    ) -> Self {
1334        Self {
1335            session_id,
1336            connection,
1337            state,
1338        }
1339    }
1340}
1341
1342impl acp_thread::AgentModelSelector for AcpModelSelector {
1343    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1344        Task::ready(Ok(acp_thread::AgentModelList::Flat(
1345            self.state
1346                .borrow()
1347                .available_models
1348                .clone()
1349                .into_iter()
1350                .map(acp_thread::AgentModelInfo::from)
1351                .collect(),
1352        )))
1353    }
1354
1355    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1356        let connection = self.connection.clone();
1357        let session_id = self.session_id.clone();
1358        let old_model_id;
1359        {
1360            let mut state = self.state.borrow_mut();
1361            old_model_id = state.current_model_id.clone();
1362            state.current_model_id = model_id.clone();
1363        };
1364        let state = self.state.clone();
1365        cx.foreground_executor().spawn(async move {
1366            let result = connection
1367                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1368                .await;
1369
1370            if result.is_err() {
1371                state.borrow_mut().current_model_id = old_model_id;
1372            }
1373
1374            result?;
1375
1376            Ok(())
1377        })
1378    }
1379
1380    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1381        let state = self.state.borrow();
1382        Task::ready(
1383            state
1384                .available_models
1385                .iter()
1386                .find(|m| m.model_id == state.current_model_id)
1387                .cloned()
1388                .map(acp_thread::AgentModelInfo::from)
1389                .ok_or_else(|| anyhow::anyhow!("Model not found")),
1390        )
1391    }
1392}
1393
1394struct AcpSessionConfigOptions {
1395    session_id: acp::SessionId,
1396    connection: Rc<acp::ClientSideConnection>,
1397    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1398    watch_tx: Rc<RefCell<watch::Sender<()>>>,
1399    watch_rx: watch::Receiver<()>,
1400}
1401
1402impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1403    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1404        self.state.borrow().clone()
1405    }
1406
1407    fn set_config_option(
1408        &self,
1409        config_id: acp::SessionConfigId,
1410        value: acp::SessionConfigValueId,
1411        cx: &mut App,
1412    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1413        let connection = self.connection.clone();
1414        let session_id = self.session_id.clone();
1415        let state = self.state.clone();
1416
1417        let watch_tx = self.watch_tx.clone();
1418
1419        cx.foreground_executor().spawn(async move {
1420            let response = connection
1421                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1422                    session_id, config_id, value,
1423                ))
1424                .await?;
1425
1426            *state.borrow_mut() = response.config_options.clone();
1427            watch_tx.borrow_mut().send(()).ok();
1428            Ok(response.config_options)
1429        })
1430    }
1431
1432    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1433        Some(self.watch_rx.clone())
1434    }
1435}
1436
1437struct ClientDelegate {
1438    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1439    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1440    cx: AsyncApp,
1441}
1442
1443#[async_trait::async_trait(?Send)]
1444impl acp::Client for ClientDelegate {
1445    async fn request_permission(
1446        &self,
1447        arguments: acp::RequestPermissionRequest,
1448    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1449        let thread;
1450        {
1451            let sessions_ref = self.sessions.borrow();
1452            let session = sessions_ref
1453                .get(&arguments.session_id)
1454                .context("Failed to get session")?;
1455            thread = session.thread.clone();
1456        }
1457
1458        let cx = &mut self.cx.clone();
1459
1460        let task = thread.update(cx, |thread, cx| {
1461            thread.request_tool_call_authorization(
1462                arguments.tool_call,
1463                acp_thread::PermissionOptions::Flat(arguments.options),
1464                cx,
1465            )
1466        })??;
1467
1468        let outcome = task.await;
1469
1470        Ok(acp::RequestPermissionResponse::new(outcome.into()))
1471    }
1472
1473    async fn write_text_file(
1474        &self,
1475        arguments: acp::WriteTextFileRequest,
1476    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1477        let cx = &mut self.cx.clone();
1478        let task = self
1479            .session_thread(&arguments.session_id)?
1480            .update(cx, |thread, cx| {
1481                thread.write_text_file(arguments.path, arguments.content, cx)
1482            })?;
1483
1484        task.await?;
1485
1486        Ok(Default::default())
1487    }
1488
1489    async fn read_text_file(
1490        &self,
1491        arguments: acp::ReadTextFileRequest,
1492    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1493        let task = self.session_thread(&arguments.session_id)?.update(
1494            &mut self.cx.clone(),
1495            |thread, cx| {
1496                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1497            },
1498        )?;
1499
1500        let content = task.await?;
1501
1502        Ok(acp::ReadTextFileResponse::new(content))
1503    }
1504
1505    async fn session_notification(
1506        &self,
1507        notification: acp::SessionNotification,
1508    ) -> Result<(), acp::Error> {
1509        let sessions = self.sessions.borrow();
1510        let session = sessions
1511            .get(&notification.session_id)
1512            .context("Failed to get session")?;
1513
1514        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1515            current_mode_id,
1516            ..
1517        }) = &notification.update
1518        {
1519            if let Some(session_modes) = &session.session_modes {
1520                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1521            }
1522        }
1523
1524        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1525            config_options,
1526            ..
1527        }) = &notification.update
1528        {
1529            if let Some(opts) = &session.config_options {
1530                *opts.config_options.borrow_mut() = config_options.clone();
1531                opts.tx.borrow_mut().send(()).ok();
1532            }
1533        }
1534
1535        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
1536            && let Some(session_list) = self.session_list.borrow().as_ref()
1537        {
1538            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1539        }
1540
1541        // Clone so we can inspect meta both before and after handing off to the thread
1542        let update_clone = notification.update.clone();
1543
1544        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1545        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1546            if let Some(meta) = &tc.meta {
1547                if let Some(terminal_info) = meta.get("terminal_info") {
1548                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1549                    {
1550                        let terminal_id = acp::TerminalId::new(id_str);
1551                        let cwd = terminal_info
1552                            .get("cwd")
1553                            .and_then(|v| v.as_str().map(PathBuf::from));
1554
1555                        // Create a minimal display-only lower-level terminal and register it.
1556                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1557                            let builder = TerminalBuilder::new_display_only(
1558                                CursorShape::default(),
1559                                AlternateScroll::On,
1560                                None,
1561                                0,
1562                                cx.background_executor(),
1563                                thread.project().read(cx).path_style(cx),
1564                            )?;
1565                            let lower = cx.new(|cx| builder.subscribe(cx));
1566                            thread.on_terminal_provider_event(
1567                                TerminalProviderEvent::Created {
1568                                    terminal_id,
1569                                    label: tc.title.clone(),
1570                                    cwd,
1571                                    output_byte_limit: None,
1572                                    terminal: lower,
1573                                },
1574                                cx,
1575                            );
1576                            anyhow::Ok(())
1577                        });
1578                    }
1579                }
1580            }
1581        }
1582
1583        // Forward the update to the acp_thread as usual.
1584        session.thread.update(&mut self.cx.clone(), |thread, cx| {
1585            thread.handle_session_update(notification.update.clone(), cx)
1586        })??;
1587
1588        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1589        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1590            if let Some(meta) = &tcu.meta {
1591                if let Some(term_out) = meta.get("terminal_output") {
1592                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1593                        let terminal_id = acp::TerminalId::new(id_str);
1594                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1595                            let data = s.as_bytes().to_vec();
1596                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1597                                thread.on_terminal_provider_event(
1598                                    TerminalProviderEvent::Output { terminal_id, data },
1599                                    cx,
1600                                );
1601                            });
1602                        }
1603                    }
1604                }
1605
1606                // terminal_exit
1607                if let Some(term_exit) = meta.get("terminal_exit") {
1608                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1609                        let terminal_id = acp::TerminalId::new(id_str);
1610                        let status = acp::TerminalExitStatus::new()
1611                            .exit_code(
1612                                term_exit
1613                                    .get("exit_code")
1614                                    .and_then(|v| v.as_u64())
1615                                    .map(|i| i as u32),
1616                            )
1617                            .signal(
1618                                term_exit
1619                                    .get("signal")
1620                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
1621                            );
1622
1623                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1624                            thread.on_terminal_provider_event(
1625                                TerminalProviderEvent::Exit {
1626                                    terminal_id,
1627                                    status,
1628                                },
1629                                cx,
1630                            );
1631                        });
1632                    }
1633                }
1634            }
1635        }
1636
1637        Ok(())
1638    }
1639
1640    async fn create_terminal(
1641        &self,
1642        args: acp::CreateTerminalRequest,
1643    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1644        let thread = self.session_thread(&args.session_id)?;
1645        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1646
1647        let terminal_entity = acp_thread::create_terminal_entity(
1648            args.command.clone(),
1649            &args.args,
1650            args.env
1651                .into_iter()
1652                .map(|env| (env.name, env.value))
1653                .collect(),
1654            args.cwd.clone(),
1655            &project,
1656            &mut self.cx.clone(),
1657        )
1658        .await?;
1659
1660        // Register with renderer
1661        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1662            thread.register_terminal_created(
1663                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1664                format!("{} {}", args.command, args.args.join(" ")),
1665                args.cwd.clone(),
1666                args.output_byte_limit,
1667                terminal_entity,
1668                cx,
1669            )
1670        })?;
1671        let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1672        Ok(acp::CreateTerminalResponse::new(terminal_id))
1673    }
1674
1675    async fn kill_terminal(
1676        &self,
1677        args: acp::KillTerminalRequest,
1678    ) -> Result<acp::KillTerminalResponse, acp::Error> {
1679        self.session_thread(&args.session_id)?
1680            .update(&mut self.cx.clone(), |thread, cx| {
1681                thread.kill_terminal(args.terminal_id, cx)
1682            })??;
1683
1684        Ok(Default::default())
1685    }
1686
1687    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1688        Err(acp::Error::method_not_found())
1689    }
1690
1691    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1692        Err(acp::Error::method_not_found())
1693    }
1694
1695    async fn release_terminal(
1696        &self,
1697        args: acp::ReleaseTerminalRequest,
1698    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1699        self.session_thread(&args.session_id)?
1700            .update(&mut self.cx.clone(), |thread, cx| {
1701                thread.release_terminal(args.terminal_id, cx)
1702            })??;
1703
1704        Ok(Default::default())
1705    }
1706
1707    async fn terminal_output(
1708        &self,
1709        args: acp::TerminalOutputRequest,
1710    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1711        self.session_thread(&args.session_id)?
1712            .read_with(&mut self.cx.clone(), |thread, cx| {
1713                let out = thread
1714                    .terminal(args.terminal_id)?
1715                    .read(cx)
1716                    .current_output(cx);
1717
1718                Ok(out)
1719            })?
1720    }
1721
1722    async fn wait_for_terminal_exit(
1723        &self,
1724        args: acp::WaitForTerminalExitRequest,
1725    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1726        let exit_status = self
1727            .session_thread(&args.session_id)?
1728            .update(&mut self.cx.clone(), |thread, cx| {
1729                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1730            })??
1731            .await;
1732
1733        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1734    }
1735}
1736
1737impl ClientDelegate {
1738    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1739        let sessions = self.sessions.borrow();
1740        sessions
1741            .get(session_id)
1742            .context("Failed to get session")
1743            .map(|session| session.thread.clone())
1744    }
1745}