acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::AcpConnectionRegistry;
   6use action_log::ActionLog;
   7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   8use anyhow::anyhow;
   9use collections::HashMap;
  10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  11use futures::AsyncBufReadExt as _;
  12use futures::io::BufReader;
  13use project::agent_server_store::AgentServerCommand;
  14use project::{AgentId, Project};
  15use serde::Deserialize;
  16use settings::Settings as _;
  17use task::{ShellBuilder, SpawnInTerminal};
  18use util::ResultExt as _;
  19use util::path_list::PathList;
  20use util::process::Child;
  21
  22use std::path::PathBuf;
  23use std::process::Stdio;
  24use std::rc::Rc;
  25use std::{any::Any, cell::RefCell};
  26use thiserror::Error;
  27
  28use anyhow::{Context as _, Result};
  29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  30
  31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  32use terminal::TerminalBuilder;
  33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  34
  35use crate::GEMINI_ID;
  36
    /// Identifier of the synthetic "Login" auth method that `AcpConnection::stdio` advertises
    /// for the Gemini agent instead of the auth methods reported by the agent itself.
  37pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  38
    /// Error returned by `AcpConnection::stdio` when the agent's `initialize` response reports
    /// a protocol version lower than `MINIMUM_SUPPORTED_VERSION`.
  39#[derive(Debug, Error)]
  40#[error("Unsupported version")]
  41pub struct UnsupportedVersion;
  42
    /// A connection to an external agent process that speaks ACP over its stdin/stdout.
    ///
    /// Created by `AcpConnection::stdio`; the spawned child process is killed when the
    /// connection is dropped.
  43pub struct AcpConnection {
        /// Identifier of the agent this connection belongs to.
  44    id: AgentId,
        /// Name reported by the agent in its `initialize` response, falling back to the agent id.
  45    telemetry_id: SharedString,
        /// Client side of the ACP connection, wired to the child's stdin/stdout.
  46    connection: Rc<acp::ClientSideConnection>,
        /// Per-session state, shared with the client delegate and the process wait task.
  47    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
        /// Auth methods from the `initialize` response (replaced by a fixed entry for Gemini).
  48    auth_methods: Vec<acp::AuthMethod>,
        /// Command that was used to spawn the agent process.
  49    command: AgentServerCommand,
        /// Capabilities the agent advertised in its `initialize` response.
  50    agent_capabilities: acp::AgentCapabilities,
        /// Mode to select on newly created sessions, when the agent offers it.
  51    default_mode: Option<acp::SessionModeId>,
        /// Model to select on newly created sessions, when the agent offers it.
  52    default_model: Option<acp::ModelId>,
        /// Config option id -> value to apply to sessions, when the agent offers that value.
  53    default_config_options: HashMap<String, String>,
        /// Handle to the spawned agent process.
  54    child: Child,
        /// Present only when the agent advertises the session `list` capability.
  55    session_list: Option<Rc<AcpSessionList>>,
        /// Drives the connection's I/O future on the background executor.
  56    _io_task: Task<Result<(), acp::Error>>,
        /// Waits for the child to exit and reports `LoadError::Exited` to every session thread.
  57    _wait_task: Task<Result<()>>,
        /// Forwards each line of the child's stderr to the log at `warn` level.
  58    _stderr_task: Task<Result<()>>,
  59}
  60
    /// Session config options together with a unit `watch` channel.
    ///
    /// NOTE(review): the channel is presumably used to signal observers when the options
    /// change; the senders/receivers are not exercised in this part of the file — confirm.
  61struct ConfigOptions {
        /// Current config options of the session, shared with in-flight update tasks.
  62    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
        /// Sending half of the unit `watch` channel.
  63    tx: Rc<RefCell<watch::Sender<()>>>,
        /// Receiving half of the unit `watch` channel.
  64    rx: watch::Receiver<()>,
  65}
  66
  67impl ConfigOptions {
  68    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  69        let (tx, rx) = watch::channel(());
  70        Self {
  71            config_options,
  72            tx: Rc::new(RefCell::new(tx)),
  73            rx,
  74        }
  75    }
  76}
  77
    /// State tracked for a single agent session.
  78pub struct AcpSession {
        /// Weak handle to the thread entity that displays this session.
  79    thread: WeakEntity<AcpThread>,
        /// NOTE(review): presumably suppresses the error surfaced when a prompt is aborted;
        /// it is only ever initialised to `false` in the visible code — confirm against its users.
  80    suppress_abort_err: bool,
        /// Model state reported by the agent, if any.
  81    models: Option<Rc<RefCell<acp::SessionModelState>>>,
        /// Mode state reported by the agent, if any.
  82    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
        /// Config options reported by the agent, if any.
  83    config_options: Option<ConfigOptions>,
  84}
  85
    /// Lists sessions through the agent connection and fans out session list updates over an
    /// unbounded channel.
  86pub struct AcpSessionList {
        /// Connection used to issue `list_sessions` requests.
  87    connection: Rc<acp::ClientSideConnection>,
        /// Sending half of the update channel.
  88    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
        /// Receiving half of the update channel; clones are handed out by `watch`.
  89    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
  90}
  91
  92impl AcpSessionList {
  93    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
  94        let (tx, rx) = smol::channel::unbounded();
  95        Self {
  96            connection,
  97            updates_tx: tx,
  98            updates_rx: rx,
  99        }
 100    }
 101
 102    fn notify_update(&self) {
 103        self.updates_tx
 104            .try_send(acp_thread::SessionListUpdate::Refresh)
 105            .log_err();
 106    }
 107
 108    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 109        self.updates_tx
 110            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 111            .log_err();
 112    }
 113}
 114
 115impl AgentSessionList for AcpSessionList {
 116    fn list_sessions(
 117        &self,
 118        request: AgentSessionListRequest,
 119        cx: &mut App,
 120    ) -> Task<Result<AgentSessionListResponse>> {
 121        let conn = self.connection.clone();
 122        cx.foreground_executor().spawn(async move {
 123            let acp_request = acp::ListSessionsRequest::new()
 124                .cwd(request.cwd)
 125                .cursor(request.cursor);
 126            let response = conn.list_sessions(acp_request).await?;
 127            Ok(AgentSessionListResponse {
 128                sessions: response
 129                    .sessions
 130                    .into_iter()
 131                    .map(|s| AgentSessionInfo {
 132                        session_id: s.session_id,
 133                        work_dirs: Some(PathList::new(&[s.cwd])),
 134                        title: s.title.map(Into::into),
 135                        updated_at: s.updated_at.and_then(|date_str| {
 136                            chrono::DateTime::parse_from_rfc3339(&date_str)
 137                                .ok()
 138                                .map(|dt| dt.with_timezone(&chrono::Utc))
 139                        }),
 140                        created_at: None,
 141                        meta: s.meta,
 142                    })
 143                    .collect(),
 144                next_cursor: response.next_cursor,
 145                meta: response.meta,
 146            })
 147        })
 148    }
 149
 150    fn watch(
 151        &self,
 152        _cx: &mut App,
 153    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 154        Some(self.updates_rx.clone())
 155    }
 156
 157    fn notify_refresh(&self) {
 158        self.notify_update();
 159    }
 160
 161    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 162        self
 163    }
 164}
 165
 166pub async fn connect(
 167    agent_id: AgentId,
 168    project: Entity<Project>,
 169    command: AgentServerCommand,
 170    default_mode: Option<acp::SessionModeId>,
 171    default_model: Option<acp::ModelId>,
 172    default_config_options: HashMap<String, String>,
 173    cx: &mut AsyncApp,
 174) -> Result<Rc<dyn AgentConnection>> {
 175    let conn = AcpConnection::stdio(
 176        agent_id,
 177        project,
 178        command.clone(),
 179        default_mode,
 180        default_model,
 181        default_config_options,
 182        cx,
 183    )
 184    .await?;
 185    Ok(Rc::new(conn) as _)
 186}
 187
    /// Lowest protocol version accepted from an agent's `initialize` response; anything lower
    /// makes `AcpConnection::stdio` fail with `UnsupportedVersion`.
 188const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 189
 190impl AcpConnection {
        /// Spawns the agent process and performs the ACP `initialize` handshake over its
        /// stdin/stdout.
        ///
        /// The command is wrapped with the shell from the terminal settings, receives
        /// `command.env`, and runs in the first path of the project's default path list when
        /// there is one. The default mode/model/config options are only stored here; they
        /// are applied when sessions are created.
        ///
        /// # Errors
        ///
        /// Fails if the process cannot be spawned, if any of its stdio pipes is missing, if
        /// the `initialize` request fails, or with `UnsupportedVersion` if the agent reports
        /// a protocol version below `MINIMUM_SUPPORTED_VERSION`.
 191    pub async fn stdio(
 192        agent_id: AgentId,
 193        project: Entity<Project>,
 194        command: AgentServerCommand,
 195        default_mode: Option<acp::SessionModeId>,
 196        default_model: Option<acp::ModelId>,
 197        default_config_options: HashMap<String, String>,
 198        cx: &mut AsyncApp,
 199    ) -> Result<Self> {
            // Build the command through the shell configured in the terminal settings.
 200        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
 201        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
 202        let mut child =
 203            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
 204        child.envs(command.env.iter().flatten());
            // Run the agent in the project's first default path, if it has one.
 205        if let Some(cwd) = project.update(cx, |project, cx| {
 206            project
 207                .default_path_list(cx)
 208                .ordered_paths()
 209                .next()
 210                .cloned()
 211        }) {
 212            child.current_dir(cwd);
 213        }
 214        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
 215
 216        let stdout = child.stdout.take().context("Failed to take stdout")?;
 217        let stdin = child.stdin.take().context("Failed to take stdin")?;
 218        let stderr = child.stderr.take().context("Failed to take stderr")?;
 219        log::debug!(
 220            "Spawning external agent server: {:?}, {:?}",
 221            command.path,
 222            command.args
 223        );
 224        log::trace!("Spawned (pid: {})", child.id());
 225
 226        let sessions = Rc::new(RefCell::new(HashMap::default()));
 227
            // Client name/version details sent to the agent in the `initialize` request.
 228        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
 229            (
 230                release_channel::ReleaseChannel::try_global(cx)
 231                    .map(|release_channel| release_channel.display_name()),
 232                release_channel::AppVersion::global(cx).to_string(),
 233            )
 234        });
 235
            // Shared slot handed to the client delegate now and filled in after `initialize`,
            // once it is known whether the agent supports listing sessions.
 236        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
 237            Rc::new(RefCell::new(None));
 238
 239        let client = ClientDelegate {
 240            sessions: sessions.clone(),
 241            session_list: client_session_list.clone(),
 242            cx: cx.clone(),
 243        };
            // Futures the connection asks us to spawn run detached on the foreground executor.
 244        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
 245            let foreground_executor = cx.foreground_executor().clone();
 246            move |fut| {
 247                foreground_executor.spawn(fut).detach();
 248            }
 249        });
 250
 251        let io_task = cx.background_spawn(io_task);
 252
            // Forward the agent's stderr to the log line by line until EOF or a read error.
 253        let stderr_task = cx.background_spawn(async move {
 254            let mut stderr = BufReader::new(stderr);
 255            let mut line = String::new();
 256            while let Ok(n) = stderr.read_line(&mut line).await
 257                && n > 0
 258            {
 259                log::warn!("agent stderr: {}", line.trim());
 260                line.clear();
 261            }
 262            Ok(())
 263        });
 264
            // When the process exits, report the exit status to the thread of every session
            // registered at that time.
 265        let wait_task = cx.spawn({
 266            let sessions = sessions.clone();
 267            let status_fut = child.status();
 268            async move |cx| {
 269                let status = status_fut.await?;
 270
 271                for session in sessions.borrow().values() {
 272                    session
 273                        .thread
 274                        .update(cx, |thread, cx| {
 275                            thread.emit_load_error(LoadError::Exited { status }, cx)
 276                        })
 277                        .ok();
 278                }
 279
 280                anyhow::Ok(())
 281            }
 282        });
 283
 284        let connection = Rc::new(connection);
 285
            // Register the connection with the global registry before the handshake starts.
 286        cx.update(|cx| {
 287            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
 288                registry.set_active_connection(agent_id.clone(), &connection, cx)
 289            });
 290        });
 291
 292        let response = connection
 293            .initialize(
 294                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
 295                    .client_capabilities(
 296                        acp::ClientCapabilities::new()
 297                            .fs(acp::FileSystemCapabilities::new()
 298                                .read_text_file(true)
 299                                .write_text_file(true))
 300                            .terminal(true)
 301                            .auth(acp::AuthCapabilities::new().terminal(true))
 302                            // Experimental: Allow for rendering terminal output from the agents
 303                            .meta(acp::Meta::from_iter([
 304                                ("terminal_output".into(), true.into()),
 305                                ("terminal-auth".into(), true.into()),
 306                            ])),
 307                    )
 308                    .client_info(
 309                        acp::Implementation::new("zed", version)
 310                            .title(release_channel.map(ToOwned::to_owned)),
 311                    ),
 312            )
 313            .await?;
 314
 315        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
 316            return Err(UnsupportedVersion.into());
 317        }
 318
 319        let telemetry_id = response
 320            .agent_info
 321            // Use the one the agent provides if we have one
 322            .map(|info| info.name.into())
 323            // Otherwise, just use the name
 324            .unwrap_or_else(|| agent_id.0.to_string().into());
 325
            // Only create a session list when the agent advertises the `list` capability, and
            // publish it to the client delegate through the shared slot.
 326        let session_list = if response
 327            .agent_capabilities
 328            .session_capabilities
 329            .list
 330            .is_some()
 331        {
 332            let list = Rc::new(AcpSessionList::new(connection.clone()));
 333            *client_session_list.borrow_mut() = Some(list.clone());
 334            Some(list)
 335        } else {
 336            None
 337        };
 338
 339        // TODO: Remove this override once Google team releases their official auth methods
            // For Gemini, replace the reported auth methods with one "Login" method whose
            // `terminal-auth` meta re-runs the agent command without the ACP flags.
 340        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
 341            let mut args = command.args.clone();
 342            args.retain(|a| a != "--experimental-acp" && a != "--acp");
 343            let value = serde_json::json!({
 344                "label": "gemini /auth",
 345                "command": command.path.to_string_lossy().into_owned(),
 346                "args": args,
 347                "env": command.env.clone().unwrap_or_default(),
 348            });
 349            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
 350            vec![acp::AuthMethod::Agent(
 351                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
 352                    .description("Login with your Google or Vertex AI account")
 353                    .meta(meta),
 354            )]
 355        } else {
 356            response.auth_methods
 357        };
 358        Ok(Self {
 359            id: agent_id,
 360            auth_methods,
 361            command,
 362            connection,
 363            telemetry_id,
 364            sessions,
 365            agent_capabilities: response.agent_capabilities,
 366            default_mode,
 367            default_model,
 368            default_config_options,
 369            session_list,
 370            _io_task: io_task,
 371            _wait_task: wait_task,
 372            _stderr_task: stderr_task,
 373            child,
 374        })
 375    }
 376
 377    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 378        &self.agent_capabilities.prompt_capabilities
 379    }
 380
 381    fn apply_default_config_options(
 382        &self,
 383        session_id: &acp::SessionId,
 384        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 385        cx: &mut AsyncApp,
 386    ) {
 387        let id = self.id.clone();
 388        let defaults_to_apply: Vec<_> = {
 389            let config_opts_ref = config_options.borrow();
 390            config_opts_ref
 391                .iter()
 392                .filter_map(|config_option| {
 393                    let default_value = self.default_config_options.get(&*config_option.id.0)?;
 394
 395                    let is_valid = match &config_option.kind {
 396                        acp::SessionConfigKind::Select(select) => match &select.options {
 397                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
 398                                .iter()
 399                                .any(|opt| &*opt.value.0 == default_value.as_str()),
 400                            acp::SessionConfigSelectOptions::Grouped(groups) => {
 401                                groups.iter().any(|g| {
 402                                    g.options
 403                                        .iter()
 404                                        .any(|opt| &*opt.value.0 == default_value.as_str())
 405                                })
 406                            }
 407                            _ => false,
 408                        },
 409                        _ => false,
 410                    };
 411
 412                    if is_valid {
 413                        let initial_value = match &config_option.kind {
 414                            acp::SessionConfigKind::Select(select) => {
 415                                Some(select.current_value.clone())
 416                            }
 417                            _ => None,
 418                        };
 419                        Some((
 420                            config_option.id.clone(),
 421                            default_value.clone(),
 422                            initial_value,
 423                        ))
 424                    } else {
 425                        log::warn!(
 426                            "`{}` is not a valid value for config option `{}` in {}",
 427                            default_value,
 428                            config_option.id.0,
 429                            id
 430                        );
 431                        None
 432                    }
 433                })
 434                .collect()
 435        };
 436
 437        for (config_id, default_value, initial_value) in defaults_to_apply {
 438            cx.spawn({
 439                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 440                let session_id = session_id.clone();
 441                let config_id_clone = config_id.clone();
 442                let config_opts = config_options.clone();
 443                let conn = self.connection.clone();
 444                async move |_| {
 445                    let result = conn
 446                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
 447                            session_id,
 448                            config_id_clone.clone(),
 449                            default_value_id,
 450                        ))
 451                        .await
 452                        .log_err();
 453
 454                    if result.is_none() {
 455                        if let Some(initial) = initial_value {
 456                            let mut opts = config_opts.borrow_mut();
 457                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 458                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 459                                    select.current_value = initial;
 460                                }
 461                            }
 462                        }
 463                    }
 464                }
 465            })
 466            .detach();
 467
 468            let mut opts = config_options.borrow_mut();
 469            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
 470                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 471                    select.current_value = acp::SessionConfigValueId::new(default_value);
 472                }
 473            }
 474        }
 475    }
 476}
 477
 478impl Drop for AcpConnection {
 479    fn drop(&mut self) {
 480        self.child.kill().log_err();
 481    }
 482}
 483
 484fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
 485    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
 486}
 487
 488fn terminal_auth_task(
 489    command: &AgentServerCommand,
 490    agent_id: &AgentId,
 491    method: &acp::AuthMethodTerminal,
 492) -> SpawnInTerminal {
 493    let mut args = command.args.clone();
 494    args.extend(method.args.clone());
 495
 496    let mut env = command.env.clone().unwrap_or_default();
 497    env.extend(method.env.clone());
 498
 499    acp_thread::build_terminal_auth_task(
 500        terminal_auth_task_id(agent_id, &method.id),
 501        method.name.clone(),
 502        command.path.to_string_lossy().into_owned(),
 503        args,
 504        env,
 505    )
 506}
 507
 508/// Used to support the _meta method prior to stabilization
 509fn meta_terminal_auth_task(
 510    agent_id: &AgentId,
 511    method_id: &acp::AuthMethodId,
 512    method: &acp::AuthMethod,
 513) -> Option<SpawnInTerminal> {
 514    #[derive(Deserialize)]
 515    struct MetaTerminalAuth {
 516        label: String,
 517        command: String,
 518        #[serde(default)]
 519        args: Vec<String>,
 520        #[serde(default)]
 521        env: HashMap<String, String>,
 522    }
 523
 524    let meta = match method {
 525        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
 526        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
 527        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
 528        _ => None,
 529    }?;
 530    let terminal_auth =
 531        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
 532
 533    Some(acp_thread::build_terminal_auth_task(
 534        terminal_auth_task_id(agent_id, method_id),
 535        terminal_auth.label.clone(),
 536        terminal_auth.command,
 537        terminal_auth.args,
 538        terminal_auth.env,
 539    ))
 540}
 541
 542impl AgentConnection for AcpConnection {
 543    fn agent_id(&self) -> AgentId {
 544        self.id.clone()
 545    }
 546
 547    fn telemetry_id(&self) -> SharedString {
 548        self.telemetry_id.clone()
 549    }
 550
 551    fn new_session(
 552        self: Rc<Self>,
 553        project: Entity<Project>,
 554        work_dirs: PathList,
 555        cx: &mut App,
 556    ) -> Task<Result<Entity<AcpThread>>> {
 557        // TODO: remove this once ACP supports multiple working directories
 558        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 559            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 560        };
 561        let name = self.id.0.clone();
 562        let mcp_servers = mcp_servers_for_project(&project, cx);
 563
 564        cx.spawn(async move |cx| {
 565            let response = self.connection
 566                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
 567                .await
 568                .map_err(map_acp_error)?;
 569
 570            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
 571
 572            if let Some(default_mode) = self.default_mode.clone() {
 573                if let Some(modes) = modes.as_ref() {
 574                    let mut modes_ref = modes.borrow_mut();
 575                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
 576
 577                    if has_mode {
 578                        let initial_mode_id = modes_ref.current_mode_id.clone();
 579
 580                        cx.spawn({
 581                            let default_mode = default_mode.clone();
 582                            let session_id = response.session_id.clone();
 583                            let modes = modes.clone();
 584                            let conn = self.connection.clone();
 585                            async move |_| {
 586                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
 587                                .await.log_err();
 588
 589                                if result.is_none() {
 590                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 591                                }
 592                            }
 593                        }).detach();
 594
 595                        modes_ref.current_mode_id = default_mode;
 596                    } else {
 597                        let available_modes = modes_ref
 598                            .available_modes
 599                            .iter()
 600                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 601                            .collect::<Vec<_>>()
 602                            .join("\n");
 603
 604                        log::warn!(
 605                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 606                        );
 607                    }
 608                }
 609            }
 610
 611            if let Some(default_model) = self.default_model.clone() {
 612                if let Some(models) = models.as_ref() {
 613                    let mut models_ref = models.borrow_mut();
 614                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
 615
 616                    if has_model {
 617                        let initial_model_id = models_ref.current_model_id.clone();
 618
 619                        cx.spawn({
 620                            let default_model = default_model.clone();
 621                            let session_id = response.session_id.clone();
 622                            let models = models.clone();
 623                            let conn = self.connection.clone();
 624                            async move |_| {
 625                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
 626                                .await.log_err();
 627
 628                                if result.is_none() {
 629                                    models.borrow_mut().current_model_id = initial_model_id;
 630                                }
 631                            }
 632                        }).detach();
 633
 634                        models_ref.current_model_id = default_model;
 635                    } else {
 636                        let available_models = models_ref
 637                            .available_models
 638                            .iter()
 639                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 640                            .collect::<Vec<_>>()
 641                            .join("\n");
 642
 643                        log::warn!(
 644                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 645                        );
 646                    }
 647                }
 648            }
 649
 650            if let Some(config_opts) = config_options.as_ref() {
 651                self.apply_default_config_options(&response.session_id, config_opts, cx);
 652            }
 653
 654            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 655            let thread: Entity<AcpThread> = cx.new(|cx| {
 656                AcpThread::new(
 657                    None,
 658                    None,
 659                    Some(work_dirs),
 660                    self.clone(),
 661                    project,
 662                    action_log,
 663                    response.session_id.clone(),
 664                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 665                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 666                    cx,
 667                )
 668            });
 669
 670            self.sessions.borrow_mut().insert(
 671                response.session_id,
 672                AcpSession {
 673                    thread: thread.downgrade(),
 674                    suppress_abort_err: false,
 675                    session_modes: modes,
 676                    models,
 677                    config_options: config_options.map(ConfigOptions::new),
 678                },
 679            );
 680
 681            Ok(thread)
 682        })
 683    }
 684
 685    fn supports_load_session(&self) -> bool {
 686        self.agent_capabilities.load_session
 687    }
 688
 689    fn supports_resume_session(&self) -> bool {
 690        self.agent_capabilities
 691            .session_capabilities
 692            .resume
 693            .is_some()
 694    }
 695
        /// Loads an existing session from the agent into a new thread.
        ///
        /// The thread and its session entry are created before the request is sent; the
        /// entry is removed again if the agent rejects the request. On success the modes,
        /// models and config options from the response are stored on the session, and the
        /// configured default config options are applied (default mode and model are not
        /// applied here).
        ///
        /// # Errors
        ///
        /// Fails if the agent does not advertise `load_session`, if `work_dirs` is empty, or
        /// if the agent rejects the `load_session` request.
 696    fn load_session(
 697        self: Rc<Self>,
 698        session_id: acp::SessionId,
 699        project: Entity<Project>,
 700        work_dirs: PathList,
 701        title: Option<SharedString>,
 702        cx: &mut App,
 703    ) -> Task<Result<Entity<AcpThread>>> {
 704        if !self.agent_capabilities.load_session {
 705            return Task::ready(Err(anyhow!(LoadError::Other(
 706                "Loading sessions is not supported by this agent.".into()
 707            ))));
 708        }
 709        // TODO: remove this once ACP supports multiple working directories
 710        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 711            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 712        };
 713
 714        let mcp_servers = mcp_servers_for_project(&project, cx);
 715        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 716        let thread: Entity<AcpThread> = cx.new(|cx| {
 717            AcpThread::new(
 718                None,
 719                title,
 720                Some(work_dirs.clone()),
 721                self.clone(),
 722                project,
 723                action_log,
 724                session_id.clone(),
 725                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 726                cx,
 727            )
 728        });
 729
            // Register the session before the request is sent.
            // NOTE(review): presumably so that updates the agent sends while loading can
            // already be routed to the thread — confirm against the client delegate.
 730        self.sessions.borrow_mut().insert(
 731            session_id.clone(),
 732            AcpSession {
 733                thread: thread.downgrade(),
 734                suppress_abort_err: false,
 735                session_modes: None,
 736                models: None,
 737                config_options: None,
 738            },
 739        );
 740
 741        cx.spawn(async move |cx| {
 742            let response = match self
 743                .connection
 744                .load_session(
 745                    acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
 746                )
 747                .await
 748            {
 749                Ok(response) => response,
 750                Err(err) => {
                        // Undo the eager registration so a failed load leaves no stale entry.
 751                    self.sessions.borrow_mut().remove(&session_id);
 752                    return Err(map_acp_error(err));
 753                }
 754            };
 755
 756            let (modes, models, config_options) =
 757                config_state(response.modes, response.models, response.config_options);
 758
 759            if let Some(config_opts) = config_options.as_ref() {
 760                self.apply_default_config_options(&session_id, config_opts, cx);
 761            }
 762
                // Fill in the state that was unknown when the session entry was created.
 763            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 764                session.session_modes = modes;
 765                session.models = models;
 766                session.config_options = config_options.map(ConfigOptions::new);
 767            }
 768
 769            Ok(thread)
 770        })
 771    }
 772
 773    fn resume_session(
 774        self: Rc<Self>,
 775        session_id: acp::SessionId,
 776        project: Entity<Project>,
 777        work_dirs: PathList,
 778        title: Option<SharedString>,
 779        cx: &mut App,
 780    ) -> Task<Result<Entity<AcpThread>>> {
 781        if self
 782            .agent_capabilities
 783            .session_capabilities
 784            .resume
 785            .is_none()
 786        {
 787            return Task::ready(Err(anyhow!(LoadError::Other(
 788                "Resuming sessions is not supported by this agent.".into()
 789            ))));
 790        }
 791        // TODO: remove this once ACP supports multiple working directories
 792        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 793            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 794        };
 795
 796        let mcp_servers = mcp_servers_for_project(&project, cx);
 797        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 798        let thread: Entity<AcpThread> = cx.new(|cx| {
 799            AcpThread::new(
 800                None,
 801                title,
 802                Some(work_dirs),
 803                self.clone(),
 804                project,
 805                action_log,
 806                session_id.clone(),
 807                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 808                cx,
 809            )
 810        });
 811
 812        self.sessions.borrow_mut().insert(
 813            session_id.clone(),
 814            AcpSession {
 815                thread: thread.downgrade(),
 816                suppress_abort_err: false,
 817                session_modes: None,
 818                models: None,
 819                config_options: None,
 820            },
 821        );
 822
 823        cx.spawn(async move |cx| {
 824            let response = match self
 825                .connection
 826                .resume_session(
 827                    acp::ResumeSessionRequest::new(session_id.clone(), cwd)
 828                        .mcp_servers(mcp_servers),
 829                )
 830                .await
 831            {
 832                Ok(response) => response,
 833                Err(err) => {
 834                    self.sessions.borrow_mut().remove(&session_id);
 835                    return Err(map_acp_error(err));
 836                }
 837            };
 838
 839            let (modes, models, config_options) =
 840                config_state(response.modes, response.models, response.config_options);
 841
 842            if let Some(config_opts) = config_options.as_ref() {
 843                self.apply_default_config_options(&session_id, config_opts, cx);
 844            }
 845
 846            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 847                session.session_modes = modes;
 848                session.models = models;
 849                session.config_options = config_options.map(ConfigOptions::new);
 850            }
 851
 852            Ok(thread)
 853        })
 854    }
 855
 856    fn supports_close_session(&self) -> bool {
 857        self.agent_capabilities.session_capabilities.close.is_some()
 858    }
 859
 860    fn close_session(
 861        self: Rc<Self>,
 862        session_id: &acp::SessionId,
 863        cx: &mut App,
 864    ) -> Task<Result<()>> {
 865        if !self.supports_close_session() {
 866            return Task::ready(Err(anyhow!(LoadError::Other(
 867                "Closing sessions is not supported by this agent.".into()
 868            ))));
 869        }
 870
 871        let conn = self.connection.clone();
 872        let session_id = session_id.clone();
 873        cx.foreground_executor().spawn(async move {
 874            conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
 875                .await?;
 876            self.sessions.borrow_mut().remove(&session_id);
 877            Ok(())
 878        })
 879    }
 880
 881    fn auth_methods(&self) -> &[acp::AuthMethod] {
 882        &self.auth_methods
 883    }
 884
 885    fn terminal_auth_task(
 886        &self,
 887        method_id: &acp::AuthMethodId,
 888        cx: &App,
 889    ) -> Option<SpawnInTerminal> {
 890        let method = self
 891            .auth_methods
 892            .iter()
 893            .find(|method| method.id() == method_id)?;
 894
 895        match method {
 896            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
 897                Some(terminal_auth_task(&self.command, &self.id, terminal))
 898            }
 899            _ => meta_terminal_auth_task(&self.id, method_id, method),
 900        }
 901    }
 902
 903    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
 904        let conn = self.connection.clone();
 905        cx.foreground_executor().spawn(async move {
 906            conn.authenticate(acp::AuthenticateRequest::new(method_id))
 907                .await?;
 908            Ok(())
 909        })
 910    }
 911
 912    fn prompt(
 913        &self,
 914        _id: Option<acp_thread::UserMessageId>,
 915        params: acp::PromptRequest,
 916        cx: &mut App,
 917    ) -> Task<Result<acp::PromptResponse>> {
 918        let conn = self.connection.clone();
 919        let sessions = self.sessions.clone();
 920        let session_id = params.session_id.clone();
 921        cx.foreground_executor().spawn(async move {
 922            let result = conn.prompt(params).await;
 923
 924            let mut suppress_abort_err = false;
 925
 926            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
 927                suppress_abort_err = session.suppress_abort_err;
 928                session.suppress_abort_err = false;
 929            }
 930
 931            match result {
 932                Ok(response) => Ok(response),
 933                Err(err) => {
 934                    if err.code == acp::ErrorCode::AuthRequired {
 935                        return Err(anyhow!(acp::Error::auth_required()));
 936                    }
 937
 938                    if err.code != ErrorCode::InternalError {
 939                        anyhow::bail!(err)
 940                    }
 941
 942                    let Some(data) = &err.data else {
 943                        anyhow::bail!(err)
 944                    };
 945
 946                    // Temporary workaround until the following PR is generally available:
 947                    // https://github.com/google-gemini/gemini-cli/pull/6656
 948
 949                    #[derive(Deserialize)]
 950                    #[serde(deny_unknown_fields)]
 951                    struct ErrorDetails {
 952                        details: Box<str>,
 953                    }
 954
 955                    match serde_json::from_value(data.clone()) {
 956                        Ok(ErrorDetails { details }) => {
 957                            if suppress_abort_err
 958                                && (details.contains("This operation was aborted")
 959                                    || details.contains("The user aborted a request"))
 960                            {
 961                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
 962                            } else {
 963                                Err(anyhow!(details))
 964                            }
 965                        }
 966                        Err(_) => Err(anyhow!(err)),
 967                    }
 968                }
 969            }
 970        })
 971    }
 972
 973    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
 974        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
 975            session.suppress_abort_err = true;
 976        }
 977        let conn = self.connection.clone();
 978        let params = acp::CancelNotification::new(session_id.clone());
 979        cx.foreground_executor()
 980            .spawn(async move { conn.cancel(params).await })
 981            .detach();
 982    }
 983
 984    fn session_modes(
 985        &self,
 986        session_id: &acp::SessionId,
 987        _cx: &App,
 988    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
 989        let sessions = self.sessions.clone();
 990        let sessions_ref = sessions.borrow();
 991        let Some(session) = sessions_ref.get(session_id) else {
 992            return None;
 993        };
 994
 995        if let Some(modes) = session.session_modes.as_ref() {
 996            Some(Rc::new(AcpSessionModes {
 997                connection: self.connection.clone(),
 998                session_id: session_id.clone(),
 999                state: modes.clone(),
1000            }) as _)
1001        } else {
1002            None
1003        }
1004    }
1005
1006    fn model_selector(
1007        &self,
1008        session_id: &acp::SessionId,
1009    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1010        let sessions = self.sessions.clone();
1011        let sessions_ref = sessions.borrow();
1012        let Some(session) = sessions_ref.get(session_id) else {
1013            return None;
1014        };
1015
1016        if let Some(models) = session.models.as_ref() {
1017            Some(Rc::new(AcpModelSelector::new(
1018                session_id.clone(),
1019                self.connection.clone(),
1020                models.clone(),
1021            )) as _)
1022        } else {
1023            None
1024        }
1025    }
1026
1027    fn session_config_options(
1028        &self,
1029        session_id: &acp::SessionId,
1030        _cx: &App,
1031    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1032        let sessions = self.sessions.borrow();
1033        let session = sessions.get(session_id)?;
1034
1035        let config_opts = session.config_options.as_ref()?;
1036
1037        Some(Rc::new(AcpSessionConfigOptions {
1038            session_id: session_id.clone(),
1039            connection: self.connection.clone(),
1040            state: config_opts.config_options.clone(),
1041            watch_tx: config_opts.tx.clone(),
1042            watch_rx: config_opts.rx.clone(),
1043        }) as _)
1044    }
1045
1046    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1047        self.session_list.clone().map(|s| s as _)
1048    }
1049
    /// Converts the connection into `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
1053}
1054
1055fn map_acp_error(err: acp::Error) -> anyhow::Error {
1056    if err.code == acp::ErrorCode::AuthRequired {
1057        let mut error = AuthRequired::new();
1058
1059        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1060            error = error.with_description(err.message);
1061        }
1062
1063        anyhow!(error)
1064    } else {
1065        anyhow!(err)
1066    }
1067}
1068
#[cfg(test)]
mod tests {
    use super::*;

    /// Agent id shared by every test in this module.
    fn test_agent_id() -> AgentId {
        AgentId::new("test-agent")
    }

    /// The first-class terminal auth task reuses the agent command, appends the
    /// method's args and lets the method's env override the command's env.
    #[test]
    fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
        let agent_command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "base".into()),
            ])),
        };
        let terminal_method = acp::AuthMethodTerminal::new("login", "Login")
            .args(vec!["/auth".into()])
            .env(std::collections::HashMap::from_iter([
                ("EXTRA".into(), "2".into()),
                ("SHARED".into(), "override".into()),
            ]));

        let task = terminal_auth_task(&agent_command, &test_agent_id(), &terminal_method);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(task.label, "Login");
        assert_eq!(task.command_label, "Login");
    }

    /// A legacy `terminal-auth` meta entry is turned into a terminal task.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["auth", "--interactive"],
                "env": {
                    "AUTH_MODE": "interactive",
                },
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(legacy_meta),
        );

        let task = meta_terminal_auth_task(&test_agent_id(), &method_id, &method)
            .expect("expected legacy terminal auth task");

        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
        assert_eq!(task.args, vec!["auth", "--interactive"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(task.label, "legacy /auth");
    }

    /// A `terminal-auth` meta entry that only carries a label yields no task.
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let incomplete_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(incomplete_meta),
        );

        let task = meta_terminal_auth_task(&test_agent_id(), &method_id, &method);

        assert!(task.is_none());
    }

    /// The first-class terminal auth task ignores a legacy `terminal-auth` meta
    /// entry attached to the same method.
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let method_id = acp::AuthMethodId::new("login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["legacy-auth"],
                "env": {
                    "AUTH_MODE": "legacy",
                },
            }),
        )]);
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(legacy_meta),
        );

        let agent_command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into()],
            env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
        };

        let acp::AuthMethod::Terminal(terminal) = &method else {
            unreachable!()
        };
        let task = terminal_auth_task(&agent_command, &test_agent_id(), terminal);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(task.label, "Login");
    }
}
1211
1212fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1213    let context_server_store = project.read(cx).context_server_store().read(cx);
1214    let is_local = project.read(cx).is_local();
1215    context_server_store
1216        .configured_server_ids()
1217        .iter()
1218        .filter_map(|id| {
1219            let configuration = context_server_store.configuration_for_server(id)?;
1220            match &*configuration {
1221                project::context_server_store::ContextServerConfiguration::Custom {
1222                    command,
1223                    remote,
1224                    ..
1225                }
1226                | project::context_server_store::ContextServerConfiguration::Extension {
1227                    command,
1228                    remote,
1229                    ..
1230                } if is_local || *remote => Some(acp::McpServer::Stdio(
1231                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
1232                        .args(command.args.clone())
1233                        .env(if let Some(env) = command.env.as_ref() {
1234                            env.iter()
1235                                .map(|(name, value)| acp::EnvVariable::new(name, value))
1236                                .collect()
1237                        } else {
1238                            vec![]
1239                        }),
1240                )),
1241                project::context_server_store::ContextServerConfiguration::Http {
1242                    url,
1243                    headers,
1244                    timeout: _,
1245                } => Some(acp::McpServer::Http(
1246                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1247                        headers
1248                            .iter()
1249                            .map(|(name, value)| acp::HttpHeader::new(name, value))
1250                            .collect(),
1251                    ),
1252                )),
1253                _ => None,
1254            }
1255        })
1256        .collect()
1257}
1258
1259fn config_state(
1260    modes: Option<acp::SessionModeState>,
1261    models: Option<acp::SessionModelState>,
1262    config_options: Option<Vec<acp::SessionConfigOption>>,
1263) -> (
1264    Option<Rc<RefCell<acp::SessionModeState>>>,
1265    Option<Rc<RefCell<acp::SessionModelState>>>,
1266    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1267) {
1268    if let Some(opts) = config_options {
1269        return (None, None, Some(Rc::new(RefCell::new(opts))));
1270    }
1271
1272    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1273    let models = models.map(|models| Rc::new(RefCell::new(models)));
1274    (modes, models, None)
1275}
1276
/// Handle for reading and changing the mode of a single agent session.
struct AcpSessionModes {
    /// Session whose mode this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send mode changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared mode state (current mode and available modes).
    state: Rc<RefCell<acp::SessionModeState>>,
}
1282
1283impl acp_thread::AgentSessionModes for AcpSessionModes {
1284    fn current_mode(&self) -> acp::SessionModeId {
1285        self.state.borrow().current_mode_id.clone()
1286    }
1287
1288    fn all_modes(&self) -> Vec<acp::SessionMode> {
1289        self.state.borrow().available_modes.clone()
1290    }
1291
1292    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1293        let connection = self.connection.clone();
1294        let session_id = self.session_id.clone();
1295        let old_mode_id;
1296        {
1297            let mut state = self.state.borrow_mut();
1298            old_mode_id = state.current_mode_id.clone();
1299            state.current_mode_id = mode_id.clone();
1300        };
1301        let state = self.state.clone();
1302        cx.foreground_executor().spawn(async move {
1303            let result = connection
1304                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1305                .await;
1306
1307            if result.is_err() {
1308                state.borrow_mut().current_mode_id = old_mode_id;
1309            }
1310
1311            result?;
1312
1313            Ok(())
1314        })
1315    }
1316}
1317
/// Handle for listing and selecting the model of a single agent session.
struct AcpModelSelector {
    /// Session whose model this selector controls.
    session_id: acp::SessionId,
    /// Connection used to send model changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared model state (current model and available models).
    state: Rc<RefCell<acp::SessionModelState>>,
}
1323
impl AcpModelSelector {
    /// Creates a selector for `session_id` that sends requests over `connection`
    /// and reads and writes the given shared `state`.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
1337
1338impl acp_thread::AgentModelSelector for AcpModelSelector {
1339    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1340        Task::ready(Ok(acp_thread::AgentModelList::Flat(
1341            self.state
1342                .borrow()
1343                .available_models
1344                .clone()
1345                .into_iter()
1346                .map(acp_thread::AgentModelInfo::from)
1347                .collect(),
1348        )))
1349    }
1350
1351    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1352        let connection = self.connection.clone();
1353        let session_id = self.session_id.clone();
1354        let old_model_id;
1355        {
1356            let mut state = self.state.borrow_mut();
1357            old_model_id = state.current_model_id.clone();
1358            state.current_model_id = model_id.clone();
1359        };
1360        let state = self.state.clone();
1361        cx.foreground_executor().spawn(async move {
1362            let result = connection
1363                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1364                .await;
1365
1366            if result.is_err() {
1367                state.borrow_mut().current_model_id = old_model_id;
1368            }
1369
1370            result?;
1371
1372            Ok(())
1373        })
1374    }
1375
1376    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1377        let state = self.state.borrow();
1378        Task::ready(
1379            state
1380                .available_models
1381                .iter()
1382                .find(|m| m.model_id == state.current_model_id)
1383                .cloned()
1384                .map(acp_thread::AgentModelInfo::from)
1385                .ok_or_else(|| anyhow::anyhow!("Model not found")),
1386        )
1387    }
1388}
1389
/// Handle for reading and changing the config options of a single agent session.
struct AcpSessionConfigOptions {
    /// Session whose config options this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send config changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared list of the session's config options.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender signalled after the option list has been replaced.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver handed out to callers of `watch`.
    watch_rx: watch::Receiver<()>,
}
1397
1398impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1399    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1400        self.state.borrow().clone()
1401    }
1402
1403    fn set_config_option(
1404        &self,
1405        config_id: acp::SessionConfigId,
1406        value: acp::SessionConfigValueId,
1407        cx: &mut App,
1408    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1409        let connection = self.connection.clone();
1410        let session_id = self.session_id.clone();
1411        let state = self.state.clone();
1412
1413        let watch_tx = self.watch_tx.clone();
1414
1415        cx.foreground_executor().spawn(async move {
1416            let response = connection
1417                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1418                    session_id, config_id, value,
1419                ))
1420                .await?;
1421
1422            *state.borrow_mut() = response.config_options.clone();
1423            watch_tx.borrow_mut().send(()).ok();
1424            Ok(response.config_options)
1425        })
1426    }
1427
1428    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1429        Some(self.watch_rx.clone())
1430    }
1431}
1432
/// Client-side handler for requests and notifications sent by the agent.
struct ClientDelegate {
    /// Sessions keyed by id; used to find the thread a message belongs to.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Optional session list that receives session info updates.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// Async app context, cloned whenever a thread entity has to be updated.
    cx: AsyncApp,
}
1438
1439#[async_trait::async_trait(?Send)]
1440impl acp::Client for ClientDelegate {
1441    async fn request_permission(
1442        &self,
1443        arguments: acp::RequestPermissionRequest,
1444    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1445        let thread;
1446        {
1447            let sessions_ref = self.sessions.borrow();
1448            let session = sessions_ref
1449                .get(&arguments.session_id)
1450                .context("Failed to get session")?;
1451            thread = session.thread.clone();
1452        }
1453
1454        let cx = &mut self.cx.clone();
1455
1456        let task = thread.update(cx, |thread, cx| {
1457            thread.request_tool_call_authorization(
1458                arguments.tool_call,
1459                acp_thread::PermissionOptions::Flat(arguments.options),
1460                cx,
1461            )
1462        })??;
1463
1464        let outcome = task.await;
1465
1466        Ok(acp::RequestPermissionResponse::new(outcome.into()))
1467    }
1468
1469    async fn write_text_file(
1470        &self,
1471        arguments: acp::WriteTextFileRequest,
1472    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1473        let cx = &mut self.cx.clone();
1474        let task = self
1475            .session_thread(&arguments.session_id)?
1476            .update(cx, |thread, cx| {
1477                thread.write_text_file(arguments.path, arguments.content, cx)
1478            })?;
1479
1480        task.await?;
1481
1482        Ok(Default::default())
1483    }
1484
1485    async fn read_text_file(
1486        &self,
1487        arguments: acp::ReadTextFileRequest,
1488    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1489        let task = self.session_thread(&arguments.session_id)?.update(
1490            &mut self.cx.clone(),
1491            |thread, cx| {
1492                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1493            },
1494        )?;
1495
1496        let content = task.await?;
1497
1498        Ok(acp::ReadTextFileResponse::new(content))
1499    }
1500
1501    async fn session_notification(
1502        &self,
1503        notification: acp::SessionNotification,
1504    ) -> Result<(), acp::Error> {
1505        let sessions = self.sessions.borrow();
1506        let session = sessions
1507            .get(&notification.session_id)
1508            .context("Failed to get session")?;
1509
1510        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1511            current_mode_id,
1512            ..
1513        }) = &notification.update
1514        {
1515            if let Some(session_modes) = &session.session_modes {
1516                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1517            }
1518        }
1519
1520        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1521            config_options,
1522            ..
1523        }) = &notification.update
1524        {
1525            if let Some(opts) = &session.config_options {
1526                *opts.config_options.borrow_mut() = config_options.clone();
1527                opts.tx.borrow_mut().send(()).ok();
1528            }
1529        }
1530
1531        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
1532            && let Some(session_list) = self.session_list.borrow().as_ref()
1533        {
1534            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1535        }
1536
1537        // Clone so we can inspect meta both before and after handing off to the thread
1538        let update_clone = notification.update.clone();
1539
1540        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1541        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1542            if let Some(meta) = &tc.meta {
1543                if let Some(terminal_info) = meta.get("terminal_info") {
1544                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1545                    {
1546                        let terminal_id = acp::TerminalId::new(id_str);
1547                        let cwd = terminal_info
1548                            .get("cwd")
1549                            .and_then(|v| v.as_str().map(PathBuf::from));
1550
1551                        // Create a minimal display-only lower-level terminal and register it.
1552                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1553                            let builder = TerminalBuilder::new_display_only(
1554                                CursorShape::default(),
1555                                AlternateScroll::On,
1556                                None,
1557                                0,
1558                                cx.background_executor(),
1559                                thread.project().read(cx).path_style(cx),
1560                            )?;
1561                            let lower = cx.new(|cx| builder.subscribe(cx));
1562                            thread.on_terminal_provider_event(
1563                                TerminalProviderEvent::Created {
1564                                    terminal_id,
1565                                    label: tc.title.clone(),
1566                                    cwd,
1567                                    output_byte_limit: None,
1568                                    terminal: lower,
1569                                },
1570                                cx,
1571                            );
1572                            anyhow::Ok(())
1573                        });
1574                    }
1575                }
1576            }
1577        }
1578
1579        // Forward the update to the acp_thread as usual.
1580        session.thread.update(&mut self.cx.clone(), |thread, cx| {
1581            thread.handle_session_update(notification.update.clone(), cx)
1582        })??;
1583
1584        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1585        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1586            if let Some(meta) = &tcu.meta {
1587                if let Some(term_out) = meta.get("terminal_output") {
1588                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1589                        let terminal_id = acp::TerminalId::new(id_str);
1590                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1591                            let data = s.as_bytes().to_vec();
1592                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1593                                thread.on_terminal_provider_event(
1594                                    TerminalProviderEvent::Output { terminal_id, data },
1595                                    cx,
1596                                );
1597                            });
1598                        }
1599                    }
1600                }
1601
1602                // terminal_exit
1603                if let Some(term_exit) = meta.get("terminal_exit") {
1604                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1605                        let terminal_id = acp::TerminalId::new(id_str);
1606                        let status = acp::TerminalExitStatus::new()
1607                            .exit_code(
1608                                term_exit
1609                                    .get("exit_code")
1610                                    .and_then(|v| v.as_u64())
1611                                    .map(|i| i as u32),
1612                            )
1613                            .signal(
1614                                term_exit
1615                                    .get("signal")
1616                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
1617                            );
1618
1619                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1620                            thread.on_terminal_provider_event(
1621                                TerminalProviderEvent::Exit {
1622                                    terminal_id,
1623                                    status,
1624                                },
1625                                cx,
1626                            );
1627                        });
1628                    }
1629                }
1630            }
1631        }
1632
1633        Ok(())
1634    }
1635
1636    async fn create_terminal(
1637        &self,
1638        args: acp::CreateTerminalRequest,
1639    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1640        let thread = self.session_thread(&args.session_id)?;
1641        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1642
1643        let terminal_entity = acp_thread::create_terminal_entity(
1644            args.command.clone(),
1645            &args.args,
1646            args.env
1647                .into_iter()
1648                .map(|env| (env.name, env.value))
1649                .collect(),
1650            args.cwd.clone(),
1651            &project,
1652            &mut self.cx.clone(),
1653        )
1654        .await?;
1655
1656        // Register with renderer
1657        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1658            thread.register_terminal_created(
1659                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1660                format!("{} {}", args.command, args.args.join(" ")),
1661                args.cwd.clone(),
1662                args.output_byte_limit,
1663                terminal_entity,
1664                cx,
1665            )
1666        })?;
1667        let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1668        Ok(acp::CreateTerminalResponse::new(terminal_id))
1669    }
1670
1671    async fn kill_terminal(
1672        &self,
1673        args: acp::KillTerminalRequest,
1674    ) -> Result<acp::KillTerminalResponse, acp::Error> {
1675        self.session_thread(&args.session_id)?
1676            .update(&mut self.cx.clone(), |thread, cx| {
1677                thread.kill_terminal(args.terminal_id, cx)
1678            })??;
1679
1680        Ok(Default::default())
1681    }
1682
1683    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1684        Err(acp::Error::method_not_found())
1685    }
1686
1687    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1688        Err(acp::Error::method_not_found())
1689    }
1690
1691    async fn release_terminal(
1692        &self,
1693        args: acp::ReleaseTerminalRequest,
1694    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1695        self.session_thread(&args.session_id)?
1696            .update(&mut self.cx.clone(), |thread, cx| {
1697                thread.release_terminal(args.terminal_id, cx)
1698            })??;
1699
1700        Ok(Default::default())
1701    }
1702
1703    async fn terminal_output(
1704        &self,
1705        args: acp::TerminalOutputRequest,
1706    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1707        self.session_thread(&args.session_id)?
1708            .read_with(&mut self.cx.clone(), |thread, cx| {
1709                let out = thread
1710                    .terminal(args.terminal_id)?
1711                    .read(cx)
1712                    .current_output(cx);
1713
1714                Ok(out)
1715            })?
1716    }
1717
1718    async fn wait_for_terminal_exit(
1719        &self,
1720        args: acp::WaitForTerminalExitRequest,
1721    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1722        let exit_status = self
1723            .session_thread(&args.session_id)?
1724            .update(&mut self.cx.clone(), |thread, cx| {
1725                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1726            })??
1727            .await;
1728
1729        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1730    }
1731}
1732
1733impl ClientDelegate {
1734    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1735        let sessions = self.sessions.borrow();
1736        sessions
1737            .get(session_id)
1738            .context("Failed to get session")
1739            .map(|session| session.thread.clone())
1740    }
1741}