acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::AcpConnectionRegistry;
   6use action_log::ActionLog;
   7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
   8use anyhow::anyhow;
   9use collections::HashMap;
  10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  11use futures::AsyncBufReadExt as _;
  12use futures::FutureExt as _;
  13use futures::future::Shared;
  14use futures::io::BufReader;
  15use project::agent_server_store::{AgentServerCommand, AgentServerStore};
  16use project::{AgentId, Project};
  17use remote::remote_client::Interactive;
  18use serde::Deserialize;
  19use std::path::PathBuf;
  20use std::process::Stdio;
  21use std::rc::Rc;
  22use std::{any::Any, cell::RefCell};
  23use task::{Shell, ShellBuilder, SpawnInTerminal};
  24use thiserror::Error;
  25use util::ResultExt as _;
  26use util::path_list::PathList;
  27use util::process::Child;
  28
  29use std::sync::Arc;
  30
  31use anyhow::{Context as _, Result};
  32use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  33
  34use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  35use terminal::TerminalBuilder;
  36use terminal::terminal_settings::{AlternateScroll, CursorShape};
  37
  38use crate::GEMINI_ID;
  39
/// Id of the auth method that `AcpConnection::stdio` synthesizes for the Gemini agent in place
/// of the auth methods reported by the agent itself.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  41
/// Error returned by `AcpConnection::stdio` when the agent's `initialize` response reports a
/// protocol version lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
  45
/// A connection to an external agent server that speaks the Agent Client Protocol (ACP).
pub struct AcpConnection {
    /// Id of the agent this connection talks to.
    id: AgentId,
    /// Name reported for telemetry: the agent-provided name when `stdio` received one,
    /// otherwise the agent id.
    telemetry_id: SharedString,
    /// Client side of the ACP connection, used to send requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Established sessions, keyed by session id.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Sessions whose open/create task has not finished yet, keyed by session id.
    pending_sessions: Rc<RefCell<HashMap<acp::SessionId, PendingAcpSession>>>,
    /// Auth methods from the `initialize` response (replaced by a synthesized terminal login
    /// for the Gemini agent).
    auth_methods: Vec<acp::AuthMethod>,
    /// Weak handle to the agent server store.
    agent_server_store: WeakEntity<AgentServerStore>,
    /// Agent capabilities (taken from the `initialize` response in `stdio`).
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to switch new sessions to when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to switch new sessions to when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Default config option values, keyed by config option id.
    default_config_options: HashMap<String, String>,
    /// The spawned agent process, killed when the connection is dropped. `None` for
    /// connections built by `new_for_test`.
    child: Option<Child>,
    /// Set only when the agent's capabilities include session listing.
    session_list: Option<Rc<AcpSessionList>>,
    // The tasks below are never read; they are stored so that they are owned by the
    // connection (presumably dropping a `Task` cancels it — TODO confirm).
    _io_task: Task<Result<(), acp::Error>>,
    _wait_task: Task<Result<()>>,
    _stderr_task: Task<Result<()>>,
}
  64
/// A session that is still being opened or created by `open_or_create_session`.
struct PendingAcpSession {
    /// Shared handle to the in-flight task, so concurrent callers can await the same result.
    task: Shared<Task<Result<Entity<AcpThread>, Arc<anyhow::Error>>>>,
    /// Number of callers waiting on this session; carried over to the `AcpSession` once the
    /// task succeeds.
    ref_count: usize,
}
  69
/// The mode, model, and config-option state produced by the session RPC that
/// `open_or_create_session` runs; each part is optional.
struct SessionConfigResponse {
    modes: Option<acp::SessionModeState>,
    models: Option<acp::SessionModelState>,
    config_options: Option<Vec<acp::SessionConfigOption>>,
}
  75
/// Shared, mutable config options of a session, bundled with both ends of a `watch` channel.
#[derive(Clone)]
struct ConfigOptions {
    /// The session's current config options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    // NOTE(review): the channel is presumably used to signal config changes to observers;
    // its users are not visible in this part of the file — confirm.
    tx: Rc<RefCell<watch::Sender<()>>>,
    rx: watch::Receiver<()>,
}
  82
  83impl ConfigOptions {
  84    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
  85        let (tx, rx) = watch::channel(());
  86        Self {
  87            config_options,
  88            tx: Rc::new(RefCell::new(tx)),
  89            rx,
  90        }
  91    }
  92}
  93
/// Per-session state tracked by an `AcpConnection`.
pub struct AcpSession {
    /// Weak handle to the thread that displays this session.
    thread: WeakEntity<AcpThread>,
    // NOTE(review): initialized to `false` when the session is created; its readers are not
    // visible in this part of the file.
    suppress_abort_err: bool,
    /// Model state reported by the agent, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state reported by the agent, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options reported by the agent, if any.
    config_options: Option<ConfigOptions>,
    /// Number of times this session has been opened; incremented by `open_or_create_session`.
    ref_count: usize,
}
 102
/// Lists the agent's sessions over the ACP connection and broadcasts list updates through an
/// unbounded channel.
pub struct AcpSessionList {
    /// Connection used to send `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sending half of the update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; clones are handed out by `watch`.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
 108
 109impl AcpSessionList {
 110    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
 111        let (tx, rx) = smol::channel::unbounded();
 112        Self {
 113            connection,
 114            updates_tx: tx,
 115            updates_rx: rx,
 116        }
 117    }
 118
 119    fn notify_update(&self) {
 120        self.updates_tx
 121            .try_send(acp_thread::SessionListUpdate::Refresh)
 122            .log_err();
 123    }
 124
 125    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 126        self.updates_tx
 127            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 128            .log_err();
 129    }
 130}
 131
 132impl AgentSessionList for AcpSessionList {
 133    fn list_sessions(
 134        &self,
 135        request: AgentSessionListRequest,
 136        cx: &mut App,
 137    ) -> Task<Result<AgentSessionListResponse>> {
 138        let conn = self.connection.clone();
 139        cx.foreground_executor().spawn(async move {
 140            let acp_request = acp::ListSessionsRequest::new()
 141                .cwd(request.cwd)
 142                .cursor(request.cursor);
 143            let response = conn.list_sessions(acp_request).await?;
 144            Ok(AgentSessionListResponse {
 145                sessions: response
 146                    .sessions
 147                    .into_iter()
 148                    .map(|s| AgentSessionInfo {
 149                        session_id: s.session_id,
 150                        work_dirs: Some(PathList::new(&[s.cwd])),
 151                        title: s.title.map(Into::into),
 152                        updated_at: s.updated_at.and_then(|date_str| {
 153                            chrono::DateTime::parse_from_rfc3339(&date_str)
 154                                .ok()
 155                                .map(|dt| dt.with_timezone(&chrono::Utc))
 156                        }),
 157                        created_at: None,
 158                        meta: s.meta,
 159                    })
 160                    .collect(),
 161                next_cursor: response.next_cursor,
 162                meta: response.meta,
 163            })
 164        })
 165    }
 166
 167    fn watch(
 168        &self,
 169        _cx: &mut App,
 170    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 171        Some(self.updates_rx.clone())
 172    }
 173
 174    fn notify_refresh(&self) {
 175        self.notify_update();
 176    }
 177
 178    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 179        self
 180    }
 181}
 182
 183pub async fn connect(
 184    agent_id: AgentId,
 185    project: Entity<Project>,
 186    command: AgentServerCommand,
 187    agent_server_store: WeakEntity<AgentServerStore>,
 188    default_mode: Option<acp::SessionModeId>,
 189    default_model: Option<acp::ModelId>,
 190    default_config_options: HashMap<String, String>,
 191    cx: &mut AsyncApp,
 192) -> Result<Rc<dyn AgentConnection>> {
 193    let conn = AcpConnection::stdio(
 194        agent_id,
 195        project,
 196        command.clone(),
 197        agent_server_store,
 198        default_mode,
 199        default_model,
 200        default_config_options,
 201        cx,
 202    )
 203    .await?;
 204    Ok(Rc::new(conn) as _)
 205}
 206
/// Lowest ACP protocol version accepted from an agent; `AcpConnection::stdio` fails with
/// `UnsupportedVersion` when the agent reports an older one.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 208
 209impl AcpConnection {
    /// Spawns the agent server described by `command`, connects to it over its stdio pipes,
    /// and performs the ACP `initialize` handshake.
    ///
    /// When the project has a remote client, the command is first rewritten through that
    /// client's `build_command_with_options`; if there is no remote client (or the rewrite
    /// fails, which is logged) the command is used as given. The process command is built with
    /// a non-interactive `ShellBuilder` for `Shell::System`, and its working directory is set
    /// to the project's first default path only when the project is local.
    ///
    /// # Errors
    ///
    /// Returns an error if the process cannot be spawned, if any of its stdio handles cannot
    /// be taken, if the `initialize` request fails, or — as `UnsupportedVersion` — if the agent
    /// reports a protocol version below `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        agent_id: AgentId,
        project: Entity<Project>,
        command: AgentServerCommand,
        agent_server_store: WeakEntity<AgentServerStore>,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        // First path of the project's default path list, if there is one.
        let root_dir = project.read_with(cx, |project, cx| {
            project
                .default_path_list(cx)
                .ordered_paths()
                .next()
                .cloned()
        });
        // Kept because `command` is consumed below, while the Gemini auth override further
        // down still needs the command exactly as it was passed in.
        let original_command = command.clone();
        let (path, args, env) = project
            .read_with(cx, |project, cx| {
                project.remote_client().and_then(|client| {
                    let template = client
                        .read(cx)
                        .build_command_with_options(
                            Some(command.path.display().to_string()),
                            &command.args,
                            &command.env.clone().into_iter().flatten().collect(),
                            root_dir.as_ref().map(|path| path.display().to_string()),
                            None,
                            Interactive::No,
                        )
                        .log_err()?;
                    Some((template.program, template.args, template.env))
                })
            })
            // No remote client, or building the remote command failed: run the command as-is.
            .unwrap_or_else(|| {
                (
                    command.path.display().to_string(),
                    command.args,
                    command.env.unwrap_or_default(),
                )
            });

        let builder = ShellBuilder::new(&Shell::System, cfg!(windows)).non_interactive();
        let mut child = builder.build_std_command(Some(path.clone()), &args);
        child.envs(env.clone());
        // Only local projects get a working directory for the spawned process.
        if let Some(cwd) = project.read_with(cx, |project, _cx| {
            if project.is_local() {
                root_dir.as_ref()
            } else {
                None
            }
        }) {
            child.current_dir(cwd);
        }
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        // Release channel name (if any) and app version, sent to the agent as client info.
        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // Shared slot for the session list: the client delegate is created now, but whether
        // the agent supports listing is only known after `initialize`, so it is filled later.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        let client = ClientDelegate {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
            cx: cx.clone(),
        };
        // Futures handed to this spawn callback are run detached on the foreground executor.
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Forward the agent's stderr to the log, one line at a time, until EOF or a read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // When the process exits, report `LoadError::Exited` to every session's thread.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;
                emit_load_error_to_all_sessions(&sessions, LoadError::Exited { status }, cx);
                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(agent_id.clone(), &connection, cx)
            });
        });

        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapabilities::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            .auth(acp::AuthCapabilities::new().terminal(true))
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| agent_id.0.clone());

        // Create the session list only if the agent advertises the `list` session capability,
        // and publish it to the client delegate through the shared slot.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        // TODO: Remove this override once Google team releases their official auth methods
        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
            // Build the login command from the original command minus the ACP flags.
            let mut gemini_args = original_command.args.clone();
            gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
            let value = serde_json::json!({
                "label": "gemini /auth",
                "command": original_command.path.to_string_lossy(),
                "args": gemini_args,
                "env": original_command.env.unwrap_or_default(),
            });
            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
            vec![acp::AuthMethod::Agent(
                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                    .description("Login with your Google or Vertex AI account")
                    .meta(meta),
            )]
        } else {
            response.auth_methods
        };
        Ok(Self {
            id: agent_id,
            auth_methods,
            agent_server_store,
            connection,
            telemetry_id,
            sessions,
            pending_sessions: Rc::new(RefCell::new(HashMap::default())),
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child: Some(child),
        })
    }
 415
 416    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 417        &self.agent_capabilities.prompt_capabilities
 418    }
 419
 420    #[cfg(any(test, feature = "test-support"))]
 421    fn new_for_test(
 422        connection: Rc<acp::ClientSideConnection>,
 423        sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 424        agent_capabilities: acp::AgentCapabilities,
 425        agent_server_store: WeakEntity<AgentServerStore>,
 426        io_task: Task<Result<(), acp::Error>>,
 427        _cx: &mut App,
 428    ) -> Self {
 429        Self {
 430            id: AgentId::new("test"),
 431            telemetry_id: "test".into(),
 432            connection,
 433            sessions,
 434            pending_sessions: Rc::new(RefCell::new(HashMap::default())),
 435            auth_methods: vec![],
 436            agent_server_store,
 437            agent_capabilities,
 438            default_mode: None,
 439            default_model: None,
 440            default_config_options: HashMap::default(),
 441            child: None,
 442            session_list: None,
 443            _io_task: io_task,
 444            _wait_task: Task::ready(Ok(())),
 445            _stderr_task: Task::ready(Ok(())),
 446        }
 447    }
 448
    /// Returns the thread for `session_id`, reusing an established or in-flight session when
    /// there is one and otherwise setting the session up through `rpc_call`.
    ///
    /// * If the session is already in `sessions`, its ref count is incremented and, when its
    ///   thread is still alive, that thread is returned immediately.
    /// * If the session is in `pending_sessions`, its ref count is incremented and the
    ///   returned task awaits the shared in-flight task.
    /// * Otherwise a new `AcpThread` is created and `rpc_call` is invoked with the connection,
    ///   the session id, and the first path of `work_dirs`. On success, default config options
    ///   are applied and the session moves from `pending_sessions` to `sessions`.
    ///
    /// # Errors
    ///
    /// The returned task fails if `work_dirs` is empty or if `rpc_call` fails; in the latter
    /// case the pending entry is removed first.
    fn open_or_create_session(
        self: Rc<Self>,
        session_id: acp::SessionId,
        project: Entity<Project>,
        work_dirs: PathList,
        title: Option<SharedString>,
        rpc_call: impl FnOnce(
            Rc<acp::ClientSideConnection>,
            acp::SessionId,
            PathBuf,
        )
            -> futures::future::LocalBoxFuture<'static, Result<SessionConfigResponse>>
        + 'static,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        // Already established: count this open, and reuse the thread if it is still alive.
        // If the thread is gone, fall through and set the session up again.
        if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
            session.ref_count += 1;
            if let Some(thread) = session.thread.upgrade() {
                return Task::ready(Ok(thread));
            }
        }

        // Already being set up: count this caller and wait on the same shared task.
        if let Some(pending) = self.pending_sessions.borrow_mut().get_mut(&session_id) {
            pending.ref_count += 1;
            let task = pending.task.clone();
            return cx
                .foreground_executor()
                .spawn(async move { task.await.map_err(|err| anyhow!(err)) });
        }

        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };

        let shared_task = cx
            .spawn({
                let session_id = session_id.clone();
                let this = self.clone();
                async move |cx| {
                    let action_log = cx.new(|_| ActionLog::new(project.clone()));
                    let thread: Entity<AcpThread> = cx.new(|cx| {
                        AcpThread::new(
                            None,
                            title,
                            Some(work_dirs),
                            this.clone(),
                            project,
                            action_log,
                            session_id.clone(),
                            watch::Receiver::constant(
                                this.agent_capabilities.prompt_capabilities.clone(),
                            ),
                            cx,
                        )
                    });

                    let response =
                        match rpc_call(this.connection.clone(), session_id.clone(), cwd).await {
                            Ok(response) => response,
                            Err(err) => {
                                // Drop the pending entry so a later call can retry.
                                this.pending_sessions.borrow_mut().remove(&session_id);
                                return Err(Arc::new(err));
                            }
                        };

                    let (modes, models, config_options) =
                        config_state(response.modes, response.models, response.config_options);

                    if let Some(config_opts) = config_options.as_ref() {
                        this.apply_default_config_options(&session_id, config_opts, cx);
                    }

                    // Carry over the count of callers that joined while the session was
                    // pending; defaults to 1 if the pending entry is missing.
                    let ref_count = this
                        .pending_sessions
                        .borrow_mut()
                        .remove(&session_id)
                        .map_or(1, |pending| pending.ref_count);

                    this.sessions.borrow_mut().insert(
                        session_id,
                        AcpSession {
                            thread: thread.downgrade(),
                            suppress_abort_err: false,
                            session_modes: modes,
                            models,
                            config_options: config_options.map(ConfigOptions::new),
                            ref_count,
                        },
                    );

                    Ok(thread)
                }
            })
            .shared();

        // Register the in-flight task so concurrent callers for the same session share it.
        self.pending_sessions.borrow_mut().insert(
            session_id,
            PendingAcpSession {
                task: shared_task.clone(),
                ref_count: 1,
            },
        );

        cx.foreground_executor()
            .spawn(async move { shared_task.await.map_err(|err| anyhow!(err)) })
    }
 556
    /// Applies the connection's default config option values to a session.
    ///
    /// For every option in `config_options` that has an entry in `default_config_options`,
    /// the default is accepted only if the option is a select whose (grouped or ungrouped)
    /// choices contain that value; otherwise a warning is logged and the option is skipped.
    /// For each accepted default, a detached task asks the agent to set the value, and the
    /// local `current_value` is updated immediately; if the request fails, the task restores
    /// the value the option had before.
    fn apply_default_config_options(
        &self,
        session_id: &acp::SessionId,
        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
        cx: &mut AsyncApp,
    ) {
        let id = self.id.clone();
        // Collect `(option id, default value, previous value)` first, inside a scope, so the
        // shared borrow of `config_options` ends before the mutable borrows below.
        let defaults_to_apply: Vec<_> = {
            let config_opts_ref = config_options.borrow();
            config_opts_ref
                .iter()
                .filter_map(|config_option| {
                    let default_value = self.default_config_options.get(&*config_option.id.0)?;

                    // Only select options can be validated; any other kind is rejected.
                    let is_valid = match &config_option.kind {
                        acp::SessionConfigKind::Select(select) => match &select.options {
                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
                                .iter()
                                .any(|opt| &*opt.value.0 == default_value.as_str()),
                            acp::SessionConfigSelectOptions::Grouped(groups) => {
                                groups.iter().any(|g| {
                                    g.options
                                        .iter()
                                        .any(|opt| &*opt.value.0 == default_value.as_str())
                                })
                            }
                            _ => false,
                        },
                        _ => false,
                    };

                    if is_valid {
                        // Remember the current value so it can be restored on failure.
                        let initial_value = match &config_option.kind {
                            acp::SessionConfigKind::Select(select) => {
                                Some(select.current_value.clone())
                            }
                            _ => None,
                        };
                        Some((
                            config_option.id.clone(),
                            default_value.clone(),
                            initial_value,
                        ))
                    } else {
                        log::warn!(
                            "`{}` is not a valid value for config option `{}` in {}",
                            default_value,
                            config_option.id.0,
                            id
                        );
                        None
                    }
                })
                .collect()
        };

        for (config_id, default_value, initial_value) in defaults_to_apply {
            cx.spawn({
                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
                let session_id = session_id.clone();
                let config_id_clone = config_id.clone();
                let config_opts = config_options.clone();
                let conn = self.connection.clone();
                async move |_| {
                    let result = conn
                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
                            session_id,
                            config_id_clone.clone(),
                            default_value_id,
                        ))
                        .await
                        .log_err();

                    // The request failed (and was logged): undo the optimistic update.
                    if result.is_none() {
                        if let Some(initial) = initial_value {
                            let mut opts = config_opts.borrow_mut();
                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
                                    select.current_value = initial;
                                }
                            }
                        }
                    }
                }
            })
            .detach();

            // Optimistically record the default locally without waiting for the agent.
            let mut opts = config_options.borrow_mut();
            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
                    select.current_value = acp::SessionConfigValueId::new(default_value);
                }
            }
        }
    }
 652}
 653
 654fn emit_load_error_to_all_sessions(
 655    sessions: &Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 656    error: LoadError,
 657    cx: &mut AsyncApp,
 658) {
 659    let threads: Vec<_> = sessions
 660        .borrow()
 661        .values()
 662        .map(|session| session.thread.clone())
 663        .collect();
 664
 665    for thread in threads {
 666        thread
 667            .update(cx, |thread, cx| thread.emit_load_error(error.clone(), cx))
 668            .ok();
 669    }
 670}
 671
 672impl Drop for AcpConnection {
 673    fn drop(&mut self) {
 674        if let Some(ref mut child) = self.child {
 675            child.kill().log_err();
 676        }
 677    }
 678}
 679
 680fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
 681    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
 682}
 683
 684fn terminal_auth_task(
 685    command: &AgentServerCommand,
 686    agent_id: &AgentId,
 687    method: &acp::AuthMethodTerminal,
 688) -> SpawnInTerminal {
 689    acp_thread::build_terminal_auth_task(
 690        terminal_auth_task_id(agent_id, &method.id),
 691        method.name.clone(),
 692        command.path.to_string_lossy().into_owned(),
 693        command.args.clone(),
 694        command.env.clone().unwrap_or_default(),
 695    )
 696}
 697
 698/// Used to support the _meta method prior to stabilization
 699fn meta_terminal_auth_task(
 700    agent_id: &AgentId,
 701    method_id: &acp::AuthMethodId,
 702    method: &acp::AuthMethod,
 703) -> Option<SpawnInTerminal> {
 704    #[derive(Deserialize)]
 705    struct MetaTerminalAuth {
 706        label: String,
 707        command: String,
 708        #[serde(default)]
 709        args: Vec<String>,
 710        #[serde(default)]
 711        env: HashMap<String, String>,
 712    }
 713
 714    let meta = match method {
 715        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
 716        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
 717        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
 718        _ => None,
 719    }?;
 720    let terminal_auth =
 721        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
 722
 723    Some(acp_thread::build_terminal_auth_task(
 724        terminal_auth_task_id(agent_id, method_id),
 725        terminal_auth.label.clone(),
 726        terminal_auth.command,
 727        terminal_auth.args,
 728        terminal_auth.env,
 729    ))
 730}
 731
 732impl AgentConnection for AcpConnection {
 733    fn agent_id(&self) -> AgentId {
 734        self.id.clone()
 735    }
 736
 737    fn telemetry_id(&self) -> SharedString {
 738        self.telemetry_id.clone()
 739    }
 740
 741    fn new_session(
 742        self: Rc<Self>,
 743        project: Entity<Project>,
 744        work_dirs: PathList,
 745        cx: &mut App,
 746    ) -> Task<Result<Entity<AcpThread>>> {
 747        // TODO: remove this once ACP supports multiple working directories
 748        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 749            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 750        };
 751        let name = self.id.0.clone();
 752        let mcp_servers = mcp_servers_for_project(&project, cx);
 753
 754        cx.spawn(async move |cx| {
 755            let response = self.connection
 756                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
 757                .await
 758                .map_err(map_acp_error)?;
 759
 760            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
 761
 762            if let Some(default_mode) = self.default_mode.clone() {
 763                if let Some(modes) = modes.as_ref() {
 764                    let mut modes_ref = modes.borrow_mut();
 765                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
 766
 767                    if has_mode {
 768                        let initial_mode_id = modes_ref.current_mode_id.clone();
 769
 770                        cx.spawn({
 771                            let default_mode = default_mode.clone();
 772                            let session_id = response.session_id.clone();
 773                            let modes = modes.clone();
 774                            let conn = self.connection.clone();
 775                            async move |_| {
 776                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
 777                                .await.log_err();
 778
 779                                if result.is_none() {
 780                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 781                                }
 782                            }
 783                        }).detach();
 784
 785                        modes_ref.current_mode_id = default_mode;
 786                    } else {
 787                        let available_modes = modes_ref
 788                            .available_modes
 789                            .iter()
 790                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 791                            .collect::<Vec<_>>()
 792                            .join("\n");
 793
 794                        log::warn!(
 795                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 796                        );
 797                    }
 798                }
 799            }
 800
 801            if let Some(default_model) = self.default_model.clone() {
 802                if let Some(models) = models.as_ref() {
 803                    let mut models_ref = models.borrow_mut();
 804                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
 805
 806                    if has_model {
 807                        let initial_model_id = models_ref.current_model_id.clone();
 808
 809                        cx.spawn({
 810                            let default_model = default_model.clone();
 811                            let session_id = response.session_id.clone();
 812                            let models = models.clone();
 813                            let conn = self.connection.clone();
 814                            async move |_| {
 815                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
 816                                .await.log_err();
 817
 818                                if result.is_none() {
 819                                    models.borrow_mut().current_model_id = initial_model_id;
 820                                }
 821                            }
 822                        }).detach();
 823
 824                        models_ref.current_model_id = default_model;
 825                    } else {
 826                        let available_models = models_ref
 827                            .available_models
 828                            .iter()
 829                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 830                            .collect::<Vec<_>>()
 831                            .join("\n");
 832
 833                        log::warn!(
 834                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 835                        );
 836                    }
 837                }
 838            }
 839
 840            if let Some(config_opts) = config_options.as_ref() {
 841                self.apply_default_config_options(&response.session_id, config_opts, cx);
 842            }
 843
 844            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 845            let thread: Entity<AcpThread> = cx.new(|cx| {
 846                AcpThread::new(
 847                    None,
 848                    None,
 849                    Some(work_dirs),
 850                    self.clone(),
 851                    project,
 852                    action_log,
 853                    response.session_id.clone(),
 854                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 855                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 856                    cx,
 857                )
 858            });
 859
 860            self.sessions.borrow_mut().insert(
 861                response.session_id,
 862                AcpSession {
 863                    thread: thread.downgrade(),
 864                    suppress_abort_err: false,
 865                    session_modes: modes,
 866                    models,
 867                    config_options: config_options.map(ConfigOptions::new),
 868                    ref_count: 1,
 869                },
 870            );
 871
 872            Ok(thread)
 873        })
 874    }
 875
 876    fn supports_load_session(&self) -> bool {
 877        self.agent_capabilities.load_session
 878    }
 879
 880    fn supports_resume_session(&self) -> bool {
 881        self.agent_capabilities
 882            .session_capabilities
 883            .resume
 884            .is_some()
 885    }
 886
 887    fn load_session(
 888        self: Rc<Self>,
 889        session_id: acp::SessionId,
 890        project: Entity<Project>,
 891        work_dirs: PathList,
 892        title: Option<SharedString>,
 893        cx: &mut App,
 894    ) -> Task<Result<Entity<AcpThread>>> {
 895        if !self.agent_capabilities.load_session {
 896            return Task::ready(Err(anyhow!(LoadError::Other(
 897                "Loading sessions is not supported by this agent.".into()
 898            ))));
 899        }
 900
 901        let mcp_servers = mcp_servers_for_project(&project, cx);
 902        self.open_or_create_session(
 903            session_id,
 904            project,
 905            work_dirs,
 906            title,
 907            move |connection, session_id, cwd| {
 908                Box::pin(async move {
 909                    let response = connection
 910                        .load_session(
 911                            acp::LoadSessionRequest::new(session_id, cwd).mcp_servers(mcp_servers),
 912                        )
 913                        .await
 914                        .map_err(map_acp_error)?;
 915                    Ok(SessionConfigResponse {
 916                        modes: response.modes,
 917                        models: response.models,
 918                        config_options: response.config_options,
 919                    })
 920                })
 921            },
 922            cx,
 923        )
 924    }
 925
 926    fn resume_session(
 927        self: Rc<Self>,
 928        session_id: acp::SessionId,
 929        project: Entity<Project>,
 930        work_dirs: PathList,
 931        title: Option<SharedString>,
 932        cx: &mut App,
 933    ) -> Task<Result<Entity<AcpThread>>> {
 934        if self
 935            .agent_capabilities
 936            .session_capabilities
 937            .resume
 938            .is_none()
 939        {
 940            return Task::ready(Err(anyhow!(LoadError::Other(
 941                "Resuming sessions is not supported by this agent.".into()
 942            ))));
 943        }
 944
 945        let mcp_servers = mcp_servers_for_project(&project, cx);
 946        self.open_or_create_session(
 947            session_id,
 948            project,
 949            work_dirs,
 950            title,
 951            move |connection, session_id, cwd| {
 952                Box::pin(async move {
 953                    let response = connection
 954                        .resume_session(
 955                            acp::ResumeSessionRequest::new(session_id, cwd)
 956                                .mcp_servers(mcp_servers),
 957                        )
 958                        .await
 959                        .map_err(map_acp_error)?;
 960                    Ok(SessionConfigResponse {
 961                        modes: response.modes,
 962                        models: response.models,
 963                        config_options: response.config_options,
 964                    })
 965                })
 966            },
 967            cx,
 968        )
 969    }
 970
 971    fn supports_close_session(&self) -> bool {
 972        self.agent_capabilities.session_capabilities.close.is_some()
 973    }
 974
 975    fn close_session(
 976        self: Rc<Self>,
 977        session_id: &acp::SessionId,
 978        cx: &mut App,
 979    ) -> Task<Result<()>> {
 980        if !self.supports_close_session() {
 981            return Task::ready(Err(anyhow!(LoadError::Other(
 982                "Closing sessions is not supported by this agent.".into()
 983            ))));
 984        }
 985
 986        let mut sessions = self.sessions.borrow_mut();
 987        let Some(session) = sessions.get_mut(session_id) else {
 988            return Task::ready(Ok(()));
 989        };
 990
 991        session.ref_count -= 1;
 992        if session.ref_count > 0 {
 993            return Task::ready(Ok(()));
 994        }
 995
 996        sessions.remove(session_id);
 997        drop(sessions);
 998
 999        let conn = self.connection.clone();
1000        let session_id = session_id.clone();
1001        cx.foreground_executor().spawn(async move {
1002            conn.close_session(acp::CloseSessionRequest::new(session_id))
1003                .await?;
1004            Ok(())
1005        })
1006    }
1007
1008    fn auth_methods(&self) -> &[acp::AuthMethod] {
1009        &self.auth_methods
1010    }
1011
1012    fn terminal_auth_task(
1013        &self,
1014        method_id: &acp::AuthMethodId,
1015        cx: &App,
1016    ) -> Option<Task<Result<SpawnInTerminal>>> {
1017        let method = self
1018            .auth_methods
1019            .iter()
1020            .find(|method| method.id() == method_id)?;
1021
1022        match method {
1023            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1024                let agent_id = self.id.clone();
1025                let terminal = terminal.clone();
1026                let store = self.agent_server_store.clone();
1027                Some(cx.spawn(async move |cx| {
1028                    let command = store
1029                        .update(cx, |store, cx| {
1030                            let agent = store
1031                                .get_external_agent(&agent_id)
1032                                .context("Agent server not found")?;
1033                            anyhow::Ok(agent.get_command(
1034                                terminal.args.clone(),
1035                                HashMap::from_iter(terminal.env.clone()),
1036                                &mut cx.to_async(),
1037                            ))
1038                        })?
1039                        .context("Failed to get agent command")?
1040                        .await?;
1041                    Ok(terminal_auth_task(&command, &agent_id, &terminal))
1042                }))
1043            }
1044            _ => meta_terminal_auth_task(&self.id, method_id, method)
1045                .map(|task| Task::ready(Ok(task))),
1046        }
1047    }
1048
1049    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1050        let conn = self.connection.clone();
1051        cx.foreground_executor().spawn(async move {
1052            conn.authenticate(acp::AuthenticateRequest::new(method_id))
1053                .await?;
1054            Ok(())
1055        })
1056    }
1057
1058    fn prompt(
1059        &self,
1060        _id: acp_thread::UserMessageId,
1061        params: acp::PromptRequest,
1062        cx: &mut App,
1063    ) -> Task<Result<acp::PromptResponse>> {
1064        let conn = self.connection.clone();
1065        let sessions = self.sessions.clone();
1066        let session_id = params.session_id.clone();
1067        cx.foreground_executor().spawn(async move {
1068            let result = conn.prompt(params).await;
1069
1070            let mut suppress_abort_err = false;
1071
1072            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1073                suppress_abort_err = session.suppress_abort_err;
1074                session.suppress_abort_err = false;
1075            }
1076
1077            match result {
1078                Ok(response) => Ok(response),
1079                Err(err) => {
1080                    if err.code == acp::ErrorCode::AuthRequired {
1081                        return Err(anyhow!(acp::Error::auth_required()));
1082                    }
1083
1084                    if err.code != ErrorCode::InternalError {
1085                        anyhow::bail!(err)
1086                    }
1087
1088                    let Some(data) = &err.data else {
1089                        anyhow::bail!(err)
1090                    };
1091
1092                    // Temporary workaround until the following PR is generally available:
1093                    // https://github.com/google-gemini/gemini-cli/pull/6656
1094
1095                    #[derive(Deserialize)]
1096                    #[serde(deny_unknown_fields)]
1097                    struct ErrorDetails {
1098                        details: Box<str>,
1099                    }
1100
1101                    match serde_json::from_value(data.clone()) {
1102                        Ok(ErrorDetails { details }) => {
1103                            if suppress_abort_err
1104                                && (details.contains("This operation was aborted")
1105                                    || details.contains("The user aborted a request"))
1106                            {
1107                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1108                            } else {
1109                                Err(anyhow!(details))
1110                            }
1111                        }
1112                        Err(_) => Err(anyhow!(err)),
1113                    }
1114                }
1115            }
1116        })
1117    }
1118
1119    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1120        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1121            session.suppress_abort_err = true;
1122        }
1123        let conn = self.connection.clone();
1124        let params = acp::CancelNotification::new(session_id.clone());
1125        cx.foreground_executor()
1126            .spawn(async move { conn.cancel(params).await })
1127            .detach();
1128    }
1129
1130    fn session_modes(
1131        &self,
1132        session_id: &acp::SessionId,
1133        _cx: &App,
1134    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1135        let sessions = self.sessions.clone();
1136        let sessions_ref = sessions.borrow();
1137        let Some(session) = sessions_ref.get(session_id) else {
1138            return None;
1139        };
1140
1141        if let Some(modes) = session.session_modes.as_ref() {
1142            Some(Rc::new(AcpSessionModes {
1143                connection: self.connection.clone(),
1144                session_id: session_id.clone(),
1145                state: modes.clone(),
1146            }) as _)
1147        } else {
1148            None
1149        }
1150    }
1151
1152    fn model_selector(
1153        &self,
1154        session_id: &acp::SessionId,
1155    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1156        let sessions = self.sessions.clone();
1157        let sessions_ref = sessions.borrow();
1158        let Some(session) = sessions_ref.get(session_id) else {
1159            return None;
1160        };
1161
1162        if let Some(models) = session.models.as_ref() {
1163            Some(Rc::new(AcpModelSelector::new(
1164                session_id.clone(),
1165                self.connection.clone(),
1166                models.clone(),
1167            )) as _)
1168        } else {
1169            None
1170        }
1171    }
1172
1173    fn session_config_options(
1174        &self,
1175        session_id: &acp::SessionId,
1176        _cx: &App,
1177    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1178        let sessions = self.sessions.borrow();
1179        let session = sessions.get(session_id)?;
1180
1181        let config_opts = session.config_options.as_ref()?;
1182
1183        Some(Rc::new(AcpSessionConfigOptions {
1184            session_id: session_id.clone(),
1185            connection: self.connection.clone(),
1186            state: config_opts.config_options.clone(),
1187            watch_tx: config_opts.tx.clone(),
1188            watch_rx: config_opts.rx.clone(),
1189        }) as _)
1190    }
1191
1192    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1193        self.session_list.clone().map(|s| s as _)
1194    }
1195
1196    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1197        self
1198    }
1199}
1200
1201fn map_acp_error(err: acp::Error) -> anyhow::Error {
1202    if err.code == acp::ErrorCode::AuthRequired {
1203        let mut error = AuthRequired::new();
1204
1205        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1206            error = error.with_description(err.message);
1207        }
1208
1209        anyhow!(error)
1210    } else {
1211        anyhow!(err)
1212    }
1213}
1214
1215#[cfg(any(test, feature = "test-support"))]
1216pub mod test_support {
1217    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1218
1219    use acp_thread::{
1220        AgentModelSelector, AgentSessionConfigOptions, AgentSessionModes, AgentSessionRetry,
1221        AgentSessionSetTitle, AgentSessionTruncate, AgentTelemetry, UserMessageId,
1222    };
1223
1224    use super::*;
1225
1226    #[derive(Clone, Default)]
1227    pub struct FakeAcpAgentServer {
1228        load_session_count: Arc<AtomicUsize>,
1229        close_session_count: Arc<AtomicUsize>,
1230        fail_next_prompt: Arc<AtomicBool>,
1231        exit_status_sender:
1232            Arc<std::sync::Mutex<Option<smol::channel::Sender<std::process::ExitStatus>>>>,
1233    }
1234
1235    impl FakeAcpAgentServer {
1236        pub fn new() -> Self {
1237            Self::default()
1238        }
1239
1240        pub fn load_session_count(&self) -> Arc<AtomicUsize> {
1241            self.load_session_count.clone()
1242        }
1243
1244        pub fn close_session_count(&self) -> Arc<AtomicUsize> {
1245            self.close_session_count.clone()
1246        }
1247
1248        pub fn simulate_server_exit(&self) {
1249            let sender = self
1250                .exit_status_sender
1251                .lock()
1252                .expect("exit status sender lock should not be poisoned")
1253                .clone()
1254                .expect("fake ACP server must be connected before simulating exit");
1255            sender
1256                .try_send(std::process::ExitStatus::default())
1257                .expect("fake ACP server exit receiver should still be alive");
1258        }
1259
1260        pub fn fail_next_prompt(&self) {
1261            self.fail_next_prompt.store(true, Ordering::SeqCst);
1262        }
1263    }
1264
1265    impl crate::AgentServer for FakeAcpAgentServer {
1266        fn logo(&self) -> ui::IconName {
1267            ui::IconName::ZedAgent
1268        }
1269
1270        fn agent_id(&self) -> AgentId {
1271            AgentId::new("Test")
1272        }
1273
1274        fn connect(
1275            &self,
1276            _delegate: crate::AgentServerDelegate,
1277            project: Entity<Project>,
1278            cx: &mut App,
1279        ) -> Task<anyhow::Result<Rc<dyn AgentConnection>>> {
1280            let load_session_count = self.load_session_count.clone();
1281            let close_session_count = self.close_session_count.clone();
1282            let fail_next_prompt = self.fail_next_prompt.clone();
1283            let exit_status_sender = self.exit_status_sender.clone();
1284            cx.spawn(async move |cx| {
1285                let harness = build_fake_acp_connection(
1286                    project,
1287                    load_session_count,
1288                    close_session_count,
1289                    fail_next_prompt,
1290                    cx,
1291                )
1292                .await?;
1293                let (exit_tx, exit_rx) = smol::channel::bounded(1);
1294                *exit_status_sender
1295                    .lock()
1296                    .expect("exit status sender lock should not be poisoned") = Some(exit_tx);
1297                let connection = harness.connection.clone();
1298                let simulate_exit_task = cx.spawn(async move |cx| {
1299                    while let Ok(status) = exit_rx.recv().await {
1300                        emit_load_error_to_all_sessions(
1301                            &connection.sessions,
1302                            LoadError::Exited { status },
1303                            cx,
1304                        );
1305                    }
1306                    Ok(())
1307                });
1308                Ok(Rc::new(FakeAcpAgentConnection {
1309                    inner: harness.connection,
1310                    _keep_agent_alive: harness.keep_agent_alive,
1311                    _simulate_exit_task: simulate_exit_task,
1312                }) as Rc<dyn AgentConnection>)
1313            })
1314        }
1315
1316        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1317            self
1318        }
1319    }
1320
    /// Bundle returned by `build_fake_acp_connection`: the connection under
    /// test plus the handles needed to observe and keep alive the fake agent.
    pub struct FakeAcpConnectionHarness {
        /// Connection wired to the in-process fake agent.
        pub connection: Rc<AcpConnection>,
        /// Counter the fake agent increments on every `load_session` request.
        pub load_session_count: Arc<AtomicUsize>,
        /// Counter the fake agent increments on every `close_session` request.
        pub close_session_count: Arc<AtomicUsize>,
        /// Background task that awaits the fake agent's I/O task.
        pub keep_agent_alive: Task<anyhow::Result<()>>,
    }
1327
1328    struct FakeAcpAgentConnection {
1329        inner: Rc<AcpConnection>,
1330        _keep_agent_alive: Task<anyhow::Result<()>>,
1331        _simulate_exit_task: Task<anyhow::Result<()>>,
1332    }
1333
1334    impl AgentConnection for FakeAcpAgentConnection {
1335        fn agent_id(&self) -> AgentId {
1336            self.inner.agent_id()
1337        }
1338
1339        fn telemetry_id(&self) -> SharedString {
1340            self.inner.telemetry_id()
1341        }
1342
1343        fn new_session(
1344            self: Rc<Self>,
1345            project: Entity<Project>,
1346            work_dirs: PathList,
1347            cx: &mut App,
1348        ) -> Task<Result<Entity<AcpThread>>> {
1349            self.inner.clone().new_session(project, work_dirs, cx)
1350        }
1351
1352        fn supports_load_session(&self) -> bool {
1353            self.inner.supports_load_session()
1354        }
1355
1356        fn load_session(
1357            self: Rc<Self>,
1358            session_id: acp::SessionId,
1359            project: Entity<Project>,
1360            work_dirs: PathList,
1361            title: Option<SharedString>,
1362            cx: &mut App,
1363        ) -> Task<Result<Entity<AcpThread>>> {
1364            self.inner
1365                .clone()
1366                .load_session(session_id, project, work_dirs, title, cx)
1367        }
1368
1369        fn supports_close_session(&self) -> bool {
1370            self.inner.supports_close_session()
1371        }
1372
1373        fn close_session(
1374            self: Rc<Self>,
1375            session_id: &acp::SessionId,
1376            cx: &mut App,
1377        ) -> Task<Result<()>> {
1378            self.inner.clone().close_session(session_id, cx)
1379        }
1380
1381        fn supports_resume_session(&self) -> bool {
1382            self.inner.supports_resume_session()
1383        }
1384
1385        fn resume_session(
1386            self: Rc<Self>,
1387            session_id: acp::SessionId,
1388            project: Entity<Project>,
1389            work_dirs: PathList,
1390            title: Option<SharedString>,
1391            cx: &mut App,
1392        ) -> Task<Result<Entity<AcpThread>>> {
1393            self.inner
1394                .clone()
1395                .resume_session(session_id, project, work_dirs, title, cx)
1396        }
1397
1398        fn auth_methods(&self) -> &[acp::AuthMethod] {
1399            self.inner.auth_methods()
1400        }
1401
1402        fn terminal_auth_task(
1403            &self,
1404            method: &acp::AuthMethodId,
1405            cx: &App,
1406        ) -> Option<Task<Result<SpawnInTerminal>>> {
1407            self.inner.terminal_auth_task(method, cx)
1408        }
1409
1410        fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1411            self.inner.authenticate(method, cx)
1412        }
1413
1414        fn prompt(
1415            &self,
1416            user_message_id: UserMessageId,
1417            params: acp::PromptRequest,
1418            cx: &mut App,
1419        ) -> Task<Result<acp::PromptResponse>> {
1420            self.inner.prompt(user_message_id, params, cx)
1421        }
1422
1423        fn retry(
1424            &self,
1425            session_id: &acp::SessionId,
1426            cx: &App,
1427        ) -> Option<Rc<dyn AgentSessionRetry>> {
1428            self.inner.retry(session_id, cx)
1429        }
1430
1431        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1432            self.inner.cancel(session_id, cx)
1433        }
1434
1435        fn truncate(
1436            &self,
1437            session_id: &acp::SessionId,
1438            cx: &App,
1439        ) -> Option<Rc<dyn AgentSessionTruncate>> {
1440            self.inner.truncate(session_id, cx)
1441        }
1442
1443        fn set_title(
1444            &self,
1445            session_id: &acp::SessionId,
1446            cx: &App,
1447        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
1448            self.inner.set_title(session_id, cx)
1449        }
1450
1451        fn model_selector(
1452            &self,
1453            session_id: &acp::SessionId,
1454        ) -> Option<Rc<dyn AgentModelSelector>> {
1455            self.inner.model_selector(session_id)
1456        }
1457
1458        fn telemetry(&self) -> Option<Rc<dyn AgentTelemetry>> {
1459            self.inner.telemetry()
1460        }
1461
1462        fn session_modes(
1463            &self,
1464            session_id: &acp::SessionId,
1465            cx: &App,
1466        ) -> Option<Rc<dyn AgentSessionModes>> {
1467            self.inner.session_modes(session_id, cx)
1468        }
1469
1470        fn session_config_options(
1471            &self,
1472            session_id: &acp::SessionId,
1473            cx: &App,
1474        ) -> Option<Rc<dyn AgentSessionConfigOptions>> {
1475            self.inner.session_config_options(session_id, cx)
1476        }
1477
1478        fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1479            self.inner.session_list(cx)
1480        }
1481
1482        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1483            self
1484        }
1485    }
1486
1487    struct FakeAcpAgent {
1488        load_session_count: Arc<AtomicUsize>,
1489        close_session_count: Arc<AtomicUsize>,
1490        fail_next_prompt: Arc<AtomicBool>,
1491    }
1492
1493    #[async_trait::async_trait(?Send)]
1494    impl acp::Agent for FakeAcpAgent {
1495        async fn initialize(
1496            &self,
1497            args: acp::InitializeRequest,
1498        ) -> acp::Result<acp::InitializeResponse> {
1499            Ok(
1500                acp::InitializeResponse::new(args.protocol_version).agent_capabilities(
1501                    acp::AgentCapabilities::default()
1502                        .load_session(true)
1503                        .session_capabilities(
1504                            acp::SessionCapabilities::default()
1505                                .close(acp::SessionCloseCapabilities::new()),
1506                        ),
1507                ),
1508            )
1509        }
1510
1511        async fn authenticate(
1512            &self,
1513            _: acp::AuthenticateRequest,
1514        ) -> acp::Result<acp::AuthenticateResponse> {
1515            Ok(Default::default())
1516        }
1517
1518        async fn new_session(
1519            &self,
1520            _: acp::NewSessionRequest,
1521        ) -> acp::Result<acp::NewSessionResponse> {
1522            Ok(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
1523        }
1524
1525        async fn prompt(&self, _: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
1526            if self.fail_next_prompt.swap(false, Ordering::SeqCst) {
1527                Err(acp::ErrorCode::InternalError.into())
1528            } else {
1529                Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
1530            }
1531        }
1532
1533        async fn cancel(&self, _: acp::CancelNotification) -> acp::Result<()> {
1534            Ok(())
1535        }
1536
1537        async fn load_session(
1538            &self,
1539            _: acp::LoadSessionRequest,
1540        ) -> acp::Result<acp::LoadSessionResponse> {
1541            self.load_session_count.fetch_add(1, Ordering::SeqCst);
1542            Ok(acp::LoadSessionResponse::new())
1543        }
1544
1545        async fn close_session(
1546            &self,
1547            _: acp::CloseSessionRequest,
1548        ) -> acp::Result<acp::CloseSessionResponse> {
1549            self.close_session_count.fetch_add(1, Ordering::SeqCst);
1550            Ok(acp::CloseSessionResponse::new())
1551        }
1552    }
1553
1554    async fn build_fake_acp_connection(
1555        project: Entity<Project>,
1556        load_session_count: Arc<AtomicUsize>,
1557        close_session_count: Arc<AtomicUsize>,
1558        fail_next_prompt: Arc<AtomicBool>,
1559        cx: &mut AsyncApp,
1560    ) -> Result<FakeAcpConnectionHarness> {
1561        let (c2a_writer, c2a_reader) = async_pipe::pipe();
1562        let (a2c_writer, a2c_reader) = async_pipe::pipe();
1563
1564        let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
1565            Rc::new(RefCell::new(HashMap::default()));
1566        let session_list_container: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
1567            Rc::new(RefCell::new(None));
1568
1569        let foreground = cx.foreground_executor().clone();
1570
1571        let client_delegate = ClientDelegate {
1572            sessions: sessions.clone(),
1573            session_list: session_list_container,
1574            cx: cx.clone(),
1575        };
1576
1577        let (client_conn, client_io_task) =
1578            acp::ClientSideConnection::new(client_delegate, c2a_writer, a2c_reader, {
1579                let foreground = foreground.clone();
1580                move |fut| {
1581                    foreground.spawn(fut).detach();
1582                }
1583            });
1584
1585        let fake_agent = FakeAcpAgent {
1586            load_session_count: load_session_count.clone(),
1587            close_session_count: close_session_count.clone(),
1588            fail_next_prompt,
1589        };
1590
1591        let (_, agent_io_task) =
1592            acp::AgentSideConnection::new(fake_agent, a2c_writer, c2a_reader, {
1593                let foreground = foreground.clone();
1594                move |fut| {
1595                    foreground.spawn(fut).detach();
1596                }
1597            });
1598
1599        let client_io_task = cx.background_spawn(client_io_task);
1600        let agent_io_task = cx.background_spawn(agent_io_task);
1601
1602        let response = client_conn
1603            .initialize(acp::InitializeRequest::new(acp::ProtocolVersion::V1))
1604            .await?;
1605
1606        let agent_capabilities = response.agent_capabilities;
1607
1608        let agent_server_store =
1609            project.read_with(cx, |project, _| project.agent_server_store().downgrade());
1610
1611        let connection = cx.update(|cx| {
1612            AcpConnection::new_for_test(
1613                Rc::new(client_conn),
1614                sessions,
1615                agent_capabilities,
1616                agent_server_store,
1617                client_io_task,
1618                cx,
1619            )
1620        });
1621
1622        let keep_agent_alive = cx.background_spawn(async move {
1623            agent_io_task.await.ok();
1624            anyhow::Ok(())
1625        });
1626
1627        Ok(FakeAcpConnectionHarness {
1628            connection: Rc::new(connection),
1629            load_session_count,
1630            close_session_count,
1631            keep_agent_alive,
1632        })
1633    }
1634
1635    pub async fn connect_fake_acp_connection(
1636        project: Entity<Project>,
1637        cx: &mut gpui::TestAppContext,
1638    ) -> FakeAcpConnectionHarness {
1639        cx.update(|cx| {
1640            let store = settings::SettingsStore::test(cx);
1641            cx.set_global(store);
1642        });
1643
1644        build_fake_acp_connection(
1645            project,
1646            Arc::new(AtomicUsize::new(0)),
1647            Arc::new(AtomicUsize::new(0)),
1648            Arc::new(AtomicBool::new(false)),
1649            &mut cx.to_async(),
1650        )
1651        .await
1652        .expect("failed to initialize ACP connection")
1653    }
1654}
1655
1656#[cfg(test)]
1657mod tests {
1658    use std::sync::atomic::{AtomicUsize, Ordering};
1659
1660    use super::*;
1661
1662    #[test]
1663    fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
1664        let command = AgentServerCommand {
1665            path: "/path/to/agent".into(),
1666            args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
1667            env: Some(HashMap::from_iter([
1668                ("BASE".into(), "1".into()),
1669                ("SHARED".into(), "override".into()),
1670                ("EXTRA".into(), "2".into()),
1671            ])),
1672        };
1673        let method = acp::AuthMethodTerminal::new("login", "Login");
1674
1675        let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
1676
1677        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1678        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
1679        assert_eq!(
1680            task.env,
1681            HashMap::from_iter([
1682                ("BASE".into(), "1".into()),
1683                ("SHARED".into(), "override".into()),
1684                ("EXTRA".into(), "2".into()),
1685            ])
1686        );
1687        assert_eq!(task.label, "Login");
1688        assert_eq!(task.command_label, "Login");
1689    }
1690
1691    #[test]
1692    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
1693        let method_id = acp::AuthMethodId::new("legacy-login");
1694        let method = acp::AuthMethod::Agent(
1695            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1696                "terminal-auth".to_string(),
1697                serde_json::json!({
1698                    "label": "legacy /auth",
1699                    "command": "legacy-agent",
1700                    "args": ["auth", "--interactive"],
1701                    "env": {
1702                        "AUTH_MODE": "interactive",
1703                    },
1704                }),
1705            )])),
1706        );
1707
1708        let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
1709            .expect("expected legacy terminal auth task");
1710
1711        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
1712        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
1713        assert_eq!(task.args, vec!["auth", "--interactive"]);
1714        assert_eq!(
1715            task.env,
1716            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
1717        );
1718        assert_eq!(task.label, "legacy /auth");
1719    }
1720
1721    #[test]
1722    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
1723        let method_id = acp::AuthMethodId::new("legacy-login");
1724        let method = acp::AuthMethod::Agent(
1725            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1726                "terminal-auth".to_string(),
1727                serde_json::json!({
1728                    "label": "legacy /auth",
1729                }),
1730            )])),
1731        );
1732
1733        assert!(
1734            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
1735        );
1736    }
1737
1738    #[test]
1739    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
1740        let method_id = acp::AuthMethodId::new("login");
1741        let method = acp::AuthMethod::Terminal(
1742            acp::AuthMethodTerminal::new(method_id, "Login")
1743                .args(vec!["/auth".into()])
1744                .env(std::collections::HashMap::from_iter([(
1745                    "AUTH_MODE".into(),
1746                    "first-class".into(),
1747                )]))
1748                .meta(acp::Meta::from_iter([(
1749                    "terminal-auth".to_string(),
1750                    serde_json::json!({
1751                        "label": "legacy /auth",
1752                        "command": "legacy-agent",
1753                        "args": ["legacy-auth"],
1754                        "env": {
1755                            "AUTH_MODE": "legacy",
1756                        },
1757                    }),
1758                )])),
1759        );
1760
1761        let command = AgentServerCommand {
1762            path: "/path/to/agent".into(),
1763            args: vec!["--acp".into(), "/auth".into()],
1764            env: Some(HashMap::from_iter([
1765                ("BASE".into(), "1".into()),
1766                ("AUTH_MODE".into(), "first-class".into()),
1767            ])),
1768        };
1769
1770        let task = match &method {
1771            acp::AuthMethod::Terminal(terminal) => {
1772                terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
1773            }
1774            _ => unreachable!(),
1775        };
1776
1777        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1778        assert_eq!(task.args, vec!["--acp", "/auth"]);
1779        assert_eq!(
1780            task.env,
1781            HashMap::from_iter([
1782                ("BASE".into(), "1".into()),
1783                ("AUTH_MODE".into(), "first-class".into()),
1784            ])
1785        );
1786        assert_eq!(task.label, "Login");
1787    }
1788
1789    struct FakeAcpAgent {
1790        load_session_count: Arc<AtomicUsize>,
1791        close_session_count: Arc<AtomicUsize>,
1792    }
1793
1794    #[async_trait::async_trait(?Send)]
1795    impl acp::Agent for FakeAcpAgent {
1796        async fn initialize(
1797            &self,
1798            args: acp::InitializeRequest,
1799        ) -> acp::Result<acp::InitializeResponse> {
1800            Ok(
1801                acp::InitializeResponse::new(args.protocol_version).agent_capabilities(
1802                    acp::AgentCapabilities::default()
1803                        .load_session(true)
1804                        .session_capabilities(
1805                            acp::SessionCapabilities::default()
1806                                .close(acp::SessionCloseCapabilities::new()),
1807                        ),
1808                ),
1809            )
1810        }
1811
1812        async fn authenticate(
1813            &self,
1814            _: acp::AuthenticateRequest,
1815        ) -> acp::Result<acp::AuthenticateResponse> {
1816            Ok(Default::default())
1817        }
1818
1819        async fn new_session(
1820            &self,
1821            _: acp::NewSessionRequest,
1822        ) -> acp::Result<acp::NewSessionResponse> {
1823            Ok(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
1824        }
1825
1826        async fn prompt(&self, _: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
1827            Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
1828        }
1829
1830        async fn cancel(&self, _: acp::CancelNotification) -> acp::Result<()> {
1831            Ok(())
1832        }
1833
1834        async fn load_session(
1835            &self,
1836            _: acp::LoadSessionRequest,
1837        ) -> acp::Result<acp::LoadSessionResponse> {
1838            self.load_session_count.fetch_add(1, Ordering::SeqCst);
1839            Ok(acp::LoadSessionResponse::new())
1840        }
1841
1842        async fn close_session(
1843            &self,
1844            _: acp::CloseSessionRequest,
1845        ) -> acp::Result<acp::CloseSessionResponse> {
1846            self.close_session_count.fetch_add(1, Ordering::SeqCst);
1847            Ok(acp::CloseSessionResponse::new())
1848        }
1849    }
1850
1851    async fn connect_fake_agent(
1852        cx: &mut gpui::TestAppContext,
1853    ) -> (
1854        Rc<AcpConnection>,
1855        Entity<project::Project>,
1856        Arc<AtomicUsize>,
1857        Arc<AtomicUsize>,
1858        Task<anyhow::Result<()>>,
1859    ) {
1860        cx.update(|cx| {
1861            let store = settings::SettingsStore::test(cx);
1862            cx.set_global(store);
1863        });
1864
1865        let fs = fs::FakeFs::new(cx.executor());
1866        fs.insert_tree("/", serde_json::json!({ "a": {} })).await;
1867        let project = project::Project::test(fs, [std::path::Path::new("/a")], cx).await;
1868
1869        let load_count = Arc::new(AtomicUsize::new(0));
1870        let close_count = Arc::new(AtomicUsize::new(0));
1871
1872        let (c2a_writer, c2a_reader) = async_pipe::pipe();
1873        let (a2c_writer, a2c_reader) = async_pipe::pipe();
1874
1875        let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
1876            Rc::new(RefCell::new(HashMap::default()));
1877        let session_list_container: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
1878            Rc::new(RefCell::new(None));
1879
1880        let foreground = cx.foreground_executor().clone();
1881
1882        let client_delegate = ClientDelegate {
1883            sessions: sessions.clone(),
1884            session_list: session_list_container,
1885            cx: cx.to_async(),
1886        };
1887
1888        let (client_conn, client_io_task) =
1889            acp::ClientSideConnection::new(client_delegate, c2a_writer, a2c_reader, {
1890                let foreground = foreground.clone();
1891                move |fut| {
1892                    foreground.spawn(fut).detach();
1893                }
1894            });
1895
1896        let fake_agent = FakeAcpAgent {
1897            load_session_count: load_count.clone(),
1898            close_session_count: close_count.clone(),
1899        };
1900
1901        let (_, agent_io_task) =
1902            acp::AgentSideConnection::new(fake_agent, a2c_writer, c2a_reader, {
1903                let foreground = foreground.clone();
1904                move |fut| {
1905                    foreground.spawn(fut).detach();
1906                }
1907            });
1908
1909        let client_io_task = cx.background_spawn(client_io_task);
1910        let agent_io_task = cx.background_spawn(agent_io_task);
1911
1912        let response = client_conn
1913            .initialize(acp::InitializeRequest::new(acp::ProtocolVersion::V1))
1914            .await
1915            .expect("failed to initialize ACP connection");
1916
1917        let agent_capabilities = response.agent_capabilities;
1918
1919        let agent_server_store =
1920            project.read_with(cx, |project, _| project.agent_server_store().downgrade());
1921
1922        let connection = cx.update(|cx| {
1923            AcpConnection::new_for_test(
1924                Rc::new(client_conn),
1925                sessions,
1926                agent_capabilities,
1927                agent_server_store,
1928                client_io_task,
1929                cx,
1930            )
1931        });
1932
1933        let keep_agent_alive = cx.background_spawn(async move {
1934            agent_io_task.await.ok();
1935            anyhow::Ok(())
1936        });
1937
1938        (
1939            Rc::new(connection),
1940            project,
1941            load_count,
1942            close_count,
1943            keep_agent_alive,
1944        )
1945    }
1946
1947    #[gpui::test]
1948    async fn test_loaded_sessions_keep_state_until_last_close(cx: &mut gpui::TestAppContext) {
1949        let (connection, project, load_count, close_count, _keep_agent_alive) =
1950            connect_fake_agent(cx).await;
1951
1952        let session_id = acp::SessionId::new("session-1");
1953        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);
1954
1955        // Load the same session twice concurrently — the second call should join
1956        // the pending task rather than issuing a second ACP load_session RPC.
1957        let first_load = cx.update(|cx| {
1958            connection.clone().load_session(
1959                session_id.clone(),
1960                project.clone(),
1961                work_dirs.clone(),
1962                None,
1963                cx,
1964            )
1965        });
1966        let second_load = cx.update(|cx| {
1967            connection.clone().load_session(
1968                session_id.clone(),
1969                project.clone(),
1970                work_dirs.clone(),
1971                None,
1972                cx,
1973            )
1974        });
1975
1976        let first_thread = first_load.await.expect("first load failed");
1977        let second_thread = second_load.await.expect("second load failed");
1978        cx.run_until_parked();
1979
1980        assert_eq!(
1981            first_thread.entity_id(),
1982            second_thread.entity_id(),
1983            "concurrent loads for the same session should share one AcpThread"
1984        );
1985        assert_eq!(
1986            load_count.load(Ordering::SeqCst),
1987            1,
1988            "underlying ACP load_session should be called exactly once for concurrent loads"
1989        );
1990
1991        // The session has ref_count 2. The first close should not send the ACP
1992        // close_session RPC — the session is still referenced.
1993        cx.update(|cx| connection.clone().close_session(&session_id, cx))
1994            .await
1995            .expect("first close failed");
1996
1997        assert_eq!(
1998            close_count.load(Ordering::SeqCst),
1999            0,
2000            "ACP close_session should not be sent while ref_count > 0"
2001        );
2002        assert!(
2003            connection.sessions.borrow().contains_key(&session_id),
2004            "session should still be tracked after first close"
2005        );
2006
2007        // The second close drops ref_count to 0 — now the ACP RPC must be sent.
2008        cx.update(|cx| connection.clone().close_session(&session_id, cx))
2009            .await
2010            .expect("second close failed");
2011        cx.run_until_parked();
2012
2013        assert_eq!(
2014            close_count.load(Ordering::SeqCst),
2015            1,
2016            "ACP close_session should be sent exactly once when ref_count reaches 0"
2017        );
2018        assert!(
2019            !connection.sessions.borrow().contains_key(&session_id),
2020            "session should be removed after final close"
2021        );
2022    }
2023}
2024
2025fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
2026    let context_server_store = project.read(cx).context_server_store().read(cx);
2027    let is_local = project.read(cx).is_local();
2028    context_server_store
2029        .configured_server_ids()
2030        .iter()
2031        .filter_map(|id| {
2032            let configuration = context_server_store.configuration_for_server(id)?;
2033            match &*configuration {
2034                project::context_server_store::ContextServerConfiguration::Custom {
2035                    command,
2036                    remote,
2037                    ..
2038                }
2039                | project::context_server_store::ContextServerConfiguration::Extension {
2040                    command,
2041                    remote,
2042                    ..
2043                } if is_local || *remote => Some(acp::McpServer::Stdio(
2044                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
2045                        .args(command.args.clone())
2046                        .env(if let Some(env) = command.env.as_ref() {
2047                            env.iter()
2048                                .map(|(name, value)| acp::EnvVariable::new(name, value))
2049                                .collect()
2050                        } else {
2051                            vec![]
2052                        }),
2053                )),
2054                project::context_server_store::ContextServerConfiguration::Http {
2055                    url,
2056                    headers,
2057                    timeout: _,
2058                } => Some(acp::McpServer::Http(
2059                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
2060                        headers
2061                            .iter()
2062                            .map(|(name, value)| acp::HttpHeader::new(name, value))
2063                            .collect(),
2064                    ),
2065                )),
2066                _ => None,
2067            }
2068        })
2069        .collect()
2070}
2071
2072fn config_state(
2073    modes: Option<acp::SessionModeState>,
2074    models: Option<acp::SessionModelState>,
2075    config_options: Option<Vec<acp::SessionConfigOption>>,
2076) -> (
2077    Option<Rc<RefCell<acp::SessionModeState>>>,
2078    Option<Rc<RefCell<acp::SessionModelState>>>,
2079    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
2080) {
2081    if let Some(opts) = config_options {
2082        return (None, None, Some(Rc::new(RefCell::new(opts))));
2083    }
2084
2085    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
2086    let models = models.map(|models| Rc::new(RefCell::new(models)));
2087    (modes, models, None)
2088}
2089
2090struct AcpSessionModes {
2091    session_id: acp::SessionId,
2092    connection: Rc<acp::ClientSideConnection>,
2093    state: Rc<RefCell<acp::SessionModeState>>,
2094}
2095
2096impl acp_thread::AgentSessionModes for AcpSessionModes {
2097    fn current_mode(&self) -> acp::SessionModeId {
2098        self.state.borrow().current_mode_id.clone()
2099    }
2100
2101    fn all_modes(&self) -> Vec<acp::SessionMode> {
2102        self.state.borrow().available_modes.clone()
2103    }
2104
2105    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
2106        let connection = self.connection.clone();
2107        let session_id = self.session_id.clone();
2108        let old_mode_id;
2109        {
2110            let mut state = self.state.borrow_mut();
2111            old_mode_id = state.current_mode_id.clone();
2112            state.current_mode_id = mode_id.clone();
2113        };
2114        let state = self.state.clone();
2115        cx.foreground_executor().spawn(async move {
2116            let result = connection
2117                .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
2118                .await;
2119
2120            if result.is_err() {
2121                state.borrow_mut().current_mode_id = old_mode_id;
2122            }
2123
2124            result?;
2125
2126            Ok(())
2127        })
2128    }
2129}
2130
2131struct AcpModelSelector {
2132    session_id: acp::SessionId,
2133    connection: Rc<acp::ClientSideConnection>,
2134    state: Rc<RefCell<acp::SessionModelState>>,
2135}
2136
2137impl AcpModelSelector {
2138    fn new(
2139        session_id: acp::SessionId,
2140        connection: Rc<acp::ClientSideConnection>,
2141        state: Rc<RefCell<acp::SessionModelState>>,
2142    ) -> Self {
2143        Self {
2144            session_id,
2145            connection,
2146            state,
2147        }
2148    }
2149}
2150
2151impl acp_thread::AgentModelSelector for AcpModelSelector {
2152    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
2153        Task::ready(Ok(acp_thread::AgentModelList::Flat(
2154            self.state
2155                .borrow()
2156                .available_models
2157                .clone()
2158                .into_iter()
2159                .map(acp_thread::AgentModelInfo::from)
2160                .collect(),
2161        )))
2162    }
2163
2164    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
2165        let connection = self.connection.clone();
2166        let session_id = self.session_id.clone();
2167        let old_model_id;
2168        {
2169            let mut state = self.state.borrow_mut();
2170            old_model_id = state.current_model_id.clone();
2171            state.current_model_id = model_id.clone();
2172        };
2173        let state = self.state.clone();
2174        cx.foreground_executor().spawn(async move {
2175            let result = connection
2176                .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
2177                .await;
2178
2179            if result.is_err() {
2180                state.borrow_mut().current_model_id = old_model_id;
2181            }
2182
2183            result?;
2184
2185            Ok(())
2186        })
2187    }
2188
2189    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
2190        let state = self.state.borrow();
2191        Task::ready(
2192            state
2193                .available_models
2194                .iter()
2195                .find(|m| m.model_id == state.current_model_id)
2196                .cloned()
2197                .map(acp_thread::AgentModelInfo::from)
2198                .ok_or_else(|| anyhow::anyhow!("Model not found")),
2199        )
2200    }
2201}
2202
2203struct AcpSessionConfigOptions {
2204    session_id: acp::SessionId,
2205    connection: Rc<acp::ClientSideConnection>,
2206    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
2207    watch_tx: Rc<RefCell<watch::Sender<()>>>,
2208    watch_rx: watch::Receiver<()>,
2209}
2210
2211impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
2212    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
2213        self.state.borrow().clone()
2214    }
2215
2216    fn set_config_option(
2217        &self,
2218        config_id: acp::SessionConfigId,
2219        value: acp::SessionConfigValueId,
2220        cx: &mut App,
2221    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
2222        let connection = self.connection.clone();
2223        let session_id = self.session_id.clone();
2224        let state = self.state.clone();
2225
2226        let watch_tx = self.watch_tx.clone();
2227
2228        cx.foreground_executor().spawn(async move {
2229            let response = connection
2230                .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
2231                    session_id, config_id, value,
2232                ))
2233                .await?;
2234
2235            *state.borrow_mut() = response.config_options.clone();
2236            watch_tx.borrow_mut().send(()).ok();
2237            Ok(response.config_options)
2238        })
2239    }
2240
2241    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
2242        Some(self.watch_rx.clone())
2243    }
2244}
2245
/// Handles requests and notifications coming from the agent by implementing
/// `acp::Client`.
struct ClientDelegate {
    /// Known sessions keyed by id; consulted whenever an incoming message
    /// names a session.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Session list, if one has been set, that receives session info updates.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// Async app handle, cloned whenever a thread entity must be read or updated.
    cx: AsyncApp,
}
2251
2252#[async_trait::async_trait(?Send)]
2253impl acp::Client for ClientDelegate {
2254    async fn request_permission(
2255        &self,
2256        arguments: acp::RequestPermissionRequest,
2257    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
2258        let thread;
2259        {
2260            let sessions_ref = self.sessions.borrow();
2261            let session = sessions_ref
2262                .get(&arguments.session_id)
2263                .context("Failed to get session")?;
2264            thread = session.thread.clone();
2265        }
2266
2267        let cx = &mut self.cx.clone();
2268
2269        let task = thread.update(cx, |thread, cx| {
2270            thread.request_tool_call_authorization(
2271                arguments.tool_call,
2272                acp_thread::PermissionOptions::Flat(arguments.options),
2273                cx,
2274            )
2275        })??;
2276
2277        let outcome = task.await;
2278
2279        Ok(acp::RequestPermissionResponse::new(outcome.into()))
2280    }
2281
2282    async fn write_text_file(
2283        &self,
2284        arguments: acp::WriteTextFileRequest,
2285    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
2286        let cx = &mut self.cx.clone();
2287        let task = self
2288            .session_thread(&arguments.session_id)?
2289            .update(cx, |thread, cx| {
2290                thread.write_text_file(arguments.path, arguments.content, cx)
2291            })?;
2292
2293        task.await?;
2294
2295        Ok(Default::default())
2296    }
2297
2298    async fn read_text_file(
2299        &self,
2300        arguments: acp::ReadTextFileRequest,
2301    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
2302        let task = self.session_thread(&arguments.session_id)?.update(
2303            &mut self.cx.clone(),
2304            |thread, cx| {
2305                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
2306            },
2307        )?;
2308
2309        let content = task.await?;
2310
2311        Ok(acp::ReadTextFileResponse::new(content))
2312    }
2313
2314    async fn session_notification(
2315        &self,
2316        notification: acp::SessionNotification,
2317    ) -> Result<(), acp::Error> {
2318        let (thread, session_modes, session_config_options) = {
2319            let sessions = self.sessions.borrow();
2320            let session = sessions
2321                .get(&notification.session_id)
2322                .context("Failed to get session")?;
2323            (
2324                session.thread.clone(),
2325                session.session_modes.clone(),
2326                session.config_options.clone(),
2327            )
2328        };
2329
2330        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
2331            current_mode_id,
2332            ..
2333        }) = &notification.update
2334        {
2335            if let Some(session_modes) = &session_modes {
2336                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
2337            }
2338        }
2339
2340        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
2341            config_options,
2342            ..
2343        }) = &notification.update
2344        {
2345            if let Some(opts) = &session_config_options {
2346                *opts.config_options.borrow_mut() = config_options.clone();
2347                opts.tx.borrow_mut().send(()).ok();
2348            }
2349        }
2350
2351        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
2352            && let Some(session_list) = self.session_list.borrow().as_ref()
2353        {
2354            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
2355        }
2356
2357        // Clone so we can inspect meta both before and after handing off to the thread
2358        let update_clone = notification.update.clone();
2359
2360        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
2361        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
2362            if let Some(meta) = &tc.meta {
2363                if let Some(terminal_info) = meta.get("terminal_info") {
2364                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
2365                    {
2366                        let terminal_id = acp::TerminalId::new(id_str);
2367                        let cwd = terminal_info
2368                            .get("cwd")
2369                            .and_then(|v| v.as_str().map(PathBuf::from));
2370
2371                        // Create a minimal display-only lower-level terminal and register it.
2372                        let _ = thread.update(&mut self.cx.clone(), |thread, cx| {
2373                            let builder = TerminalBuilder::new_display_only(
2374                                CursorShape::default(),
2375                                AlternateScroll::On,
2376                                None,
2377                                0,
2378                                cx.background_executor(),
2379                                thread.project().read(cx).path_style(cx),
2380                            )?;
2381                            let lower = cx.new(|cx| builder.subscribe(cx));
2382                            thread.on_terminal_provider_event(
2383                                TerminalProviderEvent::Created {
2384                                    terminal_id,
2385                                    label: tc.title.clone(),
2386                                    cwd,
2387                                    output_byte_limit: None,
2388                                    terminal: lower,
2389                                },
2390                                cx,
2391                            );
2392                            anyhow::Ok(())
2393                        });
2394                    }
2395                }
2396            }
2397        }
2398
2399        // Forward the update to the acp_thread as usual.
2400        thread.update(&mut self.cx.clone(), |thread, cx| {
2401            thread.handle_session_update(notification.update.clone(), cx)
2402        })??;
2403
2404        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
2405        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
2406            if let Some(meta) = &tcu.meta {
2407                if let Some(term_out) = meta.get("terminal_output") {
2408                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
2409                        let terminal_id = acp::TerminalId::new(id_str);
2410                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
2411                            let data = s.as_bytes().to_vec();
2412                            let _ = thread.update(&mut self.cx.clone(), |thread, cx| {
2413                                thread.on_terminal_provider_event(
2414                                    TerminalProviderEvent::Output { terminal_id, data },
2415                                    cx,
2416                                );
2417                            });
2418                        }
2419                    }
2420                }
2421
2422                // terminal_exit
2423                if let Some(term_exit) = meta.get("terminal_exit") {
2424                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
2425                        let terminal_id = acp::TerminalId::new(id_str);
2426                        let status = acp::TerminalExitStatus::new()
2427                            .exit_code(
2428                                term_exit
2429                                    .get("exit_code")
2430                                    .and_then(|v| v.as_u64())
2431                                    .map(|i| i as u32),
2432                            )
2433                            .signal(
2434                                term_exit
2435                                    .get("signal")
2436                                    .and_then(|v| v.as_str().map(|s| s.to_string())),
2437                            );
2438
2439                        let _ = thread.update(&mut self.cx.clone(), |thread, cx| {
2440                            thread.on_terminal_provider_event(
2441                                TerminalProviderEvent::Exit {
2442                                    terminal_id,
2443                                    status,
2444                                },
2445                                cx,
2446                            );
2447                        });
2448                    }
2449                }
2450            }
2451        }
2452
2453        Ok(())
2454    }
2455
2456    async fn create_terminal(
2457        &self,
2458        args: acp::CreateTerminalRequest,
2459    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
2460        let thread = self.session_thread(&args.session_id)?;
2461        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
2462
2463        let terminal_entity = acp_thread::create_terminal_entity(
2464            args.command.clone(),
2465            &args.args,
2466            args.env
2467                .into_iter()
2468                .map(|env| (env.name, env.value))
2469                .collect(),
2470            args.cwd.clone(),
2471            &project,
2472            &mut self.cx.clone(),
2473        )
2474        .await?;
2475
2476        // Register with renderer
2477        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
2478            thread.register_terminal_created(
2479                acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
2480                format!("{} {}", args.command, args.args.join(" ")),
2481                args.cwd.clone(),
2482                args.output_byte_limit,
2483                terminal_entity,
2484                cx,
2485            )
2486        })?;
2487        let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
2488        Ok(acp::CreateTerminalResponse::new(terminal_id))
2489    }
2490
2491    async fn kill_terminal(
2492        &self,
2493        args: acp::KillTerminalRequest,
2494    ) -> Result<acp::KillTerminalResponse, acp::Error> {
2495        self.session_thread(&args.session_id)?
2496            .update(&mut self.cx.clone(), |thread, cx| {
2497                thread.kill_terminal(args.terminal_id, cx)
2498            })??;
2499
2500        Ok(Default::default())
2501    }
2502
2503    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
2504        Err(acp::Error::method_not_found())
2505    }
2506
2507    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
2508        Err(acp::Error::method_not_found())
2509    }
2510
2511    async fn release_terminal(
2512        &self,
2513        args: acp::ReleaseTerminalRequest,
2514    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
2515        self.session_thread(&args.session_id)?
2516            .update(&mut self.cx.clone(), |thread, cx| {
2517                thread.release_terminal(args.terminal_id, cx)
2518            })??;
2519
2520        Ok(Default::default())
2521    }
2522
2523    async fn terminal_output(
2524        &self,
2525        args: acp::TerminalOutputRequest,
2526    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
2527        self.session_thread(&args.session_id)?
2528            .read_with(&mut self.cx.clone(), |thread, cx| {
2529                let out = thread
2530                    .terminal(args.terminal_id)?
2531                    .read(cx)
2532                    .current_output(cx);
2533
2534                Ok(out)
2535            })?
2536    }
2537
2538    async fn wait_for_terminal_exit(
2539        &self,
2540        args: acp::WaitForTerminalExitRequest,
2541    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
2542        let exit_status = self
2543            .session_thread(&args.session_id)?
2544            .update(&mut self.cx.clone(), |thread, cx| {
2545                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
2546            })??
2547            .await;
2548
2549        Ok(acp::WaitForTerminalExitResponse::new(exit_status))
2550    }
2551}
2552
2553impl ClientDelegate {
2554    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
2555        let sessions = self.sessions.borrow();
2556        sessions
2557            .get(session_id)
2558            .context("Failed to get session")
2559            .map(|session| session.thread.clone())
2560    }
2561}