acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::AcpConnectionRegistry;
   6use action_log::ActionLog;
   7use agent_client_protocol::schema::{self as acp, ErrorCode};
   8use agent_client_protocol::{
   9    Agent, Client, ConnectionTo, JsonRpcResponse, Lines, Responder, SentRequest,
  10};
  11use anyhow::anyhow;
  12use collections::HashMap;
  13use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  14use futures::channel::mpsc;
  15use futures::future::Shared;
  16use futures::io::BufReader;
  17use futures::{AsyncBufReadExt as _, Future, FutureExt as _, StreamExt as _};
  18use project::agent_server_store::{AgentServerCommand, AgentServerStore};
  19use project::{AgentId, Project};
  20use remote::remote_client::Interactive;
  21use serde::Deserialize;
  22use std::path::PathBuf;
  23use std::process::Stdio;
  24use std::rc::Rc;
  25use std::sync::Arc;
  26use std::{any::Any, cell::RefCell};
  27use task::{Shell, ShellBuilder, SpawnInTerminal};
  28use thiserror::Error;
  29use util::ResultExt as _;
  30use util::path_list::PathList;
  31use util::process::Child;
  32
  33use anyhow::{Context as _, Result};
  34use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  35
  36use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  37use terminal::TerminalBuilder;
  38use terminal::terminal_settings::{AlternateScroll, CursorShape};
  39
  40use crate::GEMINI_ID;
  41
  42pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  43
  44/// Awaits the response to an ACP request from a GPUI foreground task.
  45///
  46/// The ACP SDK offers two ways to consume a [`SentRequest`]:
  47///   - [`SentRequest::block_task`]: linear `.await` inside a spawned task.
  48///   - [`SentRequest::on_receiving_result`]: a callback invoked when the
  49///     response arrives, with the guarantee that no other inbound messages
  50///     are processed while the callback runs. This is the recommended form
  51///     inside SDK handler callbacks, where [`block_task`] would deadlock.
  52///
  53/// We use `on_receiving_result` with a oneshot bridge here (rather than
  54/// [`block_task`]) so that our handler-side code paths can share a single
  55/// request-awaiting helper. The SDK callback itself is trivial (one channel
  56/// send) so the extra ordering guarantee it imposes on the dispatch loop is
  57/// negligible.
  58fn into_foreground_future<T: JsonRpcResponse>(
  59    sent: SentRequest<T>,
  60) -> impl Future<Output = Result<T, acp::Error>> {
  61    let (tx, rx) = futures::channel::oneshot::channel();
  62    let spawn_result = sent.on_receiving_result(async move |result| {
  63        tx.send(result).ok();
  64        Ok(())
  65    });
  66    async move {
  67        spawn_result?;
  68        rx.await.map_err(|_| {
  69            acp::Error::internal_error()
  70                .data("response channel cancelled — connection may have dropped")
  71        })?
  72    }
  73}
  74
/// Error returned by `AcpConnection::stdio` when the protocol version in the
/// agent's initialize response is lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
  78
/// Helper for flattening the nested `Result` shapes that come out of
/// `entity.update(cx, |_, cx| fallible_op(cx))` into a single `Result<T,
/// acp::Error>`.
///
/// `anyhow::Error` values get converted via `acp::Error::from`, which
/// downcasts an `acp::Error` back out of `anyhow` when present, so typed
/// errors like auth-required survive the trip.
trait FlattenAcpResult<T> {
    /// Collapses the outer and inner `Result` layers into one, yielding the
    /// success value or whichever error occurred as an `acp::Error`.
    fn flatten_acp(self) -> Result<T, acp::Error>;
}
  89
  90impl<T> FlattenAcpResult<T> for Result<Result<T, anyhow::Error>, anyhow::Error> {
  91    fn flatten_acp(self) -> Result<T, acp::Error> {
  92        match self {
  93            Ok(Ok(value)) => Ok(value),
  94            Ok(Err(err)) => Err(err.into()),
  95            Err(err) => Err(err.into()),
  96        }
  97    }
  98}
  99
 100impl<T> FlattenAcpResult<T> for Result<Result<T, acp::Error>, anyhow::Error> {
 101    fn flatten_acp(self) -> Result<T, acp::Error> {
 102        match self {
 103            Ok(Ok(value)) => Ok(value),
 104            Ok(Err(err)) => Err(err),
 105            Err(err) => Err(err.into()),
 106        }
 107    }
 108}
 109
/// Holds state needed by foreground work dispatched from background handler closures.
///
/// A reference to this context is passed to every `ForegroundWorkItem::run`
/// call made by the foreground dispatch loop.
struct ClientContext {
    /// Sessions keyed by session id; the same `Rc` is stored on the
    /// `AcpConnection` that owns the dispatch loop.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// The connection's session list, if any. It starts out as `None` and is
    /// filled in by `AcpConnection::stdio` when the agent advertises the
    /// session-list capability.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
}
 115
 116fn dispatch_queue_closed_error() -> acp::Error {
 117    acp::Error::internal_error().data("ACP foreground dispatch queue closed")
 118}
 119
/// Work items sent from `Send` handler closures to the `!Send` foreground thread.
trait ForegroundWorkItem: Send {
    /// Executes the work item from the foreground dispatch loop, with access
    /// to the async app context and the shared client state.
    fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext);
    /// Called instead of `run` when the item could not be enqueued because
    /// the dispatch queue is closed; implementations report the failure.
    fn reject(self: Box<Self>);
}

/// Type-erased, boxed work item as carried on the foreground dispatch channel.
type ForegroundWork = Box<dyn ForegroundWorkItem>;
 127
 128struct RequestForegroundWork<Req, Res>
 129where
 130    Req: Send + 'static,
 131    Res: JsonRpcResponse + Send + 'static,
 132{
 133    request: Req,
 134    responder: Responder<Res>,
 135    handler: fn(Req, Responder<Res>, &mut AsyncApp, &ClientContext),
 136}
 137
 138impl<Req, Res> ForegroundWorkItem for RequestForegroundWork<Req, Res>
 139where
 140    Req: Send + 'static,
 141    Res: JsonRpcResponse + Send + 'static,
 142{
 143    fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext) {
 144        let Self {
 145            request,
 146            responder,
 147            handler,
 148        } = *self;
 149        handler(request, responder, cx, ctx);
 150    }
 151
 152    fn reject(self: Box<Self>) {
 153        let Self { responder, .. } = *self;
 154        log::error!("ACP foreground dispatch queue closed while handling inbound request");
 155        responder
 156            .respond_with_error(dispatch_queue_closed_error())
 157            .log_err();
 158    }
 159}
 160
 161struct NotificationForegroundWork<Notif>
 162where
 163    Notif: Send + 'static,
 164{
 165    notification: Notif,
 166    connection: ConnectionTo<Agent>,
 167    handler: fn(Notif, &mut AsyncApp, &ClientContext),
 168}
 169
 170impl<Notif> ForegroundWorkItem for NotificationForegroundWork<Notif>
 171where
 172    Notif: Send + 'static,
 173{
 174    fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext) {
 175        let Self {
 176            notification,
 177            handler,
 178            ..
 179        } = *self;
 180        handler(notification, cx, ctx);
 181    }
 182
 183    fn reject(self: Box<Self>) {
 184        let Self { connection, .. } = *self;
 185        log::error!("ACP foreground dispatch queue closed while handling inbound notification");
 186        connection
 187            .send_error_notification(dispatch_queue_closed_error())
 188            .log_err();
 189    }
 190}
 191
 192fn enqueue_request<Req, Res>(
 193    dispatch_tx: &mpsc::UnboundedSender<ForegroundWork>,
 194    request: Req,
 195    responder: Responder<Res>,
 196    handler: fn(Req, Responder<Res>, &mut AsyncApp, &ClientContext),
 197) where
 198    Req: Send + 'static,
 199    Res: JsonRpcResponse + Send + 'static,
 200{
 201    let work: ForegroundWork = Box::new(RequestForegroundWork {
 202        request,
 203        responder,
 204        handler,
 205    });
 206    if let Err(err) = dispatch_tx.unbounded_send(work) {
 207        err.into_inner().reject();
 208    }
 209}
 210
 211fn enqueue_notification<Notif>(
 212    dispatch_tx: &mpsc::UnboundedSender<ForegroundWork>,
 213    notification: Notif,
 214    connection: ConnectionTo<Agent>,
 215    handler: fn(Notif, &mut AsyncApp, &ClientContext),
 216) where
 217    Notif: Send + 'static,
 218{
 219    let work: ForegroundWork = Box::new(NotificationForegroundWork {
 220        notification,
 221        connection,
 222        handler,
 223    });
 224    if let Err(err) = dispatch_tx.unbounded_send(work) {
 225        err.into_inner().reject();
 226    }
 227}
 228
/// A live ACP connection to an external agent, together with the background
/// tasks that keep it running.
pub struct AcpConnection {
    /// Identifier of the agent this connection talks to.
    id: AgentId,
    /// Name used for telemetry: the name the agent reports in its initialize
    /// response when it provides one, otherwise the agent id.
    telemetry_id: SharedString,
    /// Handle used to send requests to the agent.
    connection: ConnectionTo<Agent>,
    /// Sessions keyed by session id; shared with the foreground dispatch
    /// loop's `ClientContext`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Session opens that are still in flight, keyed by session id.
    pending_sessions: Rc<RefCell<HashMap<acp::SessionId, PendingAcpSession>>>,
    /// Authentication methods taken from the initialize response (or the
    /// Gemini terminal-auth override).
    auth_methods: Vec<acp::AuthMethod>,
    agent_server_store: WeakEntity<AgentServerStore>,
    /// Capabilities the agent reported in its initialize response.
    agent_capabilities: acp::AgentCapabilities,
    // Defaults supplied by the caller when connecting; how they are applied
    // is not visible in this part of the file.
    default_mode: Option<acp::SessionModeId>,
    default_model: Option<acp::ModelId>,
    default_config_options: HashMap<String, String>,
    /// The spawned agent process; `None` for connections built by
    /// `new_for_test`.
    child: Option<Child>,
    /// Present only when the agent advertises the session-list capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Drives the connection future on the background executor.
    _io_task: Task<()>,
    /// Foreground loop that runs work items queued by the inbound handlers.
    _dispatch_task: Task<()>,
    /// Waits for the agent process to exit and reports the exit status to
    /// all sessions as a load error.
    _wait_task: Task<Result<()>>,
    /// Forwards the agent's stderr, line by line, to the log and the ACP log tap.
    _stderr_task: Task<Result<()>>,
}
 248
/// Bookkeeping for a session whose open is still in flight.
struct PendingAcpSession {
    /// Shared task resolving to the session's thread (or the error that
    /// prevented it from opening); cloned for every caller that joins.
    task: Shared<Task<Result<Entity<AcpThread>, Arc<anyhow::Error>>>>,
    /// Incremented for each additional caller that joins the in-flight task.
    ref_count: usize,
}
 253
/// Session configuration produced by the `rpc_call` closure passed to
/// `open_or_create_session`; every part is optional.
struct SessionConfigResponse {
    modes: Option<acp::SessionModeState>,
    models: Option<acp::SessionModelState>,
    config_options: Option<Vec<acp::SessionConfigOption>>,
}
 259
/// A session's configuration options bundled with a `watch` channel of `()`.
// NOTE(review): the channel is presumably used to signal option changes to
// observers; the sending side is not visible in this part of the file.
#[derive(Clone)]
struct ConfigOptions {
    /// Shared, mutable list of the session's configuration options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender half of the watch channel created in `ConfigOptions::new`.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver half of the same channel.
    rx: watch::Receiver<()>,
}
 266
 267impl ConfigOptions {
 268    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
 269        let (tx, rx) = watch::channel(());
 270        Self {
 271            config_options,
 272            tx: Rc::new(RefCell::new(tx)),
 273            rx,
 274        }
 275    }
 276}
 277
/// Per-session state tracked by an `AcpConnection`.
pub struct AcpSession {
    /// Weak handle to the thread backing this session.
    thread: WeakEntity<AcpThread>,
    // NOTE(review): presumably suppresses reporting of an abort error for
    // this session; the code that reads it is not visible here.
    suppress_abort_err: bool,
    /// Model state for the session, when available.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state for the session, when available.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Configuration options for the session, when available.
    config_options: Option<ConfigOptions>,
    /// Incremented each time an already-registered session is opened again.
    ref_count: usize,
}
 286
/// Session list backed by an ACP agent: lists sessions over the connection
/// and fans out list updates through an unbounded channel.
pub struct AcpSessionList {
    /// Connection used to send list-sessions requests to the agent.
    connection: ConnectionTo<Agent>,
    /// Sending half of the update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; cloned for each watcher.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
 292
 293impl AcpSessionList {
 294    fn new(connection: ConnectionTo<Agent>) -> Self {
 295        let (tx, rx) = smol::channel::unbounded();
 296        Self {
 297            connection,
 298            updates_tx: tx,
 299            updates_rx: rx,
 300        }
 301    }
 302
 303    fn notify_update(&self) {
 304        self.updates_tx
 305            .try_send(acp_thread::SessionListUpdate::Refresh)
 306            .log_err();
 307    }
 308
 309    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 310        self.updates_tx
 311            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 312            .log_err();
 313    }
 314}
 315
 316impl AgentSessionList for AcpSessionList {
 317    fn list_sessions(
 318        &self,
 319        request: AgentSessionListRequest,
 320        cx: &mut App,
 321    ) -> Task<Result<AgentSessionListResponse>> {
 322        let conn = self.connection.clone();
 323        cx.foreground_executor().spawn(async move {
 324            let acp_request = acp::ListSessionsRequest::new()
 325                .cwd(request.cwd)
 326                .cursor(request.cursor);
 327            let response = into_foreground_future(conn.send_request(acp_request))
 328                .await
 329                .map_err(map_acp_error)?;
 330            Ok(AgentSessionListResponse {
 331                sessions: response
 332                    .sessions
 333                    .into_iter()
 334                    .map(|s| AgentSessionInfo {
 335                        session_id: s.session_id,
 336                        work_dirs: Some(PathList::new(&[s.cwd])),
 337                        title: s.title.map(Into::into),
 338                        updated_at: s.updated_at.and_then(|date_str| {
 339                            chrono::DateTime::parse_from_rfc3339(&date_str)
 340                                .ok()
 341                                .map(|dt| dt.with_timezone(&chrono::Utc))
 342                        }),
 343                        created_at: None,
 344                        meta: s.meta,
 345                    })
 346                    .collect(),
 347                next_cursor: response.next_cursor,
 348                meta: response.meta,
 349            })
 350        })
 351    }
 352
 353    fn watch(
 354        &self,
 355        _cx: &mut App,
 356    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 357        Some(self.updates_rx.clone())
 358    }
 359
 360    fn notify_refresh(&self) {
 361        self.notify_update();
 362    }
 363
 364    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 365        self
 366    }
 367}
 368
 369pub async fn connect(
 370    agent_id: AgentId,
 371    project: Entity<Project>,
 372    command: AgentServerCommand,
 373    agent_server_store: WeakEntity<AgentServerStore>,
 374    default_mode: Option<acp::SessionModeId>,
 375    default_model: Option<acp::ModelId>,
 376    default_config_options: HashMap<String, String>,
 377    cx: &mut AsyncApp,
 378) -> Result<Rc<dyn AgentConnection>> {
 379    let conn = AcpConnection::stdio(
 380        agent_id,
 381        project,
 382        command.clone(),
 383        agent_server_store,
 384        default_mode,
 385        default_model,
 386        default_config_options,
 387        cx,
 388    )
 389    .await?;
 390    Ok(Rc::new(conn) as _)
 391}
 392
/// Lowest ACP protocol version accepted from an agent; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the agent reports anything lower.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 394
/// Build a `Client` connection over `transport` with Zed's full
/// agent→client handler set wired up.
///
/// All incoming requests and notifications are forwarded to the foreground
/// dispatch queue via `dispatch_tx`, where they are handled by the
/// `handle_*` functions on a GPUI context. The returned future drives the
/// connection and completes when the transport closes; callers are expected
/// to spawn it on a background executor and hold the task for the lifetime
/// of the connection. The `connection_tx` oneshot receives the
/// `ConnectionTo<Agent>` handle as soon as the builder runs its `main_fn`.
///
/// `name` is passed to the builder as the client's name.
fn connect_client_future(
    name: &'static str,
    transport: impl agent_client_protocol::ConnectTo<Client> + 'static,
    dispatch_tx: mpsc::UnboundedSender<ForegroundWork>,
    connection_tx: futures::channel::oneshot::Sender<ConnectionTo<Agent>>,
) -> impl Future<Output = Result<(), acp::Error>> {
    // Each handler forwards its inputs onto the foreground dispatch queue.
    // The SDK requires the closure to be `Send`, so we move a clone of
    // `dispatch_tx` into each one.
    //
    // `on_request!` expands to an async closure that queues the request and
    // its responder for `$handler`; the closure itself always returns `Ok`
    // because the actual response is produced later on the foreground thread.
    macro_rules! on_request {
        ($handler:ident) => {{
            let dispatch_tx = dispatch_tx.clone();
            async move |req, responder, _connection| {
                enqueue_request(&dispatch_tx, req, responder, $handler);
                Ok(())
            }
        }};
    }
    // `on_notification!` is the notification counterpart: the connection is
    // forwarded along so a rejected notification can be reported back.
    macro_rules! on_notification {
        ($handler:ident) => {{
            let dispatch_tx = dispatch_tx.clone();
            async move |notif, connection| {
                enqueue_notification(&dispatch_tx, notif, connection, $handler);
                Ok(())
            }
        }};
    }

    Client
        .builder()
        .name(name)
        // --- Request handlers (agent→client) ---
        .on_receive_request(
            on_request!(handle_request_permission),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_write_text_file),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_read_text_file),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_create_terminal),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_kill_terminal),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_release_terminal),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_terminal_output),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_wait_for_terminal_exit),
            agent_client_protocol::on_receive_request!(),
        )
        // --- Notification handlers (agent→client) ---
        .on_receive_notification(
            on_notification!(handle_session_notification),
            agent_client_protocol::on_receive_notification!(),
        )
        .connect_with(
            transport,
            move |connection: ConnectionTo<Agent>| async move {
                // Hand the connection handle to whoever is waiting on the
                // other end of the oneshot; a dropped receiver is only logged.
                if connection_tx.send(connection).is_err() {
                    log::error!("failed to send ACP connection handle — receiver was dropped");
                }
                // Keep the connection alive until the transport closes.
                futures::future::pending::<Result<(), acp::Error>>().await
            },
        )
}
 485
 486impl AcpConnection {
    /// Spawns the agent described by `command`, connects to it over its
    /// stdin/stdout, performs the ACP initialize handshake, and returns the
    /// resulting connection.
    ///
    /// When the project has a remote client, the command line is rebuilt
    /// through that client; otherwise `command` is used as given. The process
    /// is launched through a non-interactive system shell with all three
    /// stdio streams piped, and for local projects its working directory is
    /// the first path of the project's default path list (when there is one).
    ///
    /// Besides the handshake, this sets up:
    /// - a background task driving the connection,
    /// - a foreground loop that runs work items queued by inbound handlers,
    /// - a background task forwarding the agent's stderr to the log, and
    /// - a task that reports the process exit status to all sessions.
    ///
    /// # Errors
    ///
    /// Fails if the process cannot be spawned, if any of its stdio pipes is
    /// missing, if the connection handle is never delivered, if the
    /// initialize request fails, or — with [`UnsupportedVersion`] — if the
    /// agent reports a protocol version below `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        agent_id: AgentId,
        project: Entity<Project>,
        command: AgentServerCommand,
        agent_server_store: WeakEntity<AgentServerStore>,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        // First path of the project's default path list, if any.
        let root_dir = project.read_with(cx, |project, cx| {
            project
                .default_path_list(cx)
                .ordered_paths()
                .next()
                .cloned()
        });
        // Kept for the Gemini auth override below, which needs the command as
        // originally configured rather than the remote-wrapped one.
        let original_command = command.clone();
        // For remote projects, let the remote client build the command line;
        // fall back to the configured command when there is no remote client
        // or building the command fails (the failure is logged).
        let (path, args, env) = project
            .read_with(cx, |project, cx| {
                project.remote_client().and_then(|client| {
                    let template = client
                        .read(cx)
                        .build_command_with_options(
                            Some(command.path.display().to_string()),
                            &command.args,
                            &command.env.clone().into_iter().flatten().collect(),
                            root_dir.as_ref().map(|path| path.display().to_string()),
                            None,
                            Interactive::No,
                        )
                        .log_err()?;
                    Some((template.program, template.args, template.env))
                })
            })
            .unwrap_or_else(|| {
                (
                    command.path.display().to_string(),
                    command.args,
                    command.env.unwrap_or_default(),
                )
            });

        let builder = ShellBuilder::new(&Shell::System, cfg!(windows)).non_interactive();
        let mut child = builder.build_std_command(Some(path.clone()), &args);
        child.envs(env.clone());
        // Only local projects run the agent inside the project directory.
        if let Some(cwd) = project.read_with(cx, |project, _cx| {
            if project.is_local() {
                root_dir.as_ref()
            } else {
                None
            }
        }) {
            child.current_dir(cwd);
        }
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        // NOTE: these messages are emitted after the process has already been spawned.
        log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        // Release channel name (if known) and app version, reported to the
        // agent as client info in the initialize request.
        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // Shared with the dispatch loop; filled in after the handshake if the
        // agent supports listing sessions.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        // Set up the foreground dispatch channel for bridging Send handler
        // closures to the !Send foreground thread.
        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();

        // Register this connection with the logs panel registry. The
        // returned tap is opt-in: until someone subscribes to the ACP logs
        // panel, `emit_*` calls below are ~free (atomic load + return).
        let log_tap = cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(agent_id.clone(), cx)
            })
        });

        // Incoming side of the transport: the agent's stdout split into
        // lines, each mirrored to the log tap.
        let incoming_lines = futures::io::BufReader::new(stdout).lines();
        let tapped_incoming = incoming_lines.inspect({
            let log_tap = log_tap.clone();
            move |result| match result {
                Ok(line) => log_tap.emit_incoming(line),
                Err(err) => {
                    // I/O errors on the transport are fatal for the SDK, but
                    // without logging them the ACP logs panel shows no trace
                    // of why the connection died.
                    log::warn!("ACP transport read error: {err}");
                }
            }
        });

        // Outgoing side of the transport: each line is mirrored to the log
        // tap, newline-terminated, and written to the agent's stdin.
        let tapped_outgoing = futures::sink::unfold(
            (Box::pin(stdin), log_tap.clone()),
            async move |(mut writer, log_tap), line: String| {
                use futures::AsyncWriteExt;
                log_tap.emit_outgoing(&line);
                let mut bytes = line.into_bytes();
                bytes.push(b'\n');
                writer.write_all(&bytes).await?;
                Ok::<_, std::io::Error>((writer, log_tap))
            },
        );

        let transport = Lines::new(tapped_outgoing, tapped_incoming);

        // `connect_client_future` installs the production handler set and
        // returns the connection future (to run on a background executor).
        // The `ConnectionTo<Agent>` handle arrives separately on
        // `connection_rx`, the receiving end of the oneshot channel created
        // here, once the builder runs its main function.
        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
        let connection_future =
            connect_client_future("zed", transport, dispatch_tx.clone(), connection_tx);
        let io_task = cx.background_spawn(async move {
            if let Err(err) = connection_future.await {
                log::error!("ACP connection error: {err}");
            }
        });

        let connection: ConnectionTo<Agent> = connection_rx
            .await
            .context("Failed to receive ACP connection handle")?;

        // Set up the foreground dispatch loop to process work items from handlers.
        let dispatch_context = ClientContext {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
        };
        let dispatch_task = cx.spawn({
            let mut dispatch_rx = dispatch_rx;
            async move |cx| {
                while let Some(work) = dispatch_rx.next().await {
                    work.run(cx, &dispatch_context);
                }
            }
        });

        // Forward the agent's stderr to the log and the log tap, one line at
        // a time, reusing a single buffer. The loop ends on EOF or on the
        // first read error.
        let stderr_task = cx.background_spawn({
            let log_tap = log_tap.clone();
            async move {
                let mut stderr = BufReader::new(stderr);
                let mut line = String::new();
                while let Ok(n) = stderr.read_line(&mut line).await
                    && n > 0
                {
                    let trimmed = line.trim_end_matches(['\n', '\r']);
                    log::warn!("agent stderr: {trimmed}");
                    log_tap.emit_stderr(trimmed);
                    line.clear();
                }
                Ok(())
            }
        });

        // When the agent process exits, tell every session about it.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;
                emit_load_error_to_all_sessions(&sessions, LoadError::Exited { status }, cx);
                anyhow::Ok(())
            }
        });

        // Initialize handshake: advertise file-system, terminal, and
        // terminal-auth support along with client name and version.
        let response = into_foreground_future(
            connection.send_request(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapabilities::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            .auth(acp::AuthCapabilities::new().terminal(true))
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            ),
        )
        .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| agent_id.0.clone());

        // Only create a session list when the agent says it can list
        // sessions, and make it visible to the dispatch loop as well.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        // TODO: Remove this override once Google team releases their official auth methods
        // For Gemini, replace the reported auth methods with a single
        // terminal-auth method that reruns the configured command without
        // its ACP flags.
        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
            let mut gemini_args = original_command.args.clone();
            gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
            let value = serde_json::json!({
                "label": "gemini /auth",
                "command": original_command.path.to_string_lossy(),
                "args": gemini_args,
                "env": original_command.env.unwrap_or_default(),
            });
            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
            vec![acp::AuthMethod::Agent(
                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                    .description("Login with your Google or Vertex AI account")
                    .meta(meta),
            )]
        } else {
            response.auth_methods
        };
        Ok(Self {
            id: agent_id,
            auth_methods,
            agent_server_store,
            connection,
            telemetry_id,
            sessions,
            pending_sessions: Rc::new(RefCell::new(HashMap::default())),
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _dispatch_task: dispatch_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child: Some(child),
        })
    }
 748
 749    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
 750        &self.agent_capabilities.prompt_capabilities
 751    }
 752
 753    #[cfg(any(test, feature = "test-support"))]
 754    fn new_for_test(
 755        connection: ConnectionTo<Agent>,
 756        sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 757        agent_capabilities: acp::AgentCapabilities,
 758        agent_server_store: WeakEntity<AgentServerStore>,
 759        io_task: Task<()>,
 760        dispatch_task: Task<()>,
 761        _cx: &mut App,
 762    ) -> Self {
 763        Self {
 764            id: AgentId::new("test"),
 765            telemetry_id: "test".into(),
 766            connection,
 767            sessions,
 768            pending_sessions: Rc::new(RefCell::new(HashMap::default())),
 769            auth_methods: vec![],
 770            agent_server_store,
 771            agent_capabilities,
 772            default_mode: None,
 773            default_model: None,
 774            default_config_options: HashMap::default(),
 775            child: None,
 776            session_list: None,
 777            _io_task: io_task,
 778            _dispatch_task: dispatch_task,
 779            _wait_task: Task::ready(Ok(())),
 780            _stderr_task: Task::ready(Ok(())),
 781        }
 782    }
 783
 784    fn open_or_create_session(
 785        self: Rc<Self>,
 786        session_id: acp::SessionId,
 787        project: Entity<Project>,
 788        work_dirs: PathList,
 789        title: Option<SharedString>,
 790        rpc_call: impl FnOnce(
 791            ConnectionTo<Agent>,
 792            acp::SessionId,
 793            PathBuf,
 794        )
 795            -> futures::future::LocalBoxFuture<'static, Result<SessionConfigResponse>>
 796        + 'static,
 797        cx: &mut App,
 798    ) -> Task<Result<Entity<AcpThread>>> {
 799        // Check `pending_sessions` before `sessions` because the session is now
 800        // inserted into `sessions` before the load RPC completes (so that
 801        // notifications dispatched during history replay can find the thread).
 802        // Concurrent loads should still wait for the in-flight task so that
 803        // ref-counting happens in one place and the caller sees a fully loaded
 804        // session.
 805        if let Some(pending) = self.pending_sessions.borrow_mut().get_mut(&session_id) {
 806            pending.ref_count += 1;
 807            let task = pending.task.clone();
 808            return cx
 809                .foreground_executor()
 810                .spawn(async move { task.await.map_err(|err| anyhow!(err)) });
 811        }
 812
 813        if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 814            session.ref_count += 1;
 815            if let Some(thread) = session.thread.upgrade() {
 816                return Task::ready(Ok(thread));
 817            }
 818        }
 819
 820        // TODO: remove this once ACP supports multiple working directories
 821        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 822            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 823        };
 824
 825        let shared_task = cx
 826            .spawn({
 827                let session_id = session_id.clone();
 828                let this = self.clone();
 829                async move |cx| {
 830                    let action_log = cx.new(|_| ActionLog::new(project.clone()));
 831                    let thread: Entity<AcpThread> = cx.new(|cx| {
 832                        AcpThread::new(
 833                            None,
 834                            title,
 835                            Some(work_dirs),
 836                            this.clone(),
 837                            project,
 838                            action_log,
 839                            session_id.clone(),
 840                            watch::Receiver::constant(
 841                                this.agent_capabilities.prompt_capabilities.clone(),
 842                            ),
 843                            cx,
 844                        )
 845                    });
 846
 847                    // Register the session before awaiting the RPC so that any
 848                    // `session/update` notifications that arrive during the call
 849                    // (e.g. history replay during `session/load`) can find the thread.
 850                    // Modes/models/config are filled in once the response arrives.
 851                    this.sessions.borrow_mut().insert(
 852                        session_id.clone(),
 853                        AcpSession {
 854                            thread: thread.downgrade(),
 855                            suppress_abort_err: false,
 856                            session_modes: None,
 857                            models: None,
 858                            config_options: None,
 859                            ref_count: 1,
 860                        },
 861                    );
 862
 863                    let response =
 864                        match rpc_call(this.connection.clone(), session_id.clone(), cwd).await {
 865                            Ok(response) => response,
 866                            Err(err) => {
 867                                this.sessions.borrow_mut().remove(&session_id);
 868                                this.pending_sessions.borrow_mut().remove(&session_id);
 869                                return Err(Arc::new(err));
 870                            }
 871                        };
 872
 873                    let (modes, models, config_options) =
 874                        config_state(response.modes, response.models, response.config_options);
 875
 876                    if let Some(config_opts) = config_options.as_ref() {
 877                        this.apply_default_config_options(&session_id, config_opts, cx);
 878                    }
 879
 880                    let ref_count = this
 881                        .pending_sessions
 882                        .borrow_mut()
 883                        .remove(&session_id)
 884                        .map_or(1, |pending| pending.ref_count);
 885
 886                    // If `close_session` ran to completion while the load RPC was in
 887                    // flight, it will have removed both the pending entry and the
 888                    // sessions entry (and dispatched the ACP close RPC). In that case
 889                    // the thread has no live session to attach to, so fail the load
 890                    // instead of handing back an orphaned thread.
 891                    {
 892                        let mut sessions = this.sessions.borrow_mut();
 893                        let Some(session) = sessions.get_mut(&session_id) else {
 894                            return Err(Arc::new(anyhow!(
 895                                "session was closed before load completed"
 896                            )));
 897                        };
 898                        session.session_modes = modes;
 899                        session.models = models;
 900                        session.config_options = config_options.map(ConfigOptions::new);
 901                        session.ref_count = ref_count;
 902                    }
 903
 904                    Ok(thread)
 905                }
 906            })
 907            .shared();
 908
 909        self.pending_sessions.borrow_mut().insert(
 910            session_id,
 911            PendingAcpSession {
 912                task: shared_task.clone(),
 913                ref_count: 1,
 914            },
 915        );
 916
 917        cx.foreground_executor()
 918            .spawn(async move { shared_task.await.map_err(|err| anyhow!(err)) })
 919    }
 920
 921    fn apply_default_config_options(
 922        &self,
 923        session_id: &acp::SessionId,
 924        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 925        cx: &mut AsyncApp,
 926    ) {
 927        let id = self.id.clone();
 928        let defaults_to_apply: Vec<_> = {
 929            let config_opts_ref = config_options.borrow();
 930            config_opts_ref
 931                .iter()
 932                .filter_map(|config_option| {
 933                    let default_value = self.default_config_options.get(&*config_option.id.0)?;
 934
 935                    let is_valid = match &config_option.kind {
 936                        acp::SessionConfigKind::Select(select) => match &select.options {
 937                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
 938                                .iter()
 939                                .any(|opt| &*opt.value.0 == default_value.as_str()),
 940                            acp::SessionConfigSelectOptions::Grouped(groups) => {
 941                                groups.iter().any(|g| {
 942                                    g.options
 943                                        .iter()
 944                                        .any(|opt| &*opt.value.0 == default_value.as_str())
 945                                })
 946                            }
 947                            _ => false,
 948                        },
 949                        _ => false,
 950                    };
 951
 952                    if is_valid {
 953                        let initial_value = match &config_option.kind {
 954                            acp::SessionConfigKind::Select(select) => {
 955                                Some(select.current_value.clone())
 956                            }
 957                            _ => None,
 958                        };
 959                        Some((
 960                            config_option.id.clone(),
 961                            default_value.clone(),
 962                            initial_value,
 963                        ))
 964                    } else {
 965                        log::warn!(
 966                            "`{}` is not a valid value for config option `{}` in {}",
 967                            default_value,
 968                            config_option.id.0,
 969                            id
 970                        );
 971                        None
 972                    }
 973                })
 974                .collect()
 975        };
 976
 977        for (config_id, default_value, initial_value) in defaults_to_apply {
 978            cx.spawn({
 979                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 980                let session_id = session_id.clone();
 981                let config_id_clone = config_id.clone();
 982                let config_opts = config_options.clone();
 983                let conn = self.connection.clone();
 984                async move |_| {
 985                    let result = into_foreground_future(conn.send_request(
 986                        acp::SetSessionConfigOptionRequest::new(
 987                            session_id,
 988                            config_id_clone.clone(),
 989                            default_value_id,
 990                        ),
 991                    ))
 992                    .await
 993                    .log_err();
 994
 995                    if result.is_none() {
 996                        if let Some(initial) = initial_value {
 997                            let mut opts = config_opts.borrow_mut();
 998                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 999                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
1000                                    select.current_value = initial;
1001                                }
1002                            }
1003                        }
1004                    }
1005                }
1006            })
1007            .detach();
1008
1009            let mut opts = config_options.borrow_mut();
1010            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
1011                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
1012                    select.current_value = acp::SessionConfigValueId::new(default_value);
1013                }
1014            }
1015        }
1016    }
1017}
1018
1019fn emit_load_error_to_all_sessions(
1020    sessions: &Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1021    error: LoadError,
1022    cx: &mut AsyncApp,
1023) {
1024    let threads: Vec<_> = sessions
1025        .borrow()
1026        .values()
1027        .map(|session| session.thread.clone())
1028        .collect();
1029
1030    for thread in threads {
1031        thread
1032            .update(cx, |thread, cx| thread.emit_load_error(error.clone(), cx))
1033            .ok();
1034    }
1035}
1036
1037impl Drop for AcpConnection {
1038    fn drop(&mut self) {
1039        if let Some(ref mut child) = self.child {
1040            child.kill().log_err();
1041        }
1042    }
1043}
1044
1045fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
1046    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
1047}
1048
1049fn terminal_auth_task(
1050    command: &AgentServerCommand,
1051    agent_id: &AgentId,
1052    method: &acp::AuthMethodTerminal,
1053) -> SpawnInTerminal {
1054    acp_thread::build_terminal_auth_task(
1055        terminal_auth_task_id(agent_id, &method.id),
1056        method.name.clone(),
1057        command.path.to_string_lossy().into_owned(),
1058        command.args.clone(),
1059        command.env.clone().unwrap_or_default(),
1060    )
1061}
1062
1063/// Used to support the _meta method prior to stabilization
1064fn meta_terminal_auth_task(
1065    agent_id: &AgentId,
1066    method_id: &acp::AuthMethodId,
1067    method: &acp::AuthMethod,
1068) -> Option<SpawnInTerminal> {
1069    #[derive(Deserialize)]
1070    struct MetaTerminalAuth {
1071        label: String,
1072        command: String,
1073        #[serde(default)]
1074        args: Vec<String>,
1075        #[serde(default)]
1076        env: HashMap<String, String>,
1077    }
1078
1079    let meta = match method {
1080        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
1081        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
1082        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
1083        _ => None,
1084    }?;
1085    let terminal_auth =
1086        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
1087
1088    Some(acp_thread::build_terminal_auth_task(
1089        terminal_auth_task_id(agent_id, method_id),
1090        terminal_auth.label.clone(),
1091        terminal_auth.command,
1092        terminal_auth.args,
1093        terminal_auth.env,
1094    ))
1095}
1096
1097impl AgentConnection for AcpConnection {
1098    fn agent_id(&self) -> AgentId {
1099        self.id.clone()
1100    }
1101
1102    fn telemetry_id(&self) -> SharedString {
1103        self.telemetry_id.clone()
1104    }
1105
1106    fn new_session(
1107        self: Rc<Self>,
1108        project: Entity<Project>,
1109        work_dirs: PathList,
1110        cx: &mut App,
1111    ) -> Task<Result<Entity<AcpThread>>> {
1112        // TODO: remove this once ACP supports multiple working directories
1113        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
1114            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
1115        };
1116        let name = self.id.0.clone();
1117        let mcp_servers = mcp_servers_for_project(&project, cx);
1118
1119        cx.spawn(async move |cx| {
1120            let response = into_foreground_future(
1121                self.connection
1122                    .send_request(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers)),
1123            )
1124            .await
1125            .map_err(map_acp_error)?;
1126
1127            let (modes, models, config_options) =
1128                config_state(response.modes, response.models, response.config_options);
1129
1130            if let Some(default_mode) = self.default_mode.clone() {
1131                if let Some(modes) = modes.as_ref() {
1132                    let mut modes_ref = modes.borrow_mut();
1133                    let has_mode = modes_ref
1134                        .available_modes
1135                        .iter()
1136                        .any(|mode| mode.id == default_mode);
1137
1138                    if has_mode {
1139                        let initial_mode_id = modes_ref.current_mode_id.clone();
1140
1141                        cx.spawn({
1142                            let default_mode = default_mode.clone();
1143                            let session_id = response.session_id.clone();
1144                            let modes = modes.clone();
1145                            let conn = self.connection.clone();
1146                            async move |_| {
1147                                let result = into_foreground_future(
1148                                    conn.send_request(acp::SetSessionModeRequest::new(
1149                                        session_id,
1150                                        default_mode,
1151                                    )),
1152                                )
1153                                .await
1154                                .log_err();
1155
1156                                if result.is_none() {
1157                                    modes.borrow_mut().current_mode_id = initial_mode_id;
1158                                }
1159                            }
1160                        })
1161                        .detach();
1162
1163                        modes_ref.current_mode_id = default_mode;
1164                    } else {
1165                        let available_modes = modes_ref
1166                            .available_modes
1167                            .iter()
1168                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
1169                            .collect::<Vec<_>>()
1170                            .join("\n");
1171
1172                        log::warn!(
1173                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
1174                        );
1175                    }
1176                }
1177            }
1178
1179            if let Some(default_model) = self.default_model.clone() {
1180                if let Some(models) = models.as_ref() {
1181                    let mut models_ref = models.borrow_mut();
1182                    let has_model = models_ref
1183                        .available_models
1184                        .iter()
1185                        .any(|model| model.model_id == default_model);
1186
1187                    if has_model {
1188                        let initial_model_id = models_ref.current_model_id.clone();
1189
1190                        cx.spawn({
1191                            let default_model = default_model.clone();
1192                            let session_id = response.session_id.clone();
1193                            let models = models.clone();
1194                            let conn = self.connection.clone();
1195                            async move |_| {
1196                                let result = into_foreground_future(
1197                                    conn.send_request(acp::SetSessionModelRequest::new(
1198                                        session_id,
1199                                        default_model,
1200                                    )),
1201                                )
1202                                .await
1203                                .log_err();
1204
1205                                if result.is_none() {
1206                                    models.borrow_mut().current_model_id = initial_model_id;
1207                                }
1208                            }
1209                        })
1210                        .detach();
1211
1212                        models_ref.current_model_id = default_model;
1213                    } else {
1214                        let available_models = models_ref
1215                            .available_models
1216                            .iter()
1217                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
1218                            .collect::<Vec<_>>()
1219                            .join("\n");
1220
1221                        log::warn!(
1222                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
1223                        );
1224                    }
1225                }
1226            }
1227
1228            if let Some(config_opts) = config_options.as_ref() {
1229                self.apply_default_config_options(&response.session_id, config_opts, cx);
1230            }
1231
1232            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1233            let thread: Entity<AcpThread> = cx.new(|cx| {
1234                AcpThread::new(
1235                    None,
1236                    None,
1237                    Some(work_dirs),
1238                    self.clone(),
1239                    project,
1240                    action_log,
1241                    response.session_id.clone(),
1242                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
1243                    watch::Receiver::constant(
1244                        self.agent_capabilities.prompt_capabilities.clone(),
1245                    ),
1246                    cx,
1247                )
1248            });
1249
1250            self.sessions.borrow_mut().insert(
1251                response.session_id,
1252                AcpSession {
1253                    thread: thread.downgrade(),
1254                    suppress_abort_err: false,
1255                    session_modes: modes,
1256                    models,
1257                    config_options: config_options.map(ConfigOptions::new),
1258                    ref_count: 1,
1259                },
1260            );
1261
1262            Ok(thread)
1263        })
1264    }
1265
1266    fn supports_load_session(&self) -> bool {
1267        self.agent_capabilities.load_session
1268    }
1269
1270    fn supports_resume_session(&self) -> bool {
1271        self.agent_capabilities
1272            .session_capabilities
1273            .resume
1274            .is_some()
1275    }
1276
1277    fn load_session(
1278        self: Rc<Self>,
1279        session_id: acp::SessionId,
1280        project: Entity<Project>,
1281        work_dirs: PathList,
1282        title: Option<SharedString>,
1283        cx: &mut App,
1284    ) -> Task<Result<Entity<AcpThread>>> {
1285        if !self.agent_capabilities.load_session {
1286            return Task::ready(Err(anyhow!(LoadError::Other(
1287                "Loading sessions is not supported by this agent.".into()
1288            ))));
1289        }
1290
1291        let mcp_servers = mcp_servers_for_project(&project, cx);
1292        self.open_or_create_session(
1293            session_id,
1294            project,
1295            work_dirs,
1296            title,
1297            move |connection, session_id, cwd| {
1298                Box::pin(async move {
1299                    let response = into_foreground_future(
1300                        connection.send_request(
1301                            acp::LoadSessionRequest::new(session_id.clone(), cwd)
1302                                .mcp_servers(mcp_servers),
1303                        ),
1304                    )
1305                    .await
1306                    .map_err(map_acp_error)?;
1307                    Ok(SessionConfigResponse {
1308                        modes: response.modes,
1309                        models: response.models,
1310                        config_options: response.config_options,
1311                    })
1312                })
1313            },
1314            cx,
1315        )
1316    }
1317
1318    fn resume_session(
1319        self: Rc<Self>,
1320        session_id: acp::SessionId,
1321        project: Entity<Project>,
1322        work_dirs: PathList,
1323        title: Option<SharedString>,
1324        cx: &mut App,
1325    ) -> Task<Result<Entity<AcpThread>>> {
1326        if self
1327            .agent_capabilities
1328            .session_capabilities
1329            .resume
1330            .is_none()
1331        {
1332            return Task::ready(Err(anyhow!(LoadError::Other(
1333                "Resuming sessions is not supported by this agent.".into()
1334            ))));
1335        }
1336
1337        let mcp_servers = mcp_servers_for_project(&project, cx);
1338        self.open_or_create_session(
1339            session_id,
1340            project,
1341            work_dirs,
1342            title,
1343            move |connection, session_id, cwd| {
1344                Box::pin(async move {
1345                    let response = into_foreground_future(
1346                        connection.send_request(
1347                            acp::ResumeSessionRequest::new(session_id.clone(), cwd)
1348                                .mcp_servers(mcp_servers),
1349                        ),
1350                    )
1351                    .await
1352                    .map_err(map_acp_error)?;
1353                    Ok(SessionConfigResponse {
1354                        modes: response.modes,
1355                        models: response.models,
1356                        config_options: response.config_options,
1357                    })
1358                })
1359            },
1360            cx,
1361        )
1362    }
1363
1364    fn supports_close_session(&self) -> bool {
1365        self.agent_capabilities.session_capabilities.close.is_some()
1366    }
1367
1368    fn close_session(
1369        self: Rc<Self>,
1370        session_id: &acp::SessionId,
1371        cx: &mut App,
1372    ) -> Task<Result<()>> {
1373        if !self.supports_close_session() {
1374            return Task::ready(Err(anyhow!(LoadError::Other(
1375                "Closing sessions is not supported by this agent.".into()
1376            ))));
1377        }
1378
1379        // If a load is still in flight, decrement its ref count. The pending
1380        // entry is the source of truth for how many handles exist during a
1381        // load, so we must tick it down here as well as the `sessions` entry
1382        // that was pre-registered to receive history-replay notifications.
1383        // Only once the pending ref count hits zero do we actually close the
1384        // session; the load task will observe the missing sessions entry and
1385        // fail with "session was closed before load completed".
1386        let pending_ref_count = {
1387            let mut pending_sessions = self.pending_sessions.borrow_mut();
1388            pending_sessions.get_mut(session_id).map(|pending| {
1389                pending.ref_count = pending.ref_count.saturating_sub(1);
1390                pending.ref_count
1391            })
1392        };
1393        match pending_ref_count {
1394            Some(0) => {
1395                self.pending_sessions.borrow_mut().remove(session_id);
1396                self.sessions.borrow_mut().remove(session_id);
1397
1398                let conn = self.connection.clone();
1399                let session_id = session_id.clone();
1400                return cx.foreground_executor().spawn(async move {
1401                    into_foreground_future(
1402                        conn.send_request(acp::CloseSessionRequest::new(session_id)),
1403                    )
1404                    .await?;
1405                    Ok(())
1406                });
1407            }
1408            Some(_) => return Task::ready(Ok(())),
1409            None => {}
1410        }
1411
1412        let mut sessions = self.sessions.borrow_mut();
1413        let Some(session) = sessions.get_mut(session_id) else {
1414            return Task::ready(Ok(()));
1415        };
1416
1417        session.ref_count = session.ref_count.saturating_sub(1);
1418        if session.ref_count > 0 {
1419            return Task::ready(Ok(()));
1420        }
1421
1422        sessions.remove(session_id);
1423        drop(sessions);
1424
1425        let conn = self.connection.clone();
1426        let session_id = session_id.clone();
1427        cx.foreground_executor().spawn(async move {
1428            into_foreground_future(
1429                conn.send_request(acp::CloseSessionRequest::new(session_id.clone())),
1430            )
1431            .await?;
1432            Ok(())
1433        })
1434    }
1435
1436    fn auth_methods(&self) -> &[acp::AuthMethod] {
1437        &self.auth_methods
1438    }
1439
1440    fn terminal_auth_task(
1441        &self,
1442        method_id: &acp::AuthMethodId,
1443        cx: &App,
1444    ) -> Option<Task<Result<SpawnInTerminal>>> {
1445        let method = self
1446            .auth_methods
1447            .iter()
1448            .find(|method| method.id() == method_id)?;
1449
1450        match method {
1451            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1452                let agent_id = self.id.clone();
1453                let terminal = terminal.clone();
1454                let store = self.agent_server_store.clone();
1455                Some(cx.spawn(async move |cx| {
1456                    let command = store
1457                        .update(cx, |store, cx| {
1458                            let agent = store
1459                                .get_external_agent(&agent_id)
1460                                .context("Agent server not found")?;
1461                            anyhow::Ok(agent.get_command(
1462                                terminal.args.clone(),
1463                                HashMap::from_iter(terminal.env.clone()),
1464                                &mut cx.to_async(),
1465                            ))
1466                        })?
1467                        .context("Failed to get agent command")?
1468                        .await?;
1469                    Ok(terminal_auth_task(&command, &agent_id, &terminal))
1470                }))
1471            }
1472            _ => meta_terminal_auth_task(&self.id, method_id, method)
1473                .map(|task| Task::ready(Ok(task))),
1474        }
1475    }
1476
1477    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1478        let conn = self.connection.clone();
1479        cx.foreground_executor().spawn(async move {
1480            into_foreground_future(conn.send_request(acp::AuthenticateRequest::new(method_id)))
1481                .await?;
1482            Ok(())
1483        })
1484    }
1485
1486    fn prompt(
1487        &self,
1488        _id: acp_thread::UserMessageId,
1489        params: acp::PromptRequest,
1490        cx: &mut App,
1491    ) -> Task<Result<acp::PromptResponse>> {
1492        let conn = self.connection.clone();
1493        let sessions = self.sessions.clone();
1494        let session_id = params.session_id.clone();
1495        cx.foreground_executor().spawn(async move {
1496            let result = into_foreground_future(conn.send_request(params)).await;
1497
1498            let mut suppress_abort_err = false;
1499
1500            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1501                suppress_abort_err = session.suppress_abort_err;
1502                session.suppress_abort_err = false;
1503            }
1504
1505            match result {
1506                Ok(response) => Ok(response),
1507                Err(err) => {
1508                    if err.code == acp::ErrorCode::AuthRequired {
1509                        return Err(anyhow!(acp::Error::auth_required()));
1510                    }
1511
1512                    if err.code != ErrorCode::InternalError {
1513                        anyhow::bail!(err)
1514                    }
1515
1516                    let Some(data) = &err.data else {
1517                        anyhow::bail!(err)
1518                    };
1519
1520                    // Temporary workaround until the following PR is generally available:
1521                    // https://github.com/google-gemini/gemini-cli/pull/6656
1522
1523                    #[derive(Deserialize)]
1524                    #[serde(deny_unknown_fields)]
1525                    struct ErrorDetails {
1526                        details: Box<str>,
1527                    }
1528
1529                    match serde_json::from_value(data.clone()) {
1530                        Ok(ErrorDetails { details }) => {
1531                            if suppress_abort_err
1532                                && (details.contains("This operation was aborted")
1533                                    || details.contains("The user aborted a request"))
1534                            {
1535                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1536                            } else {
1537                                Err(anyhow!(details))
1538                            }
1539                        }
1540                        Err(_) => Err(anyhow!(err)),
1541                    }
1542                }
1543            }
1544        })
1545    }
1546
1547    fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) {
1548        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1549            session.suppress_abort_err = true;
1550        }
1551        let params = acp::CancelNotification::new(session_id.clone());
1552        self.connection.send_notification(params).log_err();
1553    }
1554
1555    fn session_modes(
1556        &self,
1557        session_id: &acp::SessionId,
1558        _cx: &App,
1559    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1560        let sessions = self.sessions.clone();
1561        let sessions_ref = sessions.borrow();
1562        let Some(session) = sessions_ref.get(session_id) else {
1563            return None;
1564        };
1565
1566        if let Some(modes) = session.session_modes.as_ref() {
1567            Some(Rc::new(AcpSessionModes {
1568                connection: self.connection.clone(),
1569                session_id: session_id.clone(),
1570                state: modes.clone(),
1571            }) as _)
1572        } else {
1573            None
1574        }
1575    }
1576
1577    fn model_selector(
1578        &self,
1579        session_id: &acp::SessionId,
1580    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1581        let sessions = self.sessions.clone();
1582        let sessions_ref = sessions.borrow();
1583        let Some(session) = sessions_ref.get(session_id) else {
1584            return None;
1585        };
1586
1587        if let Some(models) = session.models.as_ref() {
1588            Some(Rc::new(AcpModelSelector::new(
1589                session_id.clone(),
1590                self.connection.clone(),
1591                models.clone(),
1592            )) as _)
1593        } else {
1594            None
1595        }
1596    }
1597
1598    fn session_config_options(
1599        &self,
1600        session_id: &acp::SessionId,
1601        _cx: &App,
1602    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1603        let sessions = self.sessions.borrow();
1604        let session = sessions.get(session_id)?;
1605
1606        let config_opts = session.config_options.as_ref()?;
1607
1608        Some(Rc::new(AcpSessionConfigOptions {
1609            session_id: session_id.clone(),
1610            connection: self.connection.clone(),
1611            state: config_opts.config_options.clone(),
1612            watch_tx: config_opts.tx.clone(),
1613            watch_rx: config_opts.rx.clone(),
1614        }) as _)
1615    }
1616
1617    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1618        self.session_list.clone().map(|s| s as _)
1619    }
1620
1621    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1622        self
1623    }
1624}
1625
1626fn map_acp_error(err: acp::Error) -> anyhow::Error {
1627    if err.code == acp::ErrorCode::AuthRequired {
1628        let mut error = AuthRequired::new();
1629
1630        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1631            error = error.with_description(err.message);
1632        }
1633
1634        anyhow!(error)
1635    } else {
1636        anyhow!(err)
1637    }
1638}
1639
1640#[cfg(any(test, feature = "test-support"))]
1641pub mod test_support {
1642    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1643
1644    use acp_thread::{
1645        AgentModelSelector, AgentSessionConfigOptions, AgentSessionModes, AgentSessionRetry,
1646        AgentSessionSetTitle, AgentSessionTruncate, AgentTelemetry, UserMessageId,
1647    };
1648
1649    use super::*;
1650
1651    #[derive(Clone, Default)]
1652    pub struct FakeAcpAgentServer {
1653        load_session_count: Arc<AtomicUsize>,
1654        close_session_count: Arc<AtomicUsize>,
1655        fail_next_prompt: Arc<AtomicBool>,
1656        exit_status_sender:
1657            Arc<std::sync::Mutex<Option<smol::channel::Sender<std::process::ExitStatus>>>>,
1658    }
1659
1660    impl FakeAcpAgentServer {
1661        pub fn new() -> Self {
1662            Self::default()
1663        }
1664
1665        pub fn load_session_count(&self) -> Arc<AtomicUsize> {
1666            self.load_session_count.clone()
1667        }
1668
1669        pub fn close_session_count(&self) -> Arc<AtomicUsize> {
1670            self.close_session_count.clone()
1671        }
1672
1673        pub fn simulate_server_exit(&self) {
1674            let sender = self
1675                .exit_status_sender
1676                .lock()
1677                .expect("exit status sender lock should not be poisoned")
1678                .clone()
1679                .expect("fake ACP server must be connected before simulating exit");
1680            sender
1681                .try_send(std::process::ExitStatus::default())
1682                .expect("fake ACP server exit receiver should still be alive");
1683        }
1684
1685        pub fn fail_next_prompt(&self) {
1686            self.fail_next_prompt.store(true, Ordering::SeqCst);
1687        }
1688    }
1689
1690    impl crate::AgentServer for FakeAcpAgentServer {
1691        fn logo(&self) -> ui::IconName {
1692            ui::IconName::ZedAgent
1693        }
1694
1695        fn agent_id(&self) -> AgentId {
1696            AgentId::new("Test")
1697        }
1698
1699        fn connect(
1700            &self,
1701            _delegate: crate::AgentServerDelegate,
1702            project: Entity<Project>,
1703            cx: &mut App,
1704        ) -> Task<anyhow::Result<Rc<dyn AgentConnection>>> {
1705            let load_session_count = self.load_session_count.clone();
1706            let close_session_count = self.close_session_count.clone();
1707            let fail_next_prompt = self.fail_next_prompt.clone();
1708            let exit_status_sender = self.exit_status_sender.clone();
1709            cx.spawn(async move |cx| {
1710                let harness = build_fake_acp_connection(
1711                    project,
1712                    load_session_count,
1713                    close_session_count,
1714                    fail_next_prompt,
1715                    cx,
1716                )
1717                .await?;
1718                let (exit_tx, exit_rx) = smol::channel::bounded(1);
1719                *exit_status_sender
1720                    .lock()
1721                    .expect("exit status sender lock should not be poisoned") = Some(exit_tx);
1722                let connection = harness.connection.clone();
1723                let simulate_exit_task = cx.spawn(async move |cx| {
1724                    while let Ok(status) = exit_rx.recv().await {
1725                        emit_load_error_to_all_sessions(
1726                            &connection.sessions,
1727                            LoadError::Exited { status },
1728                            cx,
1729                        );
1730                    }
1731                    Ok(())
1732                });
1733                Ok(Rc::new(FakeAcpAgentConnection {
1734                    inner: harness.connection,
1735                    _keep_agent_alive: harness.keep_agent_alive,
1736                    _simulate_exit_task: simulate_exit_task,
1737                }) as Rc<dyn AgentConnection>)
1738            })
1739        }
1740
1741        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1742            self
1743        }
1744    }
1745
    /// Result of `build_fake_acp_connection`: the client-side connection plus
    /// the handles a test needs to observe and keep alive the fake agent.
    pub struct FakeAcpConnectionHarness {
        /// Connection wired to the in-process fake agent.
        pub connection: Rc<AcpConnection>,
        /// Incremented by the fake agent for each `LoadSessionRequest` it handles.
        pub load_session_count: Arc<AtomicUsize>,
        /// Incremented by the fake agent for each `CloseSessionRequest` it handles.
        pub close_session_count: Arc<AtomicUsize>,
        /// Background task that awaits the fake agent's I/O future.
        pub keep_agent_alive: Task<anyhow::Result<()>>,
    }
1752
1753    struct FakeAcpAgentConnection {
1754        inner: Rc<AcpConnection>,
1755        _keep_agent_alive: Task<anyhow::Result<()>>,
1756        _simulate_exit_task: Task<anyhow::Result<()>>,
1757    }
1758
1759    impl AgentConnection for FakeAcpAgentConnection {
1760        fn agent_id(&self) -> AgentId {
1761            self.inner.agent_id()
1762        }
1763
1764        fn telemetry_id(&self) -> SharedString {
1765            self.inner.telemetry_id()
1766        }
1767
1768        fn new_session(
1769            self: Rc<Self>,
1770            project: Entity<Project>,
1771            work_dirs: PathList,
1772            cx: &mut App,
1773        ) -> Task<Result<Entity<AcpThread>>> {
1774            self.inner.clone().new_session(project, work_dirs, cx)
1775        }
1776
1777        fn supports_load_session(&self) -> bool {
1778            self.inner.supports_load_session()
1779        }
1780
1781        fn load_session(
1782            self: Rc<Self>,
1783            session_id: acp::SessionId,
1784            project: Entity<Project>,
1785            work_dirs: PathList,
1786            title: Option<SharedString>,
1787            cx: &mut App,
1788        ) -> Task<Result<Entity<AcpThread>>> {
1789            self.inner
1790                .clone()
1791                .load_session(session_id, project, work_dirs, title, cx)
1792        }
1793
1794        fn supports_close_session(&self) -> bool {
1795            self.inner.supports_close_session()
1796        }
1797
1798        fn close_session(
1799            self: Rc<Self>,
1800            session_id: &acp::SessionId,
1801            cx: &mut App,
1802        ) -> Task<Result<()>> {
1803            self.inner.clone().close_session(session_id, cx)
1804        }
1805
1806        fn supports_resume_session(&self) -> bool {
1807            self.inner.supports_resume_session()
1808        }
1809
1810        fn resume_session(
1811            self: Rc<Self>,
1812            session_id: acp::SessionId,
1813            project: Entity<Project>,
1814            work_dirs: PathList,
1815            title: Option<SharedString>,
1816            cx: &mut App,
1817        ) -> Task<Result<Entity<AcpThread>>> {
1818            self.inner
1819                .clone()
1820                .resume_session(session_id, project, work_dirs, title, cx)
1821        }
1822
1823        fn auth_methods(&self) -> &[acp::AuthMethod] {
1824            self.inner.auth_methods()
1825        }
1826
1827        fn terminal_auth_task(
1828            &self,
1829            method: &acp::AuthMethodId,
1830            cx: &App,
1831        ) -> Option<Task<Result<SpawnInTerminal>>> {
1832            self.inner.terminal_auth_task(method, cx)
1833        }
1834
1835        fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1836            self.inner.authenticate(method, cx)
1837        }
1838
1839        fn prompt(
1840            &self,
1841            user_message_id: UserMessageId,
1842            params: acp::PromptRequest,
1843            cx: &mut App,
1844        ) -> Task<Result<acp::PromptResponse>> {
1845            self.inner.prompt(user_message_id, params, cx)
1846        }
1847
1848        fn retry(
1849            &self,
1850            session_id: &acp::SessionId,
1851            cx: &App,
1852        ) -> Option<Rc<dyn AgentSessionRetry>> {
1853            self.inner.retry(session_id, cx)
1854        }
1855
1856        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1857            self.inner.cancel(session_id, cx)
1858        }
1859
1860        fn truncate(
1861            &self,
1862            session_id: &acp::SessionId,
1863            cx: &App,
1864        ) -> Option<Rc<dyn AgentSessionTruncate>> {
1865            self.inner.truncate(session_id, cx)
1866        }
1867
1868        fn set_title(
1869            &self,
1870            session_id: &acp::SessionId,
1871            cx: &App,
1872        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
1873            self.inner.set_title(session_id, cx)
1874        }
1875
1876        fn model_selector(
1877            &self,
1878            session_id: &acp::SessionId,
1879        ) -> Option<Rc<dyn AgentModelSelector>> {
1880            self.inner.model_selector(session_id)
1881        }
1882
1883        fn telemetry(&self) -> Option<Rc<dyn AgentTelemetry>> {
1884            self.inner.telemetry()
1885        }
1886
1887        fn session_modes(
1888            &self,
1889            session_id: &acp::SessionId,
1890            cx: &App,
1891        ) -> Option<Rc<dyn AgentSessionModes>> {
1892            self.inner.session_modes(session_id, cx)
1893        }
1894
1895        fn session_config_options(
1896            &self,
1897            session_id: &acp::SessionId,
1898            cx: &App,
1899        ) -> Option<Rc<dyn AgentSessionConfigOptions>> {
1900            self.inner.session_config_options(session_id, cx)
1901        }
1902
1903        fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1904            self.inner.session_list(cx)
1905        }
1906
1907        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1908            self
1909        }
1910    }
1911
1912    async fn build_fake_acp_connection(
1913        project: Entity<Project>,
1914        load_session_count: Arc<AtomicUsize>,
1915        close_session_count: Arc<AtomicUsize>,
1916        fail_next_prompt: Arc<AtomicBool>,
1917        cx: &mut AsyncApp,
1918    ) -> Result<FakeAcpConnectionHarness> {
1919        let (client_transport, agent_transport) = agent_client_protocol::Channel::duplex();
1920
1921        let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
1922            Rc::new(RefCell::new(HashMap::default()));
1923        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
1924            Rc::new(RefCell::new(None));
1925
1926        let agent_future = Agent
1927            .builder()
1928            .name("fake-agent")
1929            .on_receive_request(
1930                async move |req: acp::InitializeRequest, responder, _cx| {
1931                    responder.respond(
1932                        acp::InitializeResponse::new(req.protocol_version).agent_capabilities(
1933                            acp::AgentCapabilities::default()
1934                                .load_session(true)
1935                                .session_capabilities(
1936                                    acp::SessionCapabilities::default()
1937                                        .close(acp::SessionCloseCapabilities::new()),
1938                                ),
1939                        ),
1940                    )
1941                },
1942                agent_client_protocol::on_receive_request!(),
1943            )
1944            .on_receive_request(
1945                async move |_req: acp::AuthenticateRequest, responder, _cx| {
1946                    responder.respond(Default::default())
1947                },
1948                agent_client_protocol::on_receive_request!(),
1949            )
1950            .on_receive_request(
1951                async move |_req: acp::NewSessionRequest, responder, _cx| {
1952                    responder.respond(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
1953                },
1954                agent_client_protocol::on_receive_request!(),
1955            )
1956            .on_receive_request(
1957                {
1958                    let fail_next_prompt = fail_next_prompt.clone();
1959                    async move |_req: acp::PromptRequest, responder, _cx| {
1960                        if fail_next_prompt.swap(false, Ordering::SeqCst) {
1961                            responder.respond_with_error(acp::ErrorCode::InternalError.into())
1962                        } else {
1963                            responder.respond(acp::PromptResponse::new(acp::StopReason::EndTurn))
1964                        }
1965                    }
1966                },
1967                agent_client_protocol::on_receive_request!(),
1968            )
1969            .on_receive_request(
1970                {
1971                    let load_session_count = load_session_count.clone();
1972                    async move |_req: acp::LoadSessionRequest, responder, _cx| {
1973                        load_session_count.fetch_add(1, Ordering::SeqCst);
1974                        responder.respond(acp::LoadSessionResponse::new())
1975                    }
1976                },
1977                agent_client_protocol::on_receive_request!(),
1978            )
1979            .on_receive_request(
1980                {
1981                    let close_session_count = close_session_count.clone();
1982                    async move |_req: acp::CloseSessionRequest, responder, _cx| {
1983                        close_session_count.fetch_add(1, Ordering::SeqCst);
1984                        responder.respond(acp::CloseSessionResponse::new())
1985                    }
1986                },
1987                agent_client_protocol::on_receive_request!(),
1988            )
1989            .on_receive_notification(
1990                async move |_notif: acp::CancelNotification, _cx| Ok(()),
1991                agent_client_protocol::on_receive_notification!(),
1992            )
1993            .connect_to(agent_transport);
1994
1995        let agent_io_task = cx.background_spawn(agent_future);
1996
1997        // Wire the production handler set into the fake client so inbound
1998        // requests/notifications from the fake agent are dispatched the
1999        // same way the real `stdio` path does.
2000        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();
2001
2002        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
2003        let client_future = connect_client_future(
2004            "zed-test",
2005            client_transport,
2006            dispatch_tx.clone(),
2007            connection_tx,
2008        );
2009        let client_io_task = cx.background_spawn(async move {
2010            client_future.await.ok();
2011        });
2012
2013        let client_conn: ConnectionTo<Agent> = connection_rx
2014            .await
2015            .context("failed to receive fake ACP connection handle")?;
2016
2017        let response = into_foreground_future(
2018            client_conn.send_request(acp::InitializeRequest::new(acp::ProtocolVersion::V1)),
2019        )
2020        .await?;
2021
2022        let agent_capabilities = response.agent_capabilities;
2023
2024        let dispatch_context = ClientContext {
2025            sessions: sessions.clone(),
2026            session_list: client_session_list.clone(),
2027        };
2028        let dispatch_task = cx.spawn({
2029            let mut dispatch_rx = dispatch_rx;
2030            async move |cx| {
2031                while let Some(work) = dispatch_rx.next().await {
2032                    work.run(cx, &dispatch_context);
2033                }
2034            }
2035        });
2036
2037        let agent_server_store =
2038            project.read_with(cx, |project, _| project.agent_server_store().downgrade());
2039
2040        let connection = cx.update(|cx| {
2041            AcpConnection::new_for_test(
2042                client_conn,
2043                sessions,
2044                agent_capabilities,
2045                agent_server_store,
2046                client_io_task,
2047                dispatch_task,
2048                cx,
2049            )
2050        });
2051
2052        let keep_agent_alive = cx.background_spawn(async move {
2053            agent_io_task.await.ok();
2054            anyhow::Ok(())
2055        });
2056
2057        Ok(FakeAcpConnectionHarness {
2058            connection: Rc::new(connection),
2059            load_session_count,
2060            close_session_count,
2061            keep_agent_alive,
2062        })
2063    }
2064
2065    pub async fn connect_fake_acp_connection(
2066        project: Entity<Project>,
2067        cx: &mut gpui::TestAppContext,
2068    ) -> FakeAcpConnectionHarness {
2069        cx.update(|cx| {
2070            let store = settings::SettingsStore::test(cx);
2071            cx.set_global(store);
2072        });
2073
2074        build_fake_acp_connection(
2075            project,
2076            Arc::new(AtomicUsize::new(0)),
2077            Arc::new(AtomicUsize::new(0)),
2078            Arc::new(AtomicBool::new(false)),
2079            &mut cx.to_async(),
2080        )
2081        .await
2082        .expect("failed to initialize ACP connection")
2083    }
2084}
2085
2086#[cfg(test)]
2087mod tests {
2088    use std::sync::atomic::{AtomicUsize, Ordering};
2089
2090    use super::*;
2091
2092    #[test]
2093    fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
2094        let command = AgentServerCommand {
2095            path: "/path/to/agent".into(),
2096            args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
2097            env: Some(HashMap::from_iter([
2098                ("BASE".into(), "1".into()),
2099                ("SHARED".into(), "override".into()),
2100                ("EXTRA".into(), "2".into()),
2101            ])),
2102        };
2103        let method = acp::AuthMethodTerminal::new("login", "Login");
2104
2105        let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
2106
2107        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
2108        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
2109        assert_eq!(
2110            task.env,
2111            HashMap::from_iter([
2112                ("BASE".into(), "1".into()),
2113                ("SHARED".into(), "override".into()),
2114                ("EXTRA".into(), "2".into()),
2115            ])
2116        );
2117        assert_eq!(task.label, "Login");
2118        assert_eq!(task.command_label, "Login");
2119    }
2120
2121    #[test]
2122    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
2123        let method_id = acp::AuthMethodId::new("legacy-login");
2124        let method = acp::AuthMethod::Agent(
2125            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
2126                "terminal-auth".to_string(),
2127                serde_json::json!({
2128                    "label": "legacy /auth",
2129                    "command": "legacy-agent",
2130                    "args": ["auth", "--interactive"],
2131                    "env": {
2132                        "AUTH_MODE": "interactive",
2133                    },
2134                }),
2135            )])),
2136        );
2137
2138        let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
2139            .expect("expected legacy terminal auth task");
2140
2141        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
2142        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
2143        assert_eq!(task.args, vec!["auth", "--interactive"]);
2144        assert_eq!(
2145            task.env,
2146            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
2147        );
2148        assert_eq!(task.label, "legacy /auth");
2149    }
2150
2151    #[test]
2152    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
2153        let method_id = acp::AuthMethodId::new("legacy-login");
2154        let method = acp::AuthMethod::Agent(
2155            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
2156                "terminal-auth".to_string(),
2157                serde_json::json!({
2158                    "label": "legacy /auth",
2159                }),
2160            )])),
2161        );
2162
2163        assert!(
2164            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
2165        );
2166    }
2167
2168    #[test]
2169    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
2170        let method_id = acp::AuthMethodId::new("login");
2171        let method = acp::AuthMethod::Terminal(
2172            acp::AuthMethodTerminal::new(method_id, "Login")
2173                .args(vec!["/auth".into()])
2174                .env(std::collections::HashMap::from_iter([(
2175                    "AUTH_MODE".into(),
2176                    "first-class".into(),
2177                )]))
2178                .meta(acp::Meta::from_iter([(
2179                    "terminal-auth".to_string(),
2180                    serde_json::json!({
2181                        "label": "legacy /auth",
2182                        "command": "legacy-agent",
2183                        "args": ["legacy-auth"],
2184                        "env": {
2185                            "AUTH_MODE": "legacy",
2186                        },
2187                    }),
2188                )])),
2189        );
2190
2191        let command = AgentServerCommand {
2192            path: "/path/to/agent".into(),
2193            args: vec!["--acp".into(), "/auth".into()],
2194            env: Some(HashMap::from_iter([
2195                ("BASE".into(), "1".into()),
2196                ("AUTH_MODE".into(), "first-class".into()),
2197            ])),
2198        };
2199
2200        let task = match &method {
2201            acp::AuthMethod::Terminal(terminal) => {
2202                terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
2203            }
2204            _ => unreachable!(),
2205        };
2206
2207        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
2208        assert_eq!(task.args, vec!["--acp", "/auth"]);
2209        assert_eq!(
2210            task.env,
2211            HashMap::from_iter([
2212                ("BASE".into(), "1".into()),
2213                ("AUTH_MODE".into(), "first-class".into()),
2214            ])
2215        );
2216        assert_eq!(task.label, "Login");
2217    }
2218
2219    async fn connect_fake_agent(
2220        cx: &mut gpui::TestAppContext,
2221    ) -> (
2222        Rc<AcpConnection>,
2223        Entity<project::Project>,
2224        Arc<AtomicUsize>,
2225        Arc<AtomicUsize>,
2226        Arc<std::sync::Mutex<Vec<acp::SessionUpdate>>>,
2227        Arc<std::sync::Mutex<Option<smol::channel::Receiver<()>>>>,
2228        Task<anyhow::Result<()>>,
2229    ) {
2230        cx.update(|cx| {
2231            let store = settings::SettingsStore::test(cx);
2232            cx.set_global(store);
2233        });
2234
2235        let fs = fs::FakeFs::new(cx.executor());
2236        fs.insert_tree("/", serde_json::json!({ "a": {} })).await;
2237        let project = project::Project::test(fs, [std::path::Path::new("/a")], cx).await;
2238
2239        let load_count = Arc::new(AtomicUsize::new(0));
2240        let close_count = Arc::new(AtomicUsize::new(0));
2241        let load_session_updates: Arc<std::sync::Mutex<Vec<acp::SessionUpdate>>> =
2242            Arc::new(std::sync::Mutex::new(Vec::new()));
2243        let load_session_gate: Arc<std::sync::Mutex<Option<smol::channel::Receiver<()>>>> =
2244            Arc::new(std::sync::Mutex::new(None));
2245
2246        let (client_transport, agent_transport) = agent_client_protocol::Channel::duplex();
2247
2248        let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
2249            Rc::new(RefCell::new(HashMap::default()));
2250        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
2251            Rc::new(RefCell::new(None));
2252
2253        // Build the fake agent side. It handles the requests issued by
2254        // `AcpConnection` during the test and tracks load/close counts.
2255        let agent_future = Agent
2256            .builder()
2257            .name("fake-agent")
2258            .on_receive_request(
2259                async move |req: acp::InitializeRequest, responder, _cx| {
2260                    responder.respond(
2261                        acp::InitializeResponse::new(req.protocol_version).agent_capabilities(
2262                            acp::AgentCapabilities::default()
2263                                .load_session(true)
2264                                .session_capabilities(
2265                                    acp::SessionCapabilities::default()
2266                                        .close(acp::SessionCloseCapabilities::new()),
2267                                ),
2268                        ),
2269                    )
2270                },
2271                agent_client_protocol::on_receive_request!(),
2272            )
2273            .on_receive_request(
2274                async move |_req: acp::AuthenticateRequest, responder, _cx| {
2275                    responder.respond(Default::default())
2276                },
2277                agent_client_protocol::on_receive_request!(),
2278            )
2279            .on_receive_request(
2280                async move |_req: acp::NewSessionRequest, responder, _cx| {
2281                    responder.respond(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
2282                },
2283                agent_client_protocol::on_receive_request!(),
2284            )
2285            .on_receive_request(
2286                async move |_req: acp::PromptRequest, responder, _cx| {
2287                    responder.respond(acp::PromptResponse::new(acp::StopReason::EndTurn))
2288                },
2289                agent_client_protocol::on_receive_request!(),
2290            )
2291            .on_receive_request(
2292                {
2293                    let load_count = load_count.clone();
2294                    let load_session_updates = load_session_updates.clone();
2295                    let load_session_gate = load_session_gate.clone();
2296                    async move |req: acp::LoadSessionRequest, responder, cx| {
2297                        load_count.fetch_add(1, Ordering::SeqCst);
2298
2299                        // Simulate spec-compliant history replay: send
2300                        // notifications to the client before responding to the
2301                        // load request.
2302                        let updates = std::mem::take(
2303                            &mut *load_session_updates
2304                                .lock()
2305                                .expect("load_session_updates mutex poisoned"),
2306                        );
2307                        for update in updates {
2308                            cx.send_notification(acp::SessionNotification::new(
2309                                req.session_id.clone(),
2310                                update,
2311                            ))?;
2312                        }
2313
2314                        // If a gate was installed, park on it before responding
2315                        // so tests can interleave other work (e.g.
2316                        // `close_session`) with an in-flight load.
2317                        let gate = load_session_gate
2318                            .lock()
2319                            .expect("load_session_gate mutex poisoned")
2320                            .take();
2321                        if let Some(gate) = gate {
2322                            gate.recv().await.ok();
2323                        }
2324
2325                        responder.respond(acp::LoadSessionResponse::new())
2326                    }
2327                },
2328                agent_client_protocol::on_receive_request!(),
2329            )
2330            .on_receive_request(
2331                {
2332                    let close_count = close_count.clone();
2333                    async move |_req: acp::CloseSessionRequest, responder, _cx| {
2334                        close_count.fetch_add(1, Ordering::SeqCst);
2335                        responder.respond(acp::CloseSessionResponse::new())
2336                    }
2337                },
2338                agent_client_protocol::on_receive_request!(),
2339            )
2340            .on_receive_notification(
2341                async move |_notif: acp::CancelNotification, _cx| Ok(()),
2342                agent_client_protocol::on_receive_notification!(),
2343            )
2344            .connect_to(agent_transport);
2345
2346        let agent_io_task = cx.background_spawn(agent_future);
2347
2348        // Wire the production handler set into the fake client so inbound
2349        // requests/notifications from the fake agent reach the same
2350        // dispatcher that the real `stdio` path uses.
2351        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();
2352
2353        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
2354        let client_future = connect_client_future(
2355            "zed-test",
2356            client_transport,
2357            dispatch_tx.clone(),
2358            connection_tx,
2359        );
2360        let client_io_task = cx.background_spawn(async move {
2361            client_future.await.ok();
2362        });
2363
2364        let client_conn: ConnectionTo<Agent> = connection_rx
2365            .await
2366            .expect("failed to receive ACP connection handle");
2367
2368        let response = into_foreground_future(
2369            client_conn.send_request(acp::InitializeRequest::new(acp::ProtocolVersion::V1)),
2370        )
2371        .await
2372        .expect("failed to initialize ACP connection");
2373
2374        let agent_capabilities = response.agent_capabilities;
2375
2376        let dispatch_context = ClientContext {
2377            sessions: sessions.clone(),
2378            session_list: client_session_list.clone(),
2379        };
2380        // `TestAppContext::spawn` hands out an `AsyncApp` by value, whereas the
2381        // production path uses `Context::spawn` which hands out `&mut AsyncApp`.
2382        // Bind the value-form to a local and take `&mut` of it to reuse the
2383        // same dispatch loop shape.
2384        let dispatch_task = cx.spawn({
2385            let mut dispatch_rx = dispatch_rx;
2386            move |cx| async move {
2387                let mut cx = cx;
2388                while let Some(work) = dispatch_rx.next().await {
2389                    work.run(&mut cx, &dispatch_context);
2390                }
2391            }
2392        });
2393
2394        let agent_server_store =
2395            project.read_with(cx, |project, _| project.agent_server_store().downgrade());
2396
2397        let connection = cx.update(|cx| {
2398            AcpConnection::new_for_test(
2399                client_conn,
2400                sessions,
2401                agent_capabilities,
2402                agent_server_store,
2403                client_io_task,
2404                dispatch_task,
2405                cx,
2406            )
2407        });
2408
2409        let keep_agent_alive = cx.background_spawn(async move {
2410            agent_io_task.await.ok();
2411            anyhow::Ok(())
2412        });
2413
2414        (
2415            Rc::new(connection),
2416            project,
2417            load_count,
2418            close_count,
2419            load_session_updates,
2420            load_session_gate,
2421            keep_agent_alive,
2422        )
2423    }
2424
2425    #[gpui::test]
2426    async fn test_loaded_sessions_keep_state_until_last_close(cx: &mut gpui::TestAppContext) {
2427        let (
2428            connection,
2429            project,
2430            load_count,
2431            close_count,
2432            _load_session_updates,
2433            _load_session_gate,
2434            _keep_agent_alive,
2435        ) = connect_fake_agent(cx).await;
2436
2437        let session_id = acp::SessionId::new("session-1");
2438        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);
2439
2440        // Load the same session twice concurrently — the second call should join
2441        // the pending task rather than issuing a second ACP load_session RPC.
2442        let first_load = cx.update(|cx| {
2443            connection.clone().load_session(
2444                session_id.clone(),
2445                project.clone(),
2446                work_dirs.clone(),
2447                None,
2448                cx,
2449            )
2450        });
2451        let second_load = cx.update(|cx| {
2452            connection.clone().load_session(
2453                session_id.clone(),
2454                project.clone(),
2455                work_dirs.clone(),
2456                None,
2457                cx,
2458            )
2459        });
2460
2461        let first_thread = first_load.await.expect("first load failed");
2462        let second_thread = second_load.await.expect("second load failed");
2463        cx.run_until_parked();
2464
2465        assert_eq!(
2466            first_thread.entity_id(),
2467            second_thread.entity_id(),
2468            "concurrent loads for the same session should share one AcpThread"
2469        );
2470        assert_eq!(
2471            load_count.load(Ordering::SeqCst),
2472            1,
2473            "underlying ACP load_session should be called exactly once for concurrent loads"
2474        );
2475
2476        // The session has ref_count 2. The first close should not send the ACP
2477        // close_session RPC — the session is still referenced.
2478        cx.update(|cx| connection.clone().close_session(&session_id, cx))
2479            .await
2480            .expect("first close failed");
2481
2482        assert_eq!(
2483            close_count.load(Ordering::SeqCst),
2484            0,
2485            "ACP close_session should not be sent while ref_count > 0"
2486        );
2487        assert!(
2488            connection.sessions.borrow().contains_key(&session_id),
2489            "session should still be tracked after first close"
2490        );
2491
2492        // The second close drops ref_count to 0 — now the ACP RPC must be sent.
2493        cx.update(|cx| connection.clone().close_session(&session_id, cx))
2494            .await
2495            .expect("second close failed");
2496        cx.run_until_parked();
2497
2498        assert_eq!(
2499            close_count.load(Ordering::SeqCst),
2500            1,
2501            "ACP close_session should be sent exactly once when ref_count reaches 0"
2502        );
2503        assert!(
2504            !connection.sessions.borrow().contains_key(&session_id),
2505            "session should be removed after final close"
2506        );
2507    }
2508
2509    // Regression test: per the ACP spec, an agent replays the entire conversation
2510    // history as `session/update` notifications *before* responding to the
2511    // `session/load` request. These notifications must be applied to the
2512    // reconstructed thread, not dropped because the session hasn't been
2513    // registered yet.
2514    #[gpui::test]
2515    async fn test_load_session_replays_notifications_sent_before_response(
2516        cx: &mut gpui::TestAppContext,
2517    ) {
2518        let (
2519            connection,
2520            project,
2521            _load_count,
2522            _close_count,
2523            load_session_updates,
2524            _load_session_gate,
2525            _keep_agent_alive,
2526        ) = connect_fake_agent(cx).await;
2527
2528        // Queue up some history updates that the fake agent will stream to
2529        // the client during the `load_session` call, before responding.
2530        *load_session_updates
2531            .lock()
2532            .expect("load_session_updates mutex poisoned") = vec![
2533            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(acp::ContentBlock::Text(
2534                acp::TextContent::new(String::from("hello agent")),
2535            ))),
2536            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(acp::ContentBlock::Text(
2537                acp::TextContent::new(String::from("hi user")),
2538            ))),
2539        ];
2540
2541        let session_id = acp::SessionId::new("session-replay");
2542        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);
2543
2544        let thread = cx
2545            .update(|cx| {
2546                connection.clone().load_session(
2547                    session_id.clone(),
2548                    project.clone(),
2549                    work_dirs,
2550                    None,
2551                    cx,
2552                )
2553            })
2554            .await
2555            .expect("load_session failed");
2556        cx.run_until_parked();
2557
2558        let entries = thread.read_with(cx, |thread, _| {
2559            thread
2560                .entries()
2561                .iter()
2562                .map(|entry| match entry {
2563                    acp_thread::AgentThreadEntry::UserMessage(_) => "user",
2564                    acp_thread::AgentThreadEntry::AssistantMessage(_) => "assistant",
2565                    acp_thread::AgentThreadEntry::ToolCall(_) => "tool_call",
2566                    acp_thread::AgentThreadEntry::CompletedPlan(_) => "plan",
2567                })
2568                .collect::<Vec<_>>()
2569        });
2570
2571        assert_eq!(
2572            entries,
2573            vec!["user", "assistant"],
2574            "replayed notifications should be applied to the thread"
2575        );
2576    }
2577
2578    // Regression test: if `close_session` is issued while a `load_session`
2579    // RPC is still in flight, the close must take effect cleanly — the load
2580    // must fail with a recognizable error (not return an orphaned thread),
2581    // no entry must remain in `sessions` or `pending_sessions`, and the ACP
2582    // `close_session` RPC must be dispatched.
2583    #[gpui::test]
2584    async fn test_close_session_during_in_flight_load(cx: &mut gpui::TestAppContext) {
2585        let (
2586            connection,
2587            project,
2588            load_count,
2589            close_count,
2590            _load_session_updates,
2591            load_session_gate,
2592            _keep_agent_alive,
2593        ) = connect_fake_agent(cx).await;
2594
2595        // Install a gate so the fake agent's `load_session` handler parks
2596        // before sending its response. We'll close the session while the
2597        // load is parked.
2598        let (gate_tx, gate_rx) = smol::channel::bounded::<()>(1);
2599        *load_session_gate
2600            .lock()
2601            .expect("load_session_gate mutex poisoned") = Some(gate_rx);
2602
2603        let session_id = acp::SessionId::new("session-close-during-load");
2604        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);
2605
2606        let load_task = cx.update(|cx| {
2607            connection.clone().load_session(
2608                session_id.clone(),
2609                project.clone(),
2610                work_dirs,
2611                None,
2612                cx,
2613            )
2614        });
2615
2616        // Let the load RPC reach the agent and park on the gate.
2617        cx.run_until_parked();
2618        assert_eq!(
2619            load_count.load(Ordering::SeqCst),
2620            1,
2621            "load_session RPC should have been dispatched"
2622        );
2623        assert!(
2624            connection
2625                .pending_sessions
2626                .borrow()
2627                .contains_key(&session_id),
2628            "pending_sessions entry should exist while load is in flight"
2629        );
2630        assert!(
2631            connection.sessions.borrow().contains_key(&session_id),
2632            "sessions entry should be pre-registered to receive replay notifications"
2633        );
2634
2635        // Close the session while the load is still parked. This should take
2636        // the pending path and dispatch the ACP close RPC.
2637        let close_task = cx.update(|cx| connection.clone().close_session(&session_id, cx));
2638
2639        // Release the gate so the load RPC can finally respond.
2640        gate_tx.send(()).await.expect("gate send failed");
2641        drop(gate_tx);
2642
2643        let load_result = load_task.await;
2644        close_task.await.expect("close failed");
2645        cx.run_until_parked();
2646
2647        let err = load_result.expect_err("load should fail after close-during-load");
2648        assert!(
2649            err.to_string()
2650                .contains("session was closed before load completed"),
2651            "expected close-during-load error, got: {err}"
2652        );
2653
2654        assert_eq!(
2655            close_count.load(Ordering::SeqCst),
2656            1,
2657            "ACP close_session should be sent exactly once"
2658        );
2659        assert!(
2660            !connection.sessions.borrow().contains_key(&session_id),
2661            "sessions entry should be removed after close-during-load"
2662        );
2663        assert!(
2664            !connection
2665                .pending_sessions
2666                .borrow()
2667                .contains_key(&session_id),
2668            "pending_sessions entry should be removed after close-during-load"
2669        );
2670    }
2671
2672    // Regression test: when two concurrent `load_session` calls share a pending
2673    // task and one of them issues `close_session` before the load RPC
2674    // resolves, the remaining load must still succeed and the session must
2675    // stay live. If `close_session` incorrectly short-circuits via the
2676    // `sessions` path (removing the entry while a load is still in flight),
2677    // the pending task will fail and both concurrent loaders will lose
2678    // their handle.
2679    #[gpui::test]
2680    async fn test_close_during_load_preserves_other_concurrent_loader(
2681        cx: &mut gpui::TestAppContext,
2682    ) {
2683        let (
2684            connection,
2685            project,
2686            load_count,
2687            close_count,
2688            _load_session_updates,
2689            load_session_gate,
2690            _keep_agent_alive,
2691        ) = connect_fake_agent(cx).await;
2692
2693        let (gate_tx, gate_rx) = smol::channel::bounded::<()>(1);
2694        *load_session_gate
2695            .lock()
2696            .expect("load_session_gate mutex poisoned") = Some(gate_rx);
2697
2698        let session_id = acp::SessionId::new("session-concurrent-close");
2699        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);
2700
2701        // Kick off two concurrent loads; the second must join the first's pending
2702        // task rather than issuing a second RPC.
2703        let first_load = cx.update(|cx| {
2704            connection.clone().load_session(
2705                session_id.clone(),
2706                project.clone(),
2707                work_dirs.clone(),
2708                None,
2709                cx,
2710            )
2711        });
2712        let second_load = cx.update(|cx| {
2713            connection.clone().load_session(
2714                session_id.clone(),
2715                project.clone(),
2716                work_dirs.clone(),
2717                None,
2718                cx,
2719            )
2720        });
2721
2722        cx.run_until_parked();
2723        assert_eq!(
2724            load_count.load(Ordering::SeqCst),
2725            1,
2726            "load_session RPC should only be dispatched once for concurrent loads"
2727        );
2728
2729        // Close one of the two handles while the shared load is still parked.
2730        // Because a second loader still holds a pending ref, this should be a
2731        // no-op on the wire.
2732        cx.update(|cx| connection.clone().close_session(&session_id, cx))
2733            .await
2734            .expect("close during load failed");
2735        assert_eq!(
2736            close_count.load(Ordering::SeqCst),
2737            0,
2738            "close_session RPC must not be dispatched while another load handle remains"
2739        );
2740
2741        // Release the gate so the load RPC can finally respond.
2742        gate_tx.send(()).await.expect("gate send failed");
2743        drop(gate_tx);
2744
2745        let first_thread = first_load.await.expect("first load should still succeed");
2746        let second_thread = second_load.await.expect("second load should still succeed");
2747        cx.run_until_parked();
2748
2749        assert_eq!(
2750            first_thread.entity_id(),
2751            second_thread.entity_id(),
2752            "concurrent loads should share one AcpThread"
2753        );
2754        assert!(
2755            connection.sessions.borrow().contains_key(&session_id),
2756            "session must remain tracked while a load handle is still outstanding"
2757        );
2758        assert!(
2759            !connection
2760                .pending_sessions
2761                .borrow()
2762                .contains_key(&session_id),
2763            "pending_sessions entry should be cleared once the load resolves"
2764        );
2765
2766        // Final close drops ref_count to 0 and dispatches the ACP close RPC.
2767        cx.update(|cx| connection.clone().close_session(&session_id, cx))
2768            .await
2769            .expect("final close failed");
2770        cx.run_until_parked();
2771        assert_eq!(
2772            close_count.load(Ordering::SeqCst),
2773            1,
2774            "close_session RPC should fire exactly once when the last handle is released"
2775        );
2776        assert!(
2777            !connection.sessions.borrow().contains_key(&session_id),
2778            "session should be removed after final close"
2779        );
2780    }
2781}
2782
2783fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
2784    let context_server_store = project.read(cx).context_server_store().read(cx);
2785    let is_local = project.read(cx).is_local();
2786    context_server_store
2787        .configured_server_ids()
2788        .iter()
2789        .filter_map(|id| {
2790            let configuration = context_server_store.configuration_for_server(id)?;
2791            match &*configuration {
2792                project::context_server_store::ContextServerConfiguration::Custom {
2793                    command,
2794                    remote,
2795                    ..
2796                }
2797                | project::context_server_store::ContextServerConfiguration::Extension {
2798                    command,
2799                    remote,
2800                    ..
2801                } if is_local || *remote => Some(acp::McpServer::Stdio(
2802                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
2803                        .args(command.args.clone())
2804                        .env(if let Some(env) = command.env.as_ref() {
2805                            env.iter()
2806                                .map(|(name, value)| acp::EnvVariable::new(name, value))
2807                                .collect()
2808                        } else {
2809                            vec![]
2810                        }),
2811                )),
2812                project::context_server_store::ContextServerConfiguration::Http {
2813                    url,
2814                    headers,
2815                    timeout: _,
2816                } => Some(acp::McpServer::Http(
2817                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
2818                        headers
2819                            .iter()
2820                            .map(|(name, value)| acp::HttpHeader::new(name, value))
2821                            .collect(),
2822                    ),
2823                )),
2824                _ => None,
2825            }
2826        })
2827        .collect()
2828}
2829
2830fn config_state(
2831    modes: Option<acp::SessionModeState>,
2832    models: Option<acp::SessionModelState>,
2833    config_options: Option<Vec<acp::SessionConfigOption>>,
2834) -> (
2835    Option<Rc<RefCell<acp::SessionModeState>>>,
2836    Option<Rc<RefCell<acp::SessionModelState>>>,
2837    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
2838) {
2839    if let Some(opts) = config_options {
2840        return (None, None, Some(Rc::new(RefCell::new(opts))));
2841    }
2842
2843    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
2844    let models = models.map(|models| Rc::new(RefCell::new(models)));
2845    (modes, models, None)
2846}
2847
2848struct AcpSessionModes {
2849    session_id: acp::SessionId,
2850    connection: ConnectionTo<Agent>,
2851    state: Rc<RefCell<acp::SessionModeState>>,
2852}
2853
2854impl acp_thread::AgentSessionModes for AcpSessionModes {
2855    fn current_mode(&self) -> acp::SessionModeId {
2856        self.state.borrow().current_mode_id.clone()
2857    }
2858
2859    fn all_modes(&self) -> Vec<acp::SessionMode> {
2860        self.state.borrow().available_modes.clone()
2861    }
2862
2863    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
2864        let connection = self.connection.clone();
2865        let session_id = self.session_id.clone();
2866        let old_mode_id;
2867        {
2868            let mut state = self.state.borrow_mut();
2869            old_mode_id = state.current_mode_id.clone();
2870            state.current_mode_id = mode_id.clone();
2871        };
2872        let state = self.state.clone();
2873        cx.foreground_executor().spawn(async move {
2874            let result = into_foreground_future(
2875                connection.send_request(acp::SetSessionModeRequest::new(session_id, mode_id)),
2876            )
2877            .await;
2878
2879            if result.is_err() {
2880                state.borrow_mut().current_mode_id = old_mode_id;
2881            }
2882
2883            result?;
2884
2885            Ok(())
2886        })
2887    }
2888}
2889
2890struct AcpModelSelector {
2891    session_id: acp::SessionId,
2892    connection: ConnectionTo<Agent>,
2893    state: Rc<RefCell<acp::SessionModelState>>,
2894}
2895
2896impl AcpModelSelector {
2897    fn new(
2898        session_id: acp::SessionId,
2899        connection: ConnectionTo<Agent>,
2900        state: Rc<RefCell<acp::SessionModelState>>,
2901    ) -> Self {
2902        Self {
2903            session_id,
2904            connection,
2905            state,
2906        }
2907    }
2908}
2909
2910impl acp_thread::AgentModelSelector for AcpModelSelector {
2911    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
2912        Task::ready(Ok(acp_thread::AgentModelList::Flat(
2913            self.state
2914                .borrow()
2915                .available_models
2916                .clone()
2917                .into_iter()
2918                .map(acp_thread::AgentModelInfo::from)
2919                .collect(),
2920        )))
2921    }
2922
2923    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
2924        let connection = self.connection.clone();
2925        let session_id = self.session_id.clone();
2926        let old_model_id;
2927        {
2928            let mut state = self.state.borrow_mut();
2929            old_model_id = state.current_model_id.clone();
2930            state.current_model_id = model_id.clone();
2931        };
2932        let state = self.state.clone();
2933        cx.foreground_executor().spawn(async move {
2934            let result = into_foreground_future(
2935                connection.send_request(acp::SetSessionModelRequest::new(session_id, model_id)),
2936            )
2937            .await;
2938
2939            if result.is_err() {
2940                state.borrow_mut().current_model_id = old_model_id;
2941            }
2942
2943            result?;
2944
2945            Ok(())
2946        })
2947    }
2948
2949    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
2950        let state = self.state.borrow();
2951        Task::ready(
2952            state
2953                .available_models
2954                .iter()
2955                .find(|m| m.model_id == state.current_model_id)
2956                .cloned()
2957                .map(acp_thread::AgentModelInfo::from)
2958                .ok_or_else(|| anyhow::anyhow!("Model not found")),
2959        )
2960    }
2961}
2962
2963struct AcpSessionConfigOptions {
2964    session_id: acp::SessionId,
2965    connection: ConnectionTo<Agent>,
2966    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
2967    watch_tx: Rc<RefCell<watch::Sender<()>>>,
2968    watch_rx: watch::Receiver<()>,
2969}
2970
2971impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
2972    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
2973        self.state.borrow().clone()
2974    }
2975
2976    fn set_config_option(
2977        &self,
2978        config_id: acp::SessionConfigId,
2979        value: acp::SessionConfigValueId,
2980        cx: &mut App,
2981    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
2982        let connection = self.connection.clone();
2983        let session_id = self.session_id.clone();
2984        let state = self.state.clone();
2985
2986        let watch_tx = self.watch_tx.clone();
2987
2988        cx.foreground_executor().spawn(async move {
2989            let response = into_foreground_future(connection.send_request(
2990                acp::SetSessionConfigOptionRequest::new(session_id, config_id, value),
2991            ))
2992            .await?;
2993
2994            *state.borrow_mut() = response.config_options.clone();
2995            watch_tx.borrow_mut().send(()).ok();
2996            Ok(response.config_options)
2997        })
2998    }
2999
3000    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
3001        Some(self.watch_rx.clone())
3002    }
3003}
3004
3005// ---------------------------------------------------------------------------
3006// Handler functions dispatched from background handler closures to the
3007// foreground thread via the ForegroundWork channel.
3008// ---------------------------------------------------------------------------
3009
3010fn session_thread(
3011    ctx: &ClientContext,
3012    session_id: &acp::SessionId,
3013) -> Result<WeakEntity<AcpThread>, acp::Error> {
3014    let sessions = ctx.sessions.borrow();
3015    sessions
3016        .get(session_id)
3017        .map(|session| session.thread.clone())
3018        .ok_or_else(|| acp::Error::internal_error().data(format!("unknown session: {session_id}")))
3019}
3020
3021fn respond_err<T: JsonRpcResponse>(responder: Responder<T>, err: acp::Error) {
3022    // Log the actual error we're returning — otherwise agents that hit an
3023    // error path (e.g. unknown session) would see only the generic internal
3024    // error returned over the wire with no trace of why on the client side.
3025    log::warn!(
3026        "Responding to ACP request `{method}` with error: {err:?}",
3027        method = responder.method()
3028    );
3029    responder.respond_with_error(err).log_err();
3030}
3031
3032fn handle_request_permission(
3033    args: acp::RequestPermissionRequest,
3034    responder: Responder<acp::RequestPermissionResponse>,
3035    cx: &mut AsyncApp,
3036    ctx: &ClientContext,
3037) {
3038    let thread = match session_thread(ctx, &args.session_id) {
3039        Ok(t) => t,
3040        Err(e) => return respond_err(responder, e),
3041    };
3042
3043    cx.spawn(async move |cx| {
3044        let result: Result<_, acp::Error> = async {
3045            let task = thread
3046                .update(cx, |thread, cx| {
3047                    thread.request_tool_call_authorization(
3048                        args.tool_call,
3049                        acp_thread::PermissionOptions::Flat(args.options),
3050                        cx,
3051                    )
3052                })
3053                .flatten_acp()?;
3054            Ok(task.await)
3055        }
3056        .await;
3057
3058        match result {
3059            Ok(outcome) => {
3060                responder
3061                    .respond(acp::RequestPermissionResponse::new(outcome.into()))
3062                    .log_err();
3063            }
3064            Err(e) => respond_err(responder, e),
3065        }
3066    })
3067    .detach();
3068}
3069
3070fn handle_write_text_file(
3071    args: acp::WriteTextFileRequest,
3072    responder: Responder<acp::WriteTextFileResponse>,
3073    cx: &mut AsyncApp,
3074    ctx: &ClientContext,
3075) {
3076    let thread = match session_thread(ctx, &args.session_id) {
3077        Ok(t) => t,
3078        Err(e) => return respond_err(responder, e),
3079    };
3080
3081    cx.spawn(async move |cx| {
3082        let result: Result<_, acp::Error> = async {
3083            thread
3084                .update(cx, |thread, cx| {
3085                    thread.write_text_file(args.path, args.content, cx)
3086                })
3087                .map_err(acp::Error::from)?
3088                .await?;
3089            Ok(())
3090        }
3091        .await;
3092
3093        match result {
3094            Ok(()) => {
3095                responder
3096                    .respond(acp::WriteTextFileResponse::default())
3097                    .log_err();
3098            }
3099            Err(e) => respond_err(responder, e),
3100        }
3101    })
3102    .detach();
3103}
3104
3105fn handle_read_text_file(
3106    args: acp::ReadTextFileRequest,
3107    responder: Responder<acp::ReadTextFileResponse>,
3108    cx: &mut AsyncApp,
3109    ctx: &ClientContext,
3110) {
3111    let thread = match session_thread(ctx, &args.session_id) {
3112        Ok(t) => t,
3113        Err(e) => return respond_err(responder, e),
3114    };
3115
3116    cx.spawn(async move |cx| {
3117        let result: Result<_, acp::Error> = async {
3118            thread
3119                .update(cx, |thread, cx| {
3120                    thread.read_text_file(args.path, args.line, args.limit, false, cx)
3121                })
3122                .map_err(acp::Error::from)?
3123                .await
3124        }
3125        .await;
3126
3127        match result {
3128            Ok(content) => {
3129                responder
3130                    .respond(acp::ReadTextFileResponse::new(content))
3131                    .log_err();
3132            }
3133            Err(e) => respond_err(responder, e),
3134        }
3135    })
3136    .detach();
3137}
3138
3139fn handle_session_notification(
3140    notification: acp::SessionNotification,
3141    cx: &mut AsyncApp,
3142    ctx: &ClientContext,
3143) {
3144    // Extract everything we need from the session while briefly borrowing.
3145    let (thread, session_modes, config_opts_data) = {
3146        let sessions = ctx.sessions.borrow();
3147        let Some(session) = sessions.get(&notification.session_id) else {
3148            log::warn!(
3149                "Received session notification for unknown session: {:?}",
3150                notification.session_id
3151            );
3152            return;
3153        };
3154        (
3155            session.thread.clone(),
3156            session.session_modes.clone(),
3157            session
3158                .config_options
3159                .as_ref()
3160                .map(|opts| (opts.config_options.clone(), opts.tx.clone())),
3161        )
3162    };
3163    // Borrow is dropped here.
3164
3165    // Apply mode/config/session_list updates without holding the borrow.
3166    if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
3167        current_mode_id, ..
3168    }) = &notification.update
3169    {
3170        if let Some(session_modes) = &session_modes {
3171            session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
3172        }
3173    }
3174
3175    if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
3176        config_options, ..
3177    }) = &notification.update
3178    {
3179        if let Some((config_opts_cell, tx_cell)) = &config_opts_data {
3180            *config_opts_cell.borrow_mut() = config_options.clone();
3181            tx_cell.borrow_mut().send(()).ok();
3182        }
3183    }
3184
3185    if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
3186        && let Some(session_list) = ctx.session_list.borrow().as_ref()
3187    {
3188        session_list.send_info_update(notification.session_id.clone(), info_update.clone());
3189    }
3190
3191    // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
3192    if let acp::SessionUpdate::ToolCall(tc) = &notification.update {
3193        if let Some(meta) = &tc.meta {
3194            if let Some(terminal_info) = meta.get("terminal_info") {
3195                if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) {
3196                    let terminal_id = acp::TerminalId::new(id_str);
3197                    let cwd = terminal_info
3198                        .get("cwd")
3199                        .and_then(|v| v.as_str().map(PathBuf::from));
3200
3201                    thread
3202                        .update(cx, |thread, cx| {
3203                            let builder = TerminalBuilder::new_display_only(
3204                                CursorShape::default(),
3205                                AlternateScroll::On,
3206                                None,
3207                                0,
3208                                cx.background_executor(),
3209                                thread.project().read(cx).path_style(cx),
3210                            )?;
3211                            let lower = cx.new(|cx| builder.subscribe(cx));
3212                            thread.on_terminal_provider_event(
3213                                TerminalProviderEvent::Created {
3214                                    terminal_id,
3215                                    label: tc.title.clone(),
3216                                    cwd,
3217                                    output_byte_limit: None,
3218                                    terminal: lower,
3219                                },
3220                                cx,
3221                            );
3222                            anyhow::Ok(())
3223                        })
3224                        .log_err();
3225                }
3226            }
3227        }
3228    }
3229
3230    // Forward the update to the acp_thread as usual.
3231    if let Err(err) = thread
3232        .update(cx, |thread, cx| {
3233            thread.handle_session_update(notification.update.clone(), cx)
3234        })
3235        .flatten_acp()
3236    {
3237        log::error!(
3238            "Failed to handle session update for {:?}: {err:?}",
3239            notification.session_id
3240        );
3241    }
3242
3243    // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
3244    if let acp::SessionUpdate::ToolCallUpdate(tcu) = &notification.update {
3245        if let Some(meta) = &tcu.meta {
3246            if let Some(term_out) = meta.get("terminal_output") {
3247                if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
3248                    let terminal_id = acp::TerminalId::new(id_str);
3249                    if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
3250                        let data = s.as_bytes().to_vec();
3251                        thread
3252                            .update(cx, |thread, cx| {
3253                                thread.on_terminal_provider_event(
3254                                    TerminalProviderEvent::Output { terminal_id, data },
3255                                    cx,
3256                                );
3257                            })
3258                            .log_err();
3259                    }
3260                }
3261            }
3262
3263            if let Some(term_exit) = meta.get("terminal_exit") {
3264                if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
3265                    let terminal_id = acp::TerminalId::new(id_str);
3266                    let status = acp::TerminalExitStatus::new()
3267                        .exit_code(
3268                            term_exit
3269                                .get("exit_code")
3270                                .and_then(|v| v.as_u64())
3271                                .map(|i| i as u32),
3272                        )
3273                        .signal(
3274                            term_exit
3275                                .get("signal")
3276                                .and_then(|v| v.as_str().map(|s| s.to_string())),
3277                        );
3278
3279                    thread
3280                        .update(cx, |thread, cx| {
3281                            thread.on_terminal_provider_event(
3282                                TerminalProviderEvent::Exit {
3283                                    terminal_id,
3284                                    status,
3285                                },
3286                                cx,
3287                            );
3288                        })
3289                        .log_err();
3290                }
3291            }
3292        }
3293    }
3294}
3295
3296fn handle_create_terminal(
3297    args: acp::CreateTerminalRequest,
3298    responder: Responder<acp::CreateTerminalResponse>,
3299    cx: &mut AsyncApp,
3300    ctx: &ClientContext,
3301) {
3302    let thread = match session_thread(ctx, &args.session_id) {
3303        Ok(t) => t,
3304        Err(e) => return respond_err(responder, e),
3305    };
3306    let project = match thread
3307        .read_with(cx, |thread, _cx| thread.project().clone())
3308        .map_err(acp::Error::from)
3309    {
3310        Ok(p) => p,
3311        Err(e) => return respond_err(responder, e),
3312    };
3313
3314    cx.spawn(async move |cx| {
3315        let result: Result<_, acp::Error> = async {
3316            let terminal_entity = acp_thread::create_terminal_entity(
3317                args.command.clone(),
3318                &args.args,
3319                args.env
3320                    .into_iter()
3321                    .map(|env| (env.name, env.value))
3322                    .collect(),
3323                args.cwd.clone(),
3324                &project,
3325                cx,
3326            )
3327            .await?;
3328
3329            let terminal_entity = thread.update(cx, |thread, cx| {
3330                thread.register_terminal_created(
3331                    acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
3332                    format!("{} {}", args.command, args.args.join(" ")),
3333                    args.cwd.clone(),
3334                    args.output_byte_limit,
3335                    terminal_entity,
3336                    cx,
3337                )
3338            })?;
3339            let terminal_id = terminal_entity.read_with(cx, |terminal, _| terminal.id().clone());
3340            Ok(terminal_id)
3341        }
3342        .await;
3343
3344        match result {
3345            Ok(terminal_id) => {
3346                responder
3347                    .respond(acp::CreateTerminalResponse::new(terminal_id))
3348                    .log_err();
3349            }
3350            Err(e) => respond_err(responder, e),
3351        }
3352    })
3353    .detach();
3354}
3355
3356fn handle_kill_terminal(
3357    args: acp::KillTerminalRequest,
3358    responder: Responder<acp::KillTerminalResponse>,
3359    cx: &mut AsyncApp,
3360    ctx: &ClientContext,
3361) {
3362    let thread = match session_thread(ctx, &args.session_id) {
3363        Ok(t) => t,
3364        Err(e) => return respond_err(responder, e),
3365    };
3366
3367    match thread
3368        .update(cx, |thread, cx| thread.kill_terminal(args.terminal_id, cx))
3369        .flatten_acp()
3370    {
3371        Ok(()) => {
3372            responder
3373                .respond(acp::KillTerminalResponse::default())
3374                .log_err();
3375        }
3376        Err(e) => respond_err(responder, e),
3377    }
3378}
3379
3380fn handle_release_terminal(
3381    args: acp::ReleaseTerminalRequest,
3382    responder: Responder<acp::ReleaseTerminalResponse>,
3383    cx: &mut AsyncApp,
3384    ctx: &ClientContext,
3385) {
3386    let thread = match session_thread(ctx, &args.session_id) {
3387        Ok(t) => t,
3388        Err(e) => return respond_err(responder, e),
3389    };
3390
3391    match thread
3392        .update(cx, |thread, cx| {
3393            thread.release_terminal(args.terminal_id, cx)
3394        })
3395        .flatten_acp()
3396    {
3397        Ok(()) => {
3398            responder
3399                .respond(acp::ReleaseTerminalResponse::default())
3400                .log_err();
3401        }
3402        Err(e) => respond_err(responder, e),
3403    }
3404}
3405
3406fn handle_terminal_output(
3407    args: acp::TerminalOutputRequest,
3408    responder: Responder<acp::TerminalOutputResponse>,
3409    cx: &mut AsyncApp,
3410    ctx: &ClientContext,
3411) {
3412    let thread = match session_thread(ctx, &args.session_id) {
3413        Ok(t) => t,
3414        Err(e) => return respond_err(responder, e),
3415    };
3416
3417    match thread
3418        .read_with(cx, |thread, cx| -> anyhow::Result<_> {
3419            let out = thread
3420                .terminal(args.terminal_id)?
3421                .read(cx)
3422                .current_output(cx);
3423            Ok(out)
3424        })
3425        .flatten_acp()
3426    {
3427        Ok(output) => {
3428            responder.respond(output).log_err();
3429        }
3430        Err(e) => respond_err(responder, e),
3431    }
3432}
3433
3434fn handle_wait_for_terminal_exit(
3435    args: acp::WaitForTerminalExitRequest,
3436    responder: Responder<acp::WaitForTerminalExitResponse>,
3437    cx: &mut AsyncApp,
3438    ctx: &ClientContext,
3439) {
3440    let thread = match session_thread(ctx, &args.session_id) {
3441        Ok(t) => t,
3442        Err(e) => return respond_err(responder, e),
3443    };
3444
3445    cx.spawn(async move |cx| {
3446        let result: Result<_, acp::Error> = async {
3447            let exit_status = thread
3448                .update(cx, |thread, cx| {
3449                    anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
3450                })
3451                .flatten_acp()?
3452                .await;
3453            Ok(exit_status)
3454        }
3455        .await;
3456
3457        match result {
3458            Ok(exit_status) => {
3459                responder
3460                    .respond(acp::WaitForTerminalExitResponse::new(exit_status))
3461                    .log_err();
3462            }
3463            Err(e) => respond_err(responder, e),
3464        }
3465    })
3466    .detach();
3467}