acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::AcpConnectionRegistry;
   6use action_log::ActionLog;
   7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   8use anyhow::anyhow;
   9use collections::HashMap;
  10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  11use futures::AsyncBufReadExt as _;
  12use futures::FutureExt as _;
  13use futures::future::Shared;
  14use futures::io::BufReader;
  15use project::agent_server_store::{AgentServerCommand, AgentServerStore};
  16use project::{AgentId, Project};
  17use remote::remote_client::Interactive;
  18use serde::Deserialize;
  19use settings::Settings as _;
  20use std::path::PathBuf;
  21use std::process::Stdio;
  22use std::rc::Rc;
  23use std::{any::Any, cell::RefCell};
  24use task::{ShellBuilder, SpawnInTerminal};
  25use thiserror::Error;
  26use util::ResultExt as _;
  27use util::path_list::PathList;
  28use util::process::Child;
  29
  30use std::sync::Arc;
  31
  32use anyhow::{Context as _, Result};
  33use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  34
  35use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  36use terminal::TerminalBuilder;
  37use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  38
  39use crate::GEMINI_ID;
  40
/// Identifier of the terminal-based login method that `AcpConnection::stdio`
/// substitutes for the advertised auth methods when the agent is Gemini.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  42
/// Error returned by `AcpConnection::stdio` when the protocol version in the
/// agent's `initialize` response is below `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
  46
/// A connection to an external agent server speaking the Agent Client Protocol
/// over the stdio of a spawned child process.
pub struct AcpConnection {
    /// Id of the agent this connection was created for.
    id: AgentId,
    /// Name reported by the agent in its `initialize` response, falling back
    /// to the agent id when the agent reports none.
    telemetry_id: SharedString,
    /// Client side of the protocol connection; shared with spawned tasks.
    connection: Rc<acp::ClientSideConnection>,
    /// Established sessions keyed by session id.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Sessions whose opening RPC is still in flight, keyed by session id.
    pending_sessions: Rc<RefCell<HashMap<acp::SessionId, PendingAcpSession>>>,
    /// Auth methods from the `initialize` response (or the Gemini override).
    auth_methods: Vec<acp::AuthMethod>,
    agent_server_store: WeakEntity<AgentServerStore>,
    /// Capabilities the agent reported in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on new sessions when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on new sessions when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option values (keyed by option id) to apply to sessions when
    /// the agent offers them.
    default_config_options: HashMap<String, String>,
    /// The spawned agent process, killed when the connection is dropped.
    /// `None` for connections built by `new_for_test`.
    child: Option<Child>,
    /// Present only when the agent advertises the session-list capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Background task running the connection's I/O future.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that waits for the child to exit and reports the exit status to
    /// every session's thread.
    _wait_task: Task<Result<()>>,
    /// Background task that logs each line of the agent's stderr.
    _stderr_task: Task<Result<()>>,
}
  65
/// Bookkeeping for a session whose opening RPC has not completed yet.
struct PendingAcpSession {
    /// Shared task resolving to the session's thread; cloned for every caller
    /// that requests the same session while it is still pending.
    task: Shared<Task<Result<Entity<AcpThread>, Arc<anyhow::Error>>>>,
    /// Number of callers waiting on `task`; carried over to the `AcpSession`
    /// once the session is registered.
    ref_count: usize,
}
  70
/// Mode, model and config-option state produced by the RPC closure handed to
/// `AcpConnection::open_or_create_session`. Every part is optional.
struct SessionConfigResponse {
    modes: Option<acp::SessionModeState>,
    models: Option<acp::SessionModelState>,
    config_options: Option<Vec<acp::SessionConfigOption>>,
}
  76
/// A session's shared config options bundled with a unit-valued watch channel.
struct ConfigOptions {
    /// The session's config options, shared and mutable in place.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sending half of the watch channel created in `ConfigOptions::new`.
    // NOTE(review): presumably used to signal option changes to observers;
    // the senders/receivers are not used in the visible part of this file.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiving half of the same watch channel.
    rx: watch::Receiver<()>,
}
  82
  83impl ConfigOptions {
  84    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  85        let (tx, rx) = watch::channel(());
  86        Self {
  87            config_options,
  88            tx: Rc::new(RefCell::new(tx)),
  89            rx,
  90        }
  91    }
  92}
  93
/// State tracked for one established session on an `AcpConnection`.
pub struct AcpSession {
    /// Weak handle to the thread backing this session.
    thread: WeakEntity<AcpThread>,
    // Initialised to `false` when the session is registered; the code that
    // reads or sets it is outside the visible part of this file.
    suppress_abort_err: bool,
    /// Model state, when provided for the session.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state, when provided for the session.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options, when provided for the session.
    config_options: Option<ConfigOptions>,
    /// Number of outstanding requests for this session; incremented each time
    /// `open_or_create_session` is asked for a session that already exists.
    ref_count: usize,
}
 102
/// Session listing backed by the agent connection, plus an unbounded channel
/// of `SessionListUpdate`s that watchers can subscribe to.
pub struct AcpSessionList {
    /// Connection used to issue `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sending half of the update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; cloned for each watcher.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
 108
 109impl AcpSessionList {
 110    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
 111        let (tx, rx) = smol::channel::unbounded();
 112        Self {
 113            connection,
 114            updates_tx: tx,
 115            updates_rx: rx,
 116        }
 117    }
 118
 119    fn notify_update(&self) {
 120        self.updates_tx
 121            .try_send(acp_thread::SessionListUpdate::Refresh)
 122            .log_err();
 123    }
 124
 125    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 126        self.updates_tx
 127            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 128            .log_err();
 129    }
 130}
 131
 132impl AgentSessionList for AcpSessionList {
 133    fn list_sessions(
 134        &self,
 135        request: AgentSessionListRequest,
 136        cx: &mut App,
 137    ) -> Task<Result<AgentSessionListResponse>> {
 138        let conn = self.connection.clone();
 139        cx.foreground_executor().spawn(async move {
 140            let acp_request = acp::ListSessionsRequest::new()
 141                .cwd(request.cwd)
 142                .cursor(request.cursor);
 143            let response = conn.list_sessions(acp_request).await?;
 144            Ok(AgentSessionListResponse {
 145                sessions: response
 146                    .sessions
 147                    .into_iter()
 148                    .map(|s| AgentSessionInfo {
 149                        session_id: s.session_id,
 150                        work_dirs: Some(PathList::new(&[s.cwd])),
 151                        title: s.title.map(Into::into),
 152                        updated_at: s.updated_at.and_then(|date_str| {
 153                            chrono::DateTime::parse_from_rfc3339(&date_str)
 154                                .ok()
 155                                .map(|dt| dt.with_timezone(&chrono::Utc))
 156                        }),
 157                        created_at: None,
 158                        meta: s.meta,
 159                    })
 160                    .collect(),
 161                next_cursor: response.next_cursor,
 162                meta: response.meta,
 163            })
 164        })
 165    }
 166
 167    fn watch(
 168        &self,
 169        _cx: &mut App,
 170    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 171        Some(self.updates_rx.clone())
 172    }
 173
 174    fn notify_refresh(&self) {
 175        self.notify_update();
 176    }
 177
 178    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 179        self
 180    }
 181}
 182
 183pub async fn connect(
 184    agent_id: AgentId,
 185    project: Entity<Project>,
 186    command: AgentServerCommand,
 187    agent_server_store: WeakEntity<AgentServerStore>,
 188    default_mode: Option<acp::SessionModeId>,
 189    default_model: Option<acp::ModelId>,
 190    default_config_options: HashMap<String, String>,
 191    cx: &mut AsyncApp,
 192) -> Result<Rc<dyn AgentConnection>> {
 193    let conn = AcpConnection::stdio(
 194        agent_id,
 195        project,
 196        command.clone(),
 197        agent_server_store,
 198        default_mode,
 199        default_model,
 200        default_config_options,
 201        cx,
 202    )
 203    .await?;
 204    Ok(Rc::new(conn) as _)
 205}
 206
/// Lowest protocol version accepted from an agent's `initialize` response;
/// anything below it makes `AcpConnection::stdio` fail with
/// `UnsupportedVersion`.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 208
 209impl AcpConnection {
    /// Spawns the agent server as a child process with piped stdio, wires an
    /// ACP client connection to its stdin/stdout, performs the `initialize`
    /// handshake and returns the resulting connection.
    ///
    /// # Errors
    ///
    /// Fails when the child cannot be spawned, when one of its stdio handles
    /// cannot be taken, when the `initialize` request fails, or with
    /// [`UnsupportedVersion`] when the agent reports a protocol version below
    /// `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        agent_id: AgentId,
        project: Entity<Project>,
        command: AgentServerCommand,
        agent_server_store: WeakEntity<AgentServerStore>,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        // First path of the project's default path list, if any.
        let root_dir = project.read_with(cx, |project, cx| {
            project
                .default_path_list(cx)
                .ordered_paths()
                .next()
                .cloned()
        });
        // Parts of `command` are moved below; keep a copy for the Gemini auth
        // override further down.
        let original_command = command.clone();
        // When the project has a remote client, let it build the command line.
        // Without one, or if building fails, run the command as given.
        let (path, args, env) = project
            .read_with(cx, |project, cx| {
                project.remote_client().and_then(|client| {
                    let template = client
                        .read(cx)
                        .build_command_with_options(
                            Some(command.path.display().to_string()),
                            &command.args,
                            &command.env.clone().into_iter().flatten().collect(),
                            root_dir.as_ref().map(|path| path.display().to_string()),
                            None,
                            Interactive::No,
                        )
                        .log_err()?;
                    Some((template.program, template.args, template.env))
                })
            })
            .unwrap_or_else(|| {
                (
                    command.path.display().to_string(),
                    command.args,
                    command.env.unwrap_or_default(),
                )
            });

        // Run the command through the shell configured in the terminal settings.
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child = builder.build_std_command(Some(path.clone()), &args);
        child.envs(env.clone());
        // The working directory is only set for local projects.
        if let Some(cwd) = project.read_with(cx, |project, _cx| {
            if project.is_local() {
                root_dir.as_ref()
            } else {
                None
            }
        }) {
            child.current_dir(cwd);
        }
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // Shared slot handed to the client delegate now and filled in after
        // the handshake, once the agent's capabilities are known.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        let client = ClientDelegate {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
            cx: cx.clone(),
        };
        // Futures the connection asks to spawn are run detached on the
        // foreground executor.
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Log every line the agent writes to stderr at warn level; stops on
        // EOF or on the first read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // Once the child exits, report its exit status to every session's
        // thread that is still alive.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        // Register this connection as the active one for the agent in the
        // global connection registry.
        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(agent_id.clone(), &connection, cx)
            });
        });

        // Handshake: advertise file-system, terminal and terminal-auth support
        // along with the client name, version and release channel.
        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapabilities::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            .auth(acp::AuthCapabilities::new().terminal(true))
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| agent_id.0.clone());

        // Only create a session list when the agent advertises the capability,
        // and publish it to the client delegate through the shared slot.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        // TODO: Remove this override once Google team releases their official auth methods
        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
            // Re-run the original command without the ACP flags as a terminal
            // login, described through the `terminal-auth` meta entry.
            let mut gemini_args = original_command.args.clone();
            gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
            let value = serde_json::json!({
                "label": "gemini /auth",
                "command": original_command.path.to_string_lossy(),
                "args": gemini_args,
                "env": original_command.env.unwrap_or_default(),
            });
            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
            vec![acp::AuthMethod::Agent(
                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                    .description("Login with your Google or Vertex AI account")
                    .meta(meta),
            )]
        } else {
            response.auth_methods
        };
        Ok(Self {
            id: agent_id,
            auth_methods,
            agent_server_store,
            connection,
            telemetry_id,
            sessions,
            pending_sessions: Rc::new(RefCell::new(HashMap::default())),
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child: Some(child),
        })
    }
 425
 426    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 427        &self.agent_capabilities.prompt_capabilities
 428    }
 429
 430    #[cfg(test)]
 431    fn new_for_test(
 432        connection: Rc<acp::ClientSideConnection>,
 433        sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 434        agent_capabilities: acp::AgentCapabilities,
 435        agent_server_store: WeakEntity<AgentServerStore>,
 436        io_task: Task<Result<(), acp::Error>>,
 437        _cx: &mut App,
 438    ) -> Self {
 439        Self {
 440            id: AgentId::new("test"),
 441            telemetry_id: "test".into(),
 442            connection,
 443            sessions,
 444            pending_sessions: Rc::new(RefCell::new(HashMap::default())),
 445            auth_methods: vec![],
 446            agent_server_store,
 447            agent_capabilities,
 448            default_mode: None,
 449            default_model: None,
 450            default_config_options: HashMap::default(),
 451            child: None,
 452            session_list: None,
 453            _io_task: io_task,
 454            _wait_task: Task::ready(Ok(())),
 455            _stderr_task: Task::ready(Ok(())),
 456        }
 457    }
 458
    /// Returns the thread for `session_id`, opening the session through
    /// `rpc_call` if it is not already known.
    ///
    /// Resolution order:
    /// 1. An established session whose thread is still alive is returned
    ///    immediately.
    /// 2. A session that is currently being opened shares its in-flight task.
    /// 3. Otherwise a thread is created, `rpc_call` is invoked with the
    ///    connection, the session id and the first working directory, and on
    ///    success the session is registered.
    ///
    /// Each request bumps the reference count of the session it attaches to.
    ///
    /// # Errors
    ///
    /// The returned task fails when `work_dirs` is empty or when `rpc_call`
    /// fails; in the latter case the pending entry is removed.
    fn open_or_create_session(
        self: Rc<Self>,
        session_id: acp::SessionId,
        project: Entity<Project>,
        work_dirs: PathList,
        title: Option<SharedString>,
        rpc_call: impl FnOnce(
            Rc<acp::ClientSideConnection>,
            acp::SessionId,
            PathBuf,
        )
            -> futures::future::LocalBoxFuture<'static, Result<SessionConfigResponse>>
        + 'static,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        // Already established: count the request and reuse the thread if it is
        // still alive. If the thread is gone, fall through and open it again.
        if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
            session.ref_count += 1;
            if let Some(thread) = session.thread.upgrade() {
                return Task::ready(Ok(thread));
            }
        }

        // Currently being opened: count the request and wait on the shared task.
        if let Some(pending) = self.pending_sessions.borrow_mut().get_mut(&session_id) {
            pending.ref_count += 1;
            let task = pending.task.clone();
            return cx
                .foreground_executor()
                .spawn(async move { task.await.map_err(|err| anyhow!(err)) });
        }

        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };

        let shared_task = cx
            .spawn({
                let session_id = session_id.clone();
                let this = self.clone();
                async move |cx| {
                    // The thread is created before the RPC is issued.
                    let action_log = cx.new(|_| ActionLog::new(project.clone()));
                    let thread: Entity<AcpThread> = cx.new(|cx| {
                        AcpThread::new(
                            None,
                            title,
                            Some(work_dirs),
                            this.clone(),
                            project,
                            action_log,
                            session_id.clone(),
                            watch::Receiver::constant(
                                this.agent_capabilities.prompt_capabilities.clone(),
                            ),
                            cx,
                        )
                    });

                    let response =
                        match rpc_call(this.connection.clone(), session_id.clone(), cwd).await {
                            Ok(response) => response,
                            Err(err) => {
                                // Drop the pending entry so a later request can retry.
                                this.pending_sessions.borrow_mut().remove(&session_id);
                                return Err(Arc::new(err));
                            }
                        };

                    let (modes, models, config_options) =
                        config_state(response.modes, response.models, response.config_options);

                    if let Some(config_opts) = config_options.as_ref() {
                        this.apply_default_config_options(&session_id, config_opts, cx);
                    }

                    // Carry over the number of callers that attached while the
                    // session was pending (1 if the entry is missing).
                    let ref_count = this
                        .pending_sessions
                        .borrow_mut()
                        .remove(&session_id)
                        .map_or(1, |pending| pending.ref_count);

                    this.sessions.borrow_mut().insert(
                        session_id,
                        AcpSession {
                            thread: thread.downgrade(),
                            suppress_abort_err: false,
                            session_modes: modes,
                            models,
                            config_options: config_options.map(ConfigOptions::new),
                            ref_count,
                        },
                    );

                    Ok(thread)
                }
            })
            .shared();

        // Record the in-flight task so concurrent requests for the same
        // session share it instead of issuing a second RPC.
        self.pending_sessions.borrow_mut().insert(
            session_id,
            PendingAcpSession {
                task: shared_task.clone(),
                ref_count: 1,
            },
        );

        cx.foreground_executor()
            .spawn(async move { shared_task.await.map_err(|err| anyhow!(err)) })
    }
 566
 567    fn apply_default_config_options(
 568        &self,
 569        session_id: &acp::SessionId,
 570        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 571        cx: &mut AsyncApp,
 572    ) {
 573        let id = self.id.clone();
 574        let defaults_to_apply: Vec<_> = {
 575            let config_opts_ref = config_options.borrow();
 576            config_opts_ref
 577                .iter()
 578                .filter_map(|config_option| {
 579                    let default_value = self.default_config_options.get(&*config_option.id.0)?;
 580
 581                    let is_valid = match &config_option.kind {
 582                        acp::SessionConfigKind::Select(select) => match &select.options {
 583                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
 584                                .iter()
 585                                .any(|opt| &*opt.value.0 == default_value.as_str()),
 586                            acp::SessionConfigSelectOptions::Grouped(groups) => {
 587                                groups.iter().any(|g| {
 588                                    g.options
 589                                        .iter()
 590                                        .any(|opt| &*opt.value.0 == default_value.as_str())
 591                                })
 592                            }
 593                            _ => false,
 594                        },
 595                        _ => false,
 596                    };
 597
 598                    if is_valid {
 599                        let initial_value = match &config_option.kind {
 600                            acp::SessionConfigKind::Select(select) => {
 601                                Some(select.current_value.clone())
 602                            }
 603                            _ => None,
 604                        };
 605                        Some((
 606                            config_option.id.clone(),
 607                            default_value.clone(),
 608                            initial_value,
 609                        ))
 610                    } else {
 611                        log::warn!(
 612                            "`{}` is not a valid value for config option `{}` in {}",
 613                            default_value,
 614                            config_option.id.0,
 615                            id
 616                        );
 617                        None
 618                    }
 619                })
 620                .collect()
 621        };
 622
 623        for (config_id, default_value, initial_value) in defaults_to_apply {
 624            cx.spawn({
 625                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 626                let session_id = session_id.clone();
 627                let config_id_clone = config_id.clone();
 628                let config_opts = config_options.clone();
 629                let conn = self.connection.clone();
 630                async move |_| {
 631                    let result = conn
 632                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
 633                            session_id,
 634                            config_id_clone.clone(),
 635                            default_value_id,
 636                        ))
 637                        .await
 638                        .log_err();
 639
 640                    if result.is_none() {
 641                        if let Some(initial) = initial_value {
 642                            let mut opts = config_opts.borrow_mut();
 643                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 644                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 645                                    select.current_value = initial;
 646                                }
 647                            }
 648                        }
 649                    }
 650                }
 651            })
 652            .detach();
 653
 654            let mut opts = config_options.borrow_mut();
 655            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
 656                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 657                    select.current_value = acp::SessionConfigValueId::new(default_value);
 658                }
 659            }
 660        }
 661    }
 662}
 663
 664impl Drop for AcpConnection {
 665    fn drop(&mut self) {
 666        if let Some(ref mut child) = self.child {
 667            child.kill().log_err();
 668        }
 669    }
 670}
 671
 672fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
 673    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
 674}
 675
 676fn terminal_auth_task(
 677    command: &AgentServerCommand,
 678    agent_id: &AgentId,
 679    method: &acp::AuthMethodTerminal,
 680) -> SpawnInTerminal {
 681    acp_thread::build_terminal_auth_task(
 682        terminal_auth_task_id(agent_id, &method.id),
 683        method.name.clone(),
 684        command.path.to_string_lossy().into_owned(),
 685        command.args.clone(),
 686        command.env.clone().unwrap_or_default(),
 687    )
 688}
 689
 690/// Used to support the _meta method prior to stabilization
 691fn meta_terminal_auth_task(
 692    agent_id: &AgentId,
 693    method_id: &acp::AuthMethodId,
 694    method: &acp::AuthMethod,
 695) -> Option<SpawnInTerminal> {
 696    #[derive(Deserialize)]
 697    struct MetaTerminalAuth {
 698        label: String,
 699        command: String,
 700        #[serde(default)]
 701        args: Vec<String>,
 702        #[serde(default)]
 703        env: HashMap<String, String>,
 704    }
 705
 706    let meta = match method {
 707        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
 708        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
 709        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
 710        _ => None,
 711    }?;
 712    let terminal_auth =
 713        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
 714
 715    Some(acp_thread::build_terminal_auth_task(
 716        terminal_auth_task_id(agent_id, method_id),
 717        terminal_auth.label.clone(),
 718        terminal_auth.command,
 719        terminal_auth.args,
 720        terminal_auth.env,
 721    ))
 722}
 723
 724impl AgentConnection for AcpConnection {
 725    fn agent_id(&self) -> AgentId {
 726        self.id.clone()
 727    }
 728
 729    fn telemetry_id(&self) -> SharedString {
 730        self.telemetry_id.clone()
 731    }
 732
 733    fn new_session(
 734        self: Rc<Self>,
 735        project: Entity<Project>,
 736        work_dirs: PathList,
 737        cx: &mut App,
 738    ) -> Task<Result<Entity<AcpThread>>> {
 739        // TODO: remove this once ACP supports multiple working directories
 740        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 741            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 742        };
 743        let name = self.id.0.clone();
 744        let mcp_servers = mcp_servers_for_project(&project, cx);
 745
 746        cx.spawn(async move |cx| {
 747            let response = self.connection
 748                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
 749                .await
 750                .map_err(map_acp_error)?;
 751
 752            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
 753
 754            if let Some(default_mode) = self.default_mode.clone() {
 755                if let Some(modes) = modes.as_ref() {
 756                    let mut modes_ref = modes.borrow_mut();
 757                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
 758
 759                    if has_mode {
 760                        let initial_mode_id = modes_ref.current_mode_id.clone();
 761
 762                        cx.spawn({
 763                            let default_mode = default_mode.clone();
 764                            let session_id = response.session_id.clone();
 765                            let modes = modes.clone();
 766                            let conn = self.connection.clone();
 767                            async move |_| {
 768                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
 769                                .await.log_err();
 770
 771                                if result.is_none() {
 772                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 773                                }
 774                            }
 775                        }).detach();
 776
 777                        modes_ref.current_mode_id = default_mode;
 778                    } else {
 779                        let available_modes = modes_ref
 780                            .available_modes
 781                            .iter()
 782                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 783                            .collect::<Vec<_>>()
 784                            .join("\n");
 785
 786                        log::warn!(
 787                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 788                        );
 789                    }
 790                }
 791            }
 792
 793            if let Some(default_model) = self.default_model.clone() {
 794                if let Some(models) = models.as_ref() {
 795                    let mut models_ref = models.borrow_mut();
 796                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
 797
 798                    if has_model {
 799                        let initial_model_id = models_ref.current_model_id.clone();
 800
 801                        cx.spawn({
 802                            let default_model = default_model.clone();
 803                            let session_id = response.session_id.clone();
 804                            let models = models.clone();
 805                            let conn = self.connection.clone();
 806                            async move |_| {
 807                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
 808                                .await.log_err();
 809
 810                                if result.is_none() {
 811                                    models.borrow_mut().current_model_id = initial_model_id;
 812                                }
 813                            }
 814                        }).detach();
 815
 816                        models_ref.current_model_id = default_model;
 817                    } else {
 818                        let available_models = models_ref
 819                            .available_models
 820                            .iter()
 821                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 822                            .collect::<Vec<_>>()
 823                            .join("\n");
 824
 825                        log::warn!(
 826                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 827                        );
 828                    }
 829                }
 830            }
 831
 832            if let Some(config_opts) = config_options.as_ref() {
 833                self.apply_default_config_options(&response.session_id, config_opts, cx);
 834            }
 835
 836            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 837            let thread: Entity<AcpThread> = cx.new(|cx| {
 838                AcpThread::new(
 839                    None,
 840                    None,
 841                    Some(work_dirs),
 842                    self.clone(),
 843                    project,
 844                    action_log,
 845                    response.session_id.clone(),
 846                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 847                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 848                    cx,
 849                )
 850            });
 851
 852            self.sessions.borrow_mut().insert(
 853                response.session_id,
 854                AcpSession {
 855                    thread: thread.downgrade(),
 856                    suppress_abort_err: false,
 857                    session_modes: modes,
 858                    models,
 859                    config_options: config_options.map(ConfigOptions::new),
 860                    ref_count: 1,
 861                },
 862            );
 863
 864            Ok(thread)
 865        })
 866    }
 867
 868    fn supports_load_session(&self) -> bool {
 869        self.agent_capabilities.load_session
 870    }
 871
 872    fn supports_resume_session(&self) -> bool {
 873        self.agent_capabilities
 874            .session_capabilities
 875            .resume
 876            .is_some()
 877    }
 878
 879    fn load_session(
 880        self: Rc<Self>,
 881        session_id: acp::SessionId,
 882        project: Entity<Project>,
 883        work_dirs: PathList,
 884        title: Option<SharedString>,
 885        cx: &mut App,
 886    ) -> Task<Result<Entity<AcpThread>>> {
 887        if !self.agent_capabilities.load_session {
 888            return Task::ready(Err(anyhow!(LoadError::Other(
 889                "Loading sessions is not supported by this agent.".into()
 890            ))));
 891        }
 892
 893        let mcp_servers = mcp_servers_for_project(&project, cx);
 894        self.open_or_create_session(
 895            session_id,
 896            project,
 897            work_dirs,
 898            title,
 899            move |connection, session_id, cwd| {
 900                Box::pin(async move {
 901                    let response = connection
 902                        .load_session(
 903                            acp::LoadSessionRequest::new(session_id, cwd).mcp_servers(mcp_servers),
 904                        )
 905                        .await
 906                        .map_err(map_acp_error)?;
 907                    Ok(SessionConfigResponse {
 908                        modes: response.modes,
 909                        models: response.models,
 910                        config_options: response.config_options,
 911                    })
 912                })
 913            },
 914            cx,
 915        )
 916    }
 917
 918    fn resume_session(
 919        self: Rc<Self>,
 920        session_id: acp::SessionId,
 921        project: Entity<Project>,
 922        work_dirs: PathList,
 923        title: Option<SharedString>,
 924        cx: &mut App,
 925    ) -> Task<Result<Entity<AcpThread>>> {
 926        if self
 927            .agent_capabilities
 928            .session_capabilities
 929            .resume
 930            .is_none()
 931        {
 932            return Task::ready(Err(anyhow!(LoadError::Other(
 933                "Resuming sessions is not supported by this agent.".into()
 934            ))));
 935        }
 936
 937        let mcp_servers = mcp_servers_for_project(&project, cx);
 938        self.open_or_create_session(
 939            session_id,
 940            project,
 941            work_dirs,
 942            title,
 943            move |connection, session_id, cwd| {
 944                Box::pin(async move {
 945                    let response = connection
 946                        .resume_session(
 947                            acp::ResumeSessionRequest::new(session_id, cwd)
 948                                .mcp_servers(mcp_servers),
 949                        )
 950                        .await
 951                        .map_err(map_acp_error)?;
 952                    Ok(SessionConfigResponse {
 953                        modes: response.modes,
 954                        models: response.models,
 955                        config_options: response.config_options,
 956                    })
 957                })
 958            },
 959            cx,
 960        )
 961    }
 962
 963    fn supports_close_session(&self) -> bool {
 964        self.agent_capabilities.session_capabilities.close.is_some()
 965    }
 966
 967    fn close_session(
 968        self: Rc<Self>,
 969        session_id: &acp::SessionId,
 970        cx: &mut App,
 971    ) -> Task<Result<()>> {
 972        if !self.supports_close_session() {
 973            return Task::ready(Err(anyhow!(LoadError::Other(
 974                "Closing sessions is not supported by this agent.".into()
 975            ))));
 976        }
 977
 978        let mut sessions = self.sessions.borrow_mut();
 979        let Some(session) = sessions.get_mut(session_id) else {
 980            return Task::ready(Ok(()));
 981        };
 982
 983        session.ref_count -= 1;
 984        if session.ref_count > 0 {
 985            return Task::ready(Ok(()));
 986        }
 987
 988        sessions.remove(session_id);
 989        drop(sessions);
 990
 991        let conn = self.connection.clone();
 992        let session_id = session_id.clone();
 993        cx.foreground_executor().spawn(async move {
 994            conn.close_session(acp::CloseSessionRequest::new(session_id))
 995                .await?;
 996            Ok(())
 997        })
 998    }
 999
1000    fn auth_methods(&self) -> &[acp::AuthMethod] {
1001        &self.auth_methods
1002    }
1003
1004    fn terminal_auth_task(
1005        &self,
1006        method_id: &acp::AuthMethodId,
1007        cx: &App,
1008    ) -> Option<Task<Result<SpawnInTerminal>>> {
1009        let method = self
1010            .auth_methods
1011            .iter()
1012            .find(|method| method.id() == method_id)?;
1013
1014        match method {
1015            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1016                let agent_id = self.id.clone();
1017                let terminal = terminal.clone();
1018                let store = self.agent_server_store.clone();
1019                Some(cx.spawn(async move |cx| {
1020                    let command = store
1021                        .update(cx, |store, cx| {
1022                            let agent = store
1023                                .get_external_agent(&agent_id)
1024                                .context("Agent server not found")?;
1025                            anyhow::Ok(agent.get_command(
1026                                terminal.args.clone(),
1027                                HashMap::from_iter(terminal.env.clone()),
1028                                &mut cx.to_async(),
1029                            ))
1030                        })?
1031                        .context("Failed to get agent command")?
1032                        .await?;
1033                    Ok(terminal_auth_task(&command, &agent_id, &terminal))
1034                }))
1035            }
1036            _ => meta_terminal_auth_task(&self.id, method_id, method)
1037                .map(|task| Task::ready(Ok(task))),
1038        }
1039    }
1040
1041    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1042        let conn = self.connection.clone();
1043        cx.foreground_executor().spawn(async move {
1044            conn.authenticate(acp::AuthenticateRequest::new(method_id))
1045                .await?;
1046            Ok(())
1047        })
1048    }
1049
1050    fn prompt(
1051        &self,
1052        _id: acp_thread::UserMessageId,
1053        params: acp::PromptRequest,
1054        cx: &mut App,
1055    ) -> Task<Result<acp::PromptResponse>> {
1056        let conn = self.connection.clone();
1057        let sessions = self.sessions.clone();
1058        let session_id = params.session_id.clone();
1059        cx.foreground_executor().spawn(async move {
1060            let result = conn.prompt(params).await;
1061
1062            let mut suppress_abort_err = false;
1063
1064            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1065                suppress_abort_err = session.suppress_abort_err;
1066                session.suppress_abort_err = false;
1067            }
1068
1069            match result {
1070                Ok(response) => Ok(response),
1071                Err(err) => {
1072                    if err.code == acp::ErrorCode::AuthRequired {
1073                        return Err(anyhow!(acp::Error::auth_required()));
1074                    }
1075
1076                    if err.code != ErrorCode::InternalError {
1077                        anyhow::bail!(err)
1078                    }
1079
1080                    let Some(data) = &err.data else {
1081                        anyhow::bail!(err)
1082                    };
1083
1084                    // Temporary workaround until the following PR is generally available:
1085                    // https://github.com/google-gemini/gemini-cli/pull/6656
1086
1087                    #[derive(Deserialize)]
1088                    #[serde(deny_unknown_fields)]
1089                    struct ErrorDetails {
1090                        details: Box<str>,
1091                    }
1092
1093                    match serde_json::from_value(data.clone()) {
1094                        Ok(ErrorDetails { details }) => {
1095                            if suppress_abort_err
1096                                && (details.contains("This operation was aborted")
1097                                    || details.contains("The user aborted a request"))
1098                            {
1099                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1100                            } else {
1101                                Err(anyhow!(details))
1102                            }
1103                        }
1104                        Err(_) => Err(anyhow!(err)),
1105                    }
1106                }
1107            }
1108        })
1109    }
1110
1111    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1112        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1113            session.suppress_abort_err = true;
1114        }
1115        let conn = self.connection.clone();
1116        let params = acp::CancelNotification::new(session_id.clone());
1117        cx.foreground_executor()
1118            .spawn(async move { conn.cancel(params).await })
1119            .detach();
1120    }
1121
1122    fn session_modes(
1123        &self,
1124        session_id: &acp::SessionId,
1125        _cx: &App,
1126    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1127        let sessions = self.sessions.clone();
1128        let sessions_ref = sessions.borrow();
1129        let Some(session) = sessions_ref.get(session_id) else {
1130            return None;
1131        };
1132
1133        if let Some(modes) = session.session_modes.as_ref() {
1134            Some(Rc::new(AcpSessionModes {
1135                connection: self.connection.clone(),
1136                session_id: session_id.clone(),
1137                state: modes.clone(),
1138            }) as _)
1139        } else {
1140            None
1141        }
1142    }
1143
1144    fn model_selector(
1145        &self,
1146        session_id: &acp::SessionId,
1147    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1148        let sessions = self.sessions.clone();
1149        let sessions_ref = sessions.borrow();
1150        let Some(session) = sessions_ref.get(session_id) else {
1151            return None;
1152        };
1153
1154        if let Some(models) = session.models.as_ref() {
1155            Some(Rc::new(AcpModelSelector::new(
1156                session_id.clone(),
1157                self.connection.clone(),
1158                models.clone(),
1159            )) as _)
1160        } else {
1161            None
1162        }
1163    }
1164
1165    fn session_config_options(
1166        &self,
1167        session_id: &acp::SessionId,
1168        _cx: &App,
1169    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1170        let sessions = self.sessions.borrow();
1171        let session = sessions.get(session_id)?;
1172
1173        let config_opts = session.config_options.as_ref()?;
1174
1175        Some(Rc::new(AcpSessionConfigOptions {
1176            session_id: session_id.clone(),
1177            connection: self.connection.clone(),
1178            state: config_opts.config_options.clone(),
1179            watch_tx: config_opts.tx.clone(),
1180            watch_rx: config_opts.rx.clone(),
1181        }) as _)
1182    }
1183
1184    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1185        self.session_list.clone().map(|s| s as _)
1186    }
1187
1188    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1189        self
1190    }
1191}
1192
1193fn map_acp_error(err: acp::Error) -> anyhow::Error {
1194    if err.code == acp::ErrorCode::AuthRequired {
1195        let mut error = AuthRequired::new();
1196
1197        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1198            error = error.with_description(err.message);
1199        }
1200
1201        anyhow!(error)
1202    } else {
1203        anyhow!(err)
1204    }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209    use std::sync::atomic::{AtomicUsize, Ordering};
1210
1211    use super::*;
1212
1213    #[test]
1214    fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
1215        let command = AgentServerCommand {
1216            path: "/path/to/agent".into(),
1217            args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
1218            env: Some(HashMap::from_iter([
1219                ("BASE".into(), "1".into()),
1220                ("SHARED".into(), "override".into()),
1221                ("EXTRA".into(), "2".into()),
1222            ])),
1223        };
1224        let method = acp::AuthMethodTerminal::new("login", "Login");
1225
1226        let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
1227
1228        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1229        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
1230        assert_eq!(
1231            task.env,
1232            HashMap::from_iter([
1233                ("BASE".into(), "1".into()),
1234                ("SHARED".into(), "override".into()),
1235                ("EXTRA".into(), "2".into()),
1236            ])
1237        );
1238        assert_eq!(task.label, "Login");
1239        assert_eq!(task.command_label, "Login");
1240    }
1241
1242    #[test]
1243    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
1244        let method_id = acp::AuthMethodId::new("legacy-login");
1245        let method = acp::AuthMethod::Agent(
1246            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1247                "terminal-auth".to_string(),
1248                serde_json::json!({
1249                    "label": "legacy /auth",
1250                    "command": "legacy-agent",
1251                    "args": ["auth", "--interactive"],
1252                    "env": {
1253                        "AUTH_MODE": "interactive",
1254                    },
1255                }),
1256            )])),
1257        );
1258
1259        let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
1260            .expect("expected legacy terminal auth task");
1261
1262        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
1263        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
1264        assert_eq!(task.args, vec!["auth", "--interactive"]);
1265        assert_eq!(
1266            task.env,
1267            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
1268        );
1269        assert_eq!(task.label, "legacy /auth");
1270    }
1271
1272    #[test]
1273    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
1274        let method_id = acp::AuthMethodId::new("legacy-login");
1275        let method = acp::AuthMethod::Agent(
1276            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1277                "terminal-auth".to_string(),
1278                serde_json::json!({
1279                    "label": "legacy /auth",
1280                }),
1281            )])),
1282        );
1283
1284        assert!(
1285            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
1286        );
1287    }
1288
1289    #[test]
1290    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
1291        let method_id = acp::AuthMethodId::new("login");
1292        let method = acp::AuthMethod::Terminal(
1293            acp::AuthMethodTerminal::new(method_id, "Login")
1294                .args(vec!["/auth".into()])
1295                .env(std::collections::HashMap::from_iter([(
1296                    "AUTH_MODE".into(),
1297                    "first-class".into(),
1298                )]))
1299                .meta(acp::Meta::from_iter([(
1300                    "terminal-auth".to_string(),
1301                    serde_json::json!({
1302                        "label": "legacy /auth",
1303                        "command": "legacy-agent",
1304                        "args": ["legacy-auth"],
1305                        "env": {
1306                            "AUTH_MODE": "legacy",
1307                        },
1308                    }),
1309                )])),
1310        );
1311
1312        let command = AgentServerCommand {
1313            path: "/path/to/agent".into(),
1314            args: vec!["--acp".into(), "/auth".into()],
1315            env: Some(HashMap::from_iter([
1316                ("BASE".into(), "1".into()),
1317                ("AUTH_MODE".into(), "first-class".into()),
1318            ])),
1319        };
1320
1321        let task = match &method {
1322            acp::AuthMethod::Terminal(terminal) => {
1323                terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
1324            }
1325            _ => unreachable!(),
1326        };
1327
1328        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1329        assert_eq!(task.args, vec!["--acp", "/auth"]);
1330        assert_eq!(
1331            task.env,
1332            HashMap::from_iter([
1333                ("BASE".into(), "1".into()),
1334                ("AUTH_MODE".into(), "first-class".into()),
1335            ])
1336        );
1337        assert_eq!(task.label, "Login");
1338    }
1339
    /// Test double for an ACP agent that counts the `load_session` and
    /// `close_session` requests it receives.
    struct FakeAcpAgent {
        /// Incremented once per `load_session` request.
        load_session_count: Arc<AtomicUsize>,
        /// Incremented once per `close_session` request.
        close_session_count: Arc<AtomicUsize>,
    }
1344
1345    #[async_trait::async_trait(?Send)]
1346    impl acp::Agent for FakeAcpAgent {
1347        async fn initialize(
1348            &self,
1349            args: acp::InitializeRequest,
1350        ) -> acp::Result<acp::InitializeResponse> {
1351            Ok(
1352                acp::InitializeResponse::new(args.protocol_version).agent_capabilities(
1353                    acp::AgentCapabilities::default()
1354                        .load_session(true)
1355                        .session_capabilities(
1356                            acp::SessionCapabilities::default()
1357                                .close(acp::SessionCloseCapabilities::new()),
1358                        ),
1359                ),
1360            )
1361        }
1362
1363        async fn authenticate(
1364            &self,
1365            _: acp::AuthenticateRequest,
1366        ) -> acp::Result<acp::AuthenticateResponse> {
1367            Ok(Default::default())
1368        }
1369
1370        async fn new_session(
1371            &self,
1372            _: acp::NewSessionRequest,
1373        ) -> acp::Result<acp::NewSessionResponse> {
1374            Ok(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
1375        }
1376
1377        async fn prompt(&self, _: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
1378            Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
1379        }
1380
1381        async fn cancel(&self, _: acp::CancelNotification) -> acp::Result<()> {
1382            Ok(())
1383        }
1384
1385        async fn load_session(
1386            &self,
1387            _: acp::LoadSessionRequest,
1388        ) -> acp::Result<acp::LoadSessionResponse> {
1389            self.load_session_count.fetch_add(1, Ordering::SeqCst);
1390            Ok(acp::LoadSessionResponse::new())
1391        }
1392
1393        async fn close_session(
1394            &self,
1395            _: acp::CloseSessionRequest,
1396        ) -> acp::Result<acp::CloseSessionResponse> {
1397            self.close_session_count.fetch_add(1, Ordering::SeqCst);
1398            Ok(acp::CloseSessionResponse::new())
1399        }
1400    }
1401
    /// Wires an `AcpConnection` to an in-process `FakeAcpAgent` over a pair
    /// of in-memory pipes.
    ///
    /// Returns, in order: the connection, a test project rooted at `/a`, the
    /// fake agent's `load_session` counter, its `close_session` counter, and
    /// a background task that awaits the agent-side I/O loop.
    async fn connect_fake_agent(
        cx: &mut gpui::TestAppContext,
    ) -> (
        Rc<AcpConnection>,
        Entity<project::Project>,
        Arc<AtomicUsize>,
        Arc<AtomicUsize>,
        Task<anyhow::Result<()>>,
    ) {
        // Install a test settings store as a global before creating the project.
        cx.update(|cx| {
            let store = settings::SettingsStore::test(cx);
            cx.set_global(store);
        });

        let fs = fs::FakeFs::new(cx.executor());
        fs.insert_tree("/", serde_json::json!({ "a": {} })).await;
        let project = project::Project::test(fs, [std::path::Path::new("/a")], cx).await;

        let load_count = Arc::new(AtomicUsize::new(0));
        let close_count = Arc::new(AtomicUsize::new(0));

        // Two pipes: the client writes to `c2a` and reads from `a2c`; the
        // agent does the opposite (see the constructor calls below).
        let (c2a_reader, c2a_writer) = piper::pipe(4096);
        let (a2c_reader, a2c_writer) = piper::pipe(4096);

        // The same session map is given to both the client delegate and the
        // connection under test.
        let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
            Rc::new(RefCell::new(HashMap::default()));
        let session_list_container: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        let foreground = cx.foreground_executor().clone();

        let client_delegate = ClientDelegate {
            sessions: sessions.clone(),
            session_list: session_list_container,
            cx: cx.to_async(),
        };

        let (client_conn, client_io_task) =
            acp::ClientSideConnection::new(client_delegate, c2a_writer, a2c_reader, {
                let foreground = foreground.clone();
                move |fut| {
                    foreground.spawn(fut).detach();
                }
            });

        let fake_agent = FakeAcpAgent {
            load_session_count: load_count.clone(),
            close_session_count: close_count.clone(),
        };

        let (_, agent_io_task) =
            acp::AgentSideConnection::new(fake_agent, a2c_writer, c2a_reader, {
                let foreground = foreground.clone();
                move |fut| {
                    foreground.spawn(fut).detach();
                }
            });

        // Both I/O loops are spawned on the background executor before the
        // first request is sent.
        let client_io_task = cx.background_spawn(client_io_task);
        let agent_io_task = cx.background_spawn(agent_io_task);

        let response = client_conn
            .initialize(acp::InitializeRequest::new(acp::ProtocolVersion::V1))
            .await
            .expect("failed to initialize ACP connection");

        let agent_capabilities = response.agent_capabilities;

        let agent_server_store =
            project.read_with(cx, |project, _| project.agent_server_store().downgrade());

        let connection = cx.update(|cx| {
            AcpConnection::new_for_test(
                Rc::new(client_conn),
                sessions,
                agent_capabilities,
                agent_server_store,
                client_io_task,
                cx,
            )
        });

        // Returned to the caller, which binds it so the task is not dropped
        // mid-test.
        let keep_agent_alive = cx.background_spawn(async move {
            agent_io_task.await.ok();
            anyhow::Ok(())
        });

        (
            Rc::new(connection),
            project,
            load_count,
            close_count,
            keep_agent_alive,
        )
    }
1497
    /// Loads one session twice concurrently, then closes it twice, checking
    /// that the agent sees exactly one `load_session` and exactly one
    /// `close_session`, and that the session stays tracked until the final
    /// close.
    #[gpui::test]
    async fn test_loaded_sessions_keep_state_until_last_close(cx: &mut gpui::TestAppContext) {
        let (connection, project, load_count, close_count, _keep_agent_alive) =
            connect_fake_agent(cx).await;

        let session_id = acp::SessionId::new("session-1");
        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);

        // Load the same session twice concurrently — the second call should join
        // the pending task rather than issuing a second ACP load_session RPC.
        let first_load = cx.update(|cx| {
            connection.clone().load_session(
                session_id.clone(),
                project.clone(),
                work_dirs.clone(),
                None,
                cx,
            )
        });
        let second_load = cx.update(|cx| {
            connection.clone().load_session(
                session_id.clone(),
                project.clone(),
                work_dirs.clone(),
                None,
                cx,
            )
        });

        let first_thread = first_load.await.expect("first load failed");
        let second_thread = second_load.await.expect("second load failed");
        cx.run_until_parked();

        assert_eq!(
            first_thread.entity_id(),
            second_thread.entity_id(),
            "concurrent loads for the same session should share one AcpThread"
        );
        assert_eq!(
            load_count.load(Ordering::SeqCst),
            1,
            "underlying ACP load_session should be called exactly once for concurrent loads"
        );

        // The session has ref_count 2. The first close should not send the ACP
        // close_session RPC — the session is still referenced.
        cx.update(|cx| connection.clone().close_session(&session_id, cx))
            .await
            .expect("first close failed");

        assert_eq!(
            close_count.load(Ordering::SeqCst),
            0,
            "ACP close_session should not be sent while ref_count > 0"
        );
        assert!(
            connection.sessions.borrow().contains_key(&session_id),
            "session should still be tracked after first close"
        );

        // The second close drops ref_count to 0 — now the ACP RPC must be sent.
        cx.update(|cx| connection.clone().close_session(&session_id, cx))
            .await
            .expect("second close failed");
        cx.run_until_parked();

        assert_eq!(
            close_count.load(Ordering::SeqCst),
            1,
            "ACP close_session should be sent exactly once when ref_count reaches 0"
        );
        assert!(
            !connection.sessions.borrow().contains_key(&session_id),
            "session should be removed after final close"
        );
    }
1574}
1575
1576fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1577    let context_server_store = project.read(cx).context_server_store().read(cx);
1578    let is_local = project.read(cx).is_local();
1579    context_server_store
1580        .configured_server_ids()
1581        .iter()
1582        .filter_map(|id| {
1583            let configuration = context_server_store.configuration_for_server(id)?;
1584            match &*configuration {
1585                project::context_server_store::ContextServerConfiguration::Custom {
1586                    command,
1587                    remote,
1588                    ..
1589                }
1590                | project::context_server_store::ContextServerConfiguration::Extension {
1591                    command,
1592                    remote,
1593                    ..
1594                } if is_local || *remote => Some(acp::McpServer::Stdio(
1595                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
1596                        .args(command.args.clone())
1597                        .env(if let Some(env) = command.env.as_ref() {
1598                            env.iter()
1599                                .map(|(name, value)| acp::EnvVariable::new(name, value))
1600                                .collect()
1601                        } else {
1602                            vec![]
1603                        }),
1604                )),
1605                project::context_server_store::ContextServerConfiguration::Http {
1606                    url,
1607                    headers,
1608                    timeout: _,
1609                } => Some(acp::McpServer::Http(
1610                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1611                        headers
1612                            .iter()
1613                            .map(|(name, value)| acp::HttpHeader::new(name, value))
1614                            .collect(),
1615                    ),
1616                )),
1617                _ => None,
1618            }
1619        })
1620        .collect()
1621}
1622
1623fn config_state(
1624    modes: Option<acp::SessionModeState>,
1625    models: Option<acp::SessionModelState>,
1626    config_options: Option<Vec<acp::SessionConfigOption>>,
1627) -> (
1628    Option<Rc<RefCell<acp::SessionModeState>>>,
1629    Option<Rc<RefCell<acp::SessionModelState>>>,
1630    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1631) {
1632    if let Some(opts) = config_options {
1633        return (None, None, Some(Rc::new(RefCell::new(opts))));
1634    }
1635
1636    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1637    let models = models.map(|models| Rc::new(RefCell::new(models)));
1638    (modes, models, None)
1639}
1640
1641struct AcpSessionModes {
1642    session_id: acp::SessionId,
1643    connection: Rc<acp::ClientSideConnection>,
1644    state: Rc<RefCell<acp::SessionModeState>>,
1645}
1646
1647impl acp_thread::AgentSessionModes for AcpSessionModes {
1648    fn current_mode(&self) -> acp::SessionModeId {
1649        self.state.borrow().current_mode_id.clone()
1650    }
1651
1652    fn all_modes(&self) -> Vec<acp::SessionMode> {
1653        self.state.borrow().available_modes.clone()
1654    }
1655
1656    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1657        let connection = self.connection.clone();
1658        let session_id = self.session_id.clone();
1659        let old_mode_id;
1660        {
1661            let mut state = self.state.borrow_mut();
1662            old_mode_id = state.current_mode_id.clone();
1663            state.current_mode_id = mode_id.clone();
1664        };
1665        let state = self.state.clone();
1666        cx.foreground_executor().spawn(async move {
1667            let result = connection
1668                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1669                .await;
1670
1671            if result.is_err() {
1672                state.borrow_mut().current_mode_id = old_mode_id;
1673            }
1674
1675            result?;
1676
1677            Ok(())
1678        })
1679    }
1680}
1681
1682struct AcpModelSelector {
1683    session_id: acp::SessionId,
1684    connection: Rc<acp::ClientSideConnection>,
1685    state: Rc<RefCell<acp::SessionModelState>>,
1686}
1687
1688impl AcpModelSelector {
1689    fn new(
1690        session_id: acp::SessionId,
1691        connection: Rc<acp::ClientSideConnection>,
1692        state: Rc<RefCell<acp::SessionModelState>>,
1693    ) -> Self {
1694        Self {
1695            session_id,
1696            connection,
1697            state,
1698        }
1699    }
1700}
1701
1702impl acp_thread::AgentModelSelector for AcpModelSelector {
1703    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1704        Task::ready(Ok(acp_thread::AgentModelList::Flat(
1705            self.state
1706                .borrow()
1707                .available_models
1708                .clone()
1709                .into_iter()
1710                .map(acp_thread::AgentModelInfo::from)
1711                .collect(),
1712        )))
1713    }
1714
1715    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1716        let connection = self.connection.clone();
1717        let session_id = self.session_id.clone();
1718        let old_model_id;
1719        {
1720            let mut state = self.state.borrow_mut();
1721            old_model_id = state.current_model_id.clone();
1722            state.current_model_id = model_id.clone();
1723        };
1724        let state = self.state.clone();
1725        cx.foreground_executor().spawn(async move {
1726            let result = connection
1727                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1728                .await;
1729
1730            if result.is_err() {
1731                state.borrow_mut().current_model_id = old_model_id;
1732            }
1733
1734            result?;
1735
1736            Ok(())
1737        })
1738    }
1739
1740    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1741        let state = self.state.borrow();
1742        Task::ready(
1743            state
1744                .available_models
1745                .iter()
1746                .find(|m| m.model_id == state.current_model_id)
1747                .cloned()
1748                .map(acp_thread::AgentModelInfo::from)
1749                .ok_or_else(|| anyhow::anyhow!("Model not found")),
1750        )
1751    }
1752}
1753
1754struct AcpSessionConfigOptions {
1755    session_id: acp::SessionId,
1756    connection: Rc<acp::ClientSideConnection>,
1757    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1758    watch_tx: Rc<RefCell<watch::Sender<()>>>,
1759    watch_rx: watch::Receiver<()>,
1760}
1761
1762impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1763    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1764        self.state.borrow().clone()
1765    }
1766
1767    fn set_config_option(
1768        &self,
1769        config_id: acp::SessionConfigId,
1770        value: acp::SessionConfigValueId,
1771        cx: &mut App,
1772    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1773        let connection = self.connection.clone();
1774        let session_id = self.session_id.clone();
1775        let state = self.state.clone();
1776
1777        let watch_tx = self.watch_tx.clone();
1778
1779        cx.foreground_executor().spawn(async move {
1780            let response = connection
1781                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1782                    session_id, config_id, value,
1783                ))
1784                .await?;
1785
1786            *state.borrow_mut() = response.config_options.clone();
1787            watch_tx.borrow_mut().send(()).ok();
1788            Ok(response.config_options)
1789        })
1790    }
1791
1792    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1793        Some(self.watch_rx.clone())
1794    }
1795}
1796
1797struct ClientDelegate {
1798    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1799    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1800    cx: AsyncApp,
1801}
1802
1803#[async_trait::async_trait(?Send)]
1804impl acp::Client for ClientDelegate {
1805    async fn request_permission(
1806        &self,
1807        arguments: acp::RequestPermissionRequest,
1808    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1809        let thread;
1810        {
1811            let sessions_ref = self.sessions.borrow();
1812            let session = sessions_ref
1813                .get(&arguments.session_id)
1814                .context("Failed to get session")?;
1815            thread = session.thread.clone();
1816        }
1817
1818        let cx = &mut self.cx.clone();
1819
1820        let task = thread.update(cx, |thread, cx| {
1821            thread.request_tool_call_authorization(
1822                arguments.tool_call,
1823                acp_thread::PermissionOptions::Flat(arguments.options),
1824                cx,
1825            )
1826        })??;
1827
1828        let outcome = task.await;
1829
1830        Ok(acp::RequestPermissionResponse::new(outcome.into()))
1831    }
1832
1833    async fn write_text_file(
1834        &self,
1835        arguments: acp::WriteTextFileRequest,
1836    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1837        let cx = &mut self.cx.clone();
1838        let task = self
1839            .session_thread(&arguments.session_id)?
1840            .update(cx, |thread, cx| {
1841                thread.write_text_file(arguments.path, arguments.content, cx)
1842            })?;
1843
1844        task.await?;
1845
1846        Ok(Default::default())
1847    }
1848
1849    async fn read_text_file(
1850        &self,
1851        arguments: acp::ReadTextFileRequest,
1852    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1853        let task = self.session_thread(&arguments.session_id)?.update(
1854            &mut self.cx.clone(),
1855            |thread, cx| {
1856                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1857            },
1858        )?;
1859
1860        let content = task.await?;
1861
1862        Ok(acp::ReadTextFileResponse::new(content))
1863    }
1864
1865    async fn session_notification(
1866        &self,
1867        notification: acp::SessionNotification,
1868    ) -> Result<(), acp::Error> {
1869        let sessions = self.sessions.borrow();
1870        let session = sessions
1871            .get(&notification.session_id)
1872            .context("Failed to get session")?;
1873
1874        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1875            current_mode_id,
1876            ..
1877        }) = &notification.update
1878        {
1879            if let Some(session_modes) = &session.session_modes {
1880                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1881            }
1882        }
1883
1884        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1885            config_options,
1886            ..
1887        }) = &notification.update
1888        {
1889            if let Some(opts) = &session.config_options {
1890                *opts.config_options.borrow_mut() = config_options.clone();
1891                opts.tx.borrow_mut().send(()).ok();
1892            }
1893        }
1894
1895        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
1896            && let Some(session_list) = self.session_list.borrow().as_ref()
1897        {
1898            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1899        }
1900
1901        // Clone so we can inspect meta both before and after handing off to the thread
1902        let update_clone = notification.update.clone();
1903
1904        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1905        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1906            if let Some(meta) = &tc.meta {
1907                if let Some(terminal_info) = meta.get("terminal_info") {
1908                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1909                    {
1910                        let terminal_id = acp::TerminalId::new(id_str);
1911                        let cwd = terminal_info
1912                            .get("cwd")
1913                            .and_then(|v| v.as_str().map(PathBuf::from));
1914
1915                        // Create a minimal display-only lower-level terminal and register it.
1916                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1917                            let builder = TerminalBuilder::new_display_only(
1918                                CursorShape::default(),
1919                                AlternateScroll::On,
1920                                None,
1921                                0,
1922                                cx.background_executor(),
1923                                thread.project().read(cx).path_style(cx),
1924                            )?;
1925                            let lower = cx.new(|cx| builder.subscribe(cx));
1926                            thread.on_terminal_provider_event(
1927                                TerminalProviderEvent::Created {
1928                                    terminal_id,
1929                                    label: tc.title.clone(),
1930                                    cwd,
1931                                    output_byte_limit: None,
1932                                    terminal: lower,
1933                                },
1934                                cx,
1935                            );
1936                            anyhow::Ok(())
1937                        });
1938                    }
1939                }
1940            }
1941        }
1942
1943        // Forward the update to the acp_thread as usual.
1944        session.thread.update(&mut self.cx.clone(), |thread, cx| {
1945            thread.handle_session_update(notification.update.clone(), cx)
1946        })??;
1947
1948        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1949        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1950            if let Some(meta) = &tcu.meta {
1951                if let Some(term_out) = meta.get("terminal_output") {
1952                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1953                        let terminal_id = acp::TerminalId::new(id_str);
1954                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1955                            let data = s.as_bytes().to_vec();
1956                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1957                                thread.on_terminal_provider_event(
1958                                    TerminalProviderEvent::Output { terminal_id, data },
1959                                    cx,
1960                                );
1961                            });
1962                        }
1963                    }
1964                }
1965
1966                // terminal_exit
1967                if let Some(term_exit) = meta.get("terminal_exit") {
1968                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1969                        let terminal_id = acp::TerminalId::new(id_str);
1970                        let status = acp::TerminalExitStatus::new()
1971                            .exit_code(
1972                                term_exit
1973                                    .get("exit_code")
1974                                    .and_then(|v| v.as_u64())
1975                                    .map(|i| i as u32),
1976                            )
1977                            .signal(
1978                                term_exit
1979                                    .get("signal")
1980                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
1981                            );
1982
1983                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1984                            thread.on_terminal_provider_event(
1985                                TerminalProviderEvent::Exit {
1986                                    terminal_id,
1987                                    status,
1988                                },
1989                                cx,
1990                            );
1991                        });
1992                    }
1993                }
1994            }
1995        }
1996
1997        Ok(())
1998    }
1999
2000    async fn create_terminal(
2001        &self,
2002        args: acp::CreateTerminalRequest,
2003    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
2004        let thread = self.session_thread(&args.session_id)?;
2005        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
2006
2007        let terminal_entity = acp_thread::create_terminal_entity(
2008            args.command.clone(),
2009            &args.args,
2010            args.env
2011                .into_iter()
2012                .map(|env| (env.name, env.value))
2013                .collect(),
2014            args.cwd.clone(),
2015            &project,
2016            &mut self.cx.clone(),
2017        )
2018        .await?;
2019
2020        // Register with renderer
2021        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
2022            thread.register_terminal_created(
2023                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
2024                format!("{} {}", args.command, args.args.join(" ")),
2025                args.cwd.clone(),
2026                args.output_byte_limit,
2027                terminal_entity,
2028                cx,
2029            )
2030        })?;
2031        let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
2032        Ok(acp::CreateTerminalResponse::new(terminal_id))
2033    }
2034
2035    async fn kill_terminal(
2036        &self,
2037        args: acp::KillTerminalRequest,
2038    ) -> Result<acp::KillTerminalResponse, acp::Error> {
2039        self.session_thread(&args.session_id)?
2040            .update(&mut self.cx.clone(), |thread, cx| {
2041                thread.kill_terminal(args.terminal_id, cx)
2042            })??;
2043
2044        Ok(Default::default())
2045    }
2046
2047    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
2048        Err(acp::Error::method_not_found())
2049    }
2050
2051    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
2052        Err(acp::Error::method_not_found())
2053    }
2054
2055    async fn release_terminal(
2056        &self,
2057        args: acp::ReleaseTerminalRequest,
2058    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
2059        self.session_thread(&args.session_id)?
2060            .update(&mut self.cx.clone(), |thread, cx| {
2061                thread.release_terminal(args.terminal_id, cx)
2062            })??;
2063
2064        Ok(Default::default())
2065    }
2066
2067    async fn terminal_output(
2068        &self,
2069        args: acp::TerminalOutputRequest,
2070    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
2071        self.session_thread(&args.session_id)?
2072            .read_with(&mut self.cx.clone(), |thread, cx| {
2073                let out = thread
2074                    .terminal(args.terminal_id)?
2075                    .read(cx)
2076                    .current_output(cx);
2077
2078                Ok(out)
2079            })?
2080    }
2081
2082    async fn wait_for_terminal_exit(
2083        &self,
2084        args: acp::WaitForTerminalExitRequest,
2085    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
2086        let exit_status = self
2087            .session_thread(&args.session_id)?
2088            .update(&mut self.cx.clone(), |thread, cx| {
2089                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
2090            })??
2091            .await;
2092
2093        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
2094    }
2095}
2096
2097impl ClientDelegate {
2098    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
2099        let sessions = self.sessions.borrow();
2100        sessions
2101            .get(session_id)
2102            .context("Failed to get session")
2103            .map(|session| session.thread.clone())
2104    }
2105}