acp.rs

   1use acp_thread::{
   2    AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
   3    AgentSessionListResponse,
   4};
   5use acp_tools::{AcpConnectionRegistry, RawStreamLine, StreamMessageDirection};
   6use action_log::ActionLog;
   7use agent_client_protocol::schema::{self as acp, ErrorCode};
   8use agent_client_protocol::{
   9    Agent, Client, ConnectionTo, JsonRpcResponse, Lines, Responder, SentRequest,
  10};
  11use anyhow::anyhow;
  12use collections::HashMap;
  13use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  14use futures::channel::mpsc;
  15use futures::io::BufReader;
  16use futures::{AsyncBufReadExt as _, Future, StreamExt as _};
  17use project::agent_server_store::AgentServerCommand;
  18use project::{AgentId, Project};
  19use serde::Deserialize;
  20use settings::Settings as _;
  21use task::{ShellBuilder, SpawnInTerminal};
  22use util::ResultExt as _;
  23use util::path_list::PathList;
  24use util::process::Child;
  25
  26use std::path::PathBuf;
  27use std::process::Stdio;
  28use std::rc::Rc;
  29use std::sync::Arc;
  30use std::{any::Any, cell::RefCell};
  31use thiserror::Error;
  32
  33use anyhow::{Context as _, Result};
  34use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
  35
  36use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
  37use terminal::TerminalBuilder;
  38use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
  39
  40use crate::GEMINI_ID;
  41
/// Id of the synthetic "Login" auth method that `AcpConnection::stdio`
/// substitutes for the agent-reported auth methods when the agent id is
/// `GEMINI_ID`.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
  43
  44/// Converts a [`SentRequest`] into a `Future` that can be safely awaited from
  45/// the GPUI foreground thread.
  46///
  47/// Unlike [`SentRequest::block_task`], which is only safe inside
  48/// [`ConnectionTo::spawn`] tasks, this uses [`SentRequest::on_receiving_result`]
  49/// to bridge the response through a oneshot channel. The SDK callback is trivial
  50/// (just a channel send), so it doesn't meaningfully block the dispatch loop.
  51fn into_foreground_future<T: JsonRpcResponse + Send + 'static>(
  52    sent: SentRequest<T>,
  53) -> impl Future<Output = Result<T, acp::Error>> {
  54    let (tx, rx) = futures::channel::oneshot::channel();
  55    let spawn_result = sent.on_receiving_result(async move |result| {
  56        tx.send(result).ok();
  57        Ok(())
  58    });
  59    async move {
  60        spawn_result?;
  61        rx.await.map_err(|_| {
  62            acp::Error::internal_error()
  63                .data("response channel cancelled — connection may have dropped")
  64        })?
  65    }
  66}
  67
/// Error returned by `AcpConnection::stdio` when the agent's `initialize`
/// response reports a protocol version below `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
  71
/// Holds state needed by foreground work dispatched from background handler closures.
///
/// Both fields are shared (`Rc`) with the owning `AcpConnection` setup code, so
/// work items observe the same session map and session list that `stdio`
/// populates.
struct ClientContext {
    /// Sessions known to this connection, keyed by session id.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Session list handle; starts as `None` and is filled in by `stdio` once
    /// the agent's `initialize` response advertises the list capability.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
}
  77
/// Work items sent from `Send` handler closures to the `!Send` foreground thread.
///
/// Each item is a one-shot closure that the foreground dispatch loop invokes
/// with the async app context and the shared [`ClientContext`].
type ForegroundWork = Box<dyn FnOnce(&mut AsyncApp, &ClientContext) + Send>;
  80
/// A live connection to an external agent server process speaking ACP over
/// stdio. Created by [`AcpConnection::stdio`]; the child process is killed when
/// the connection is dropped.
pub struct AcpConnection {
    /// Id of the agent this connection talks to.
    id: AgentId,
    /// Name the agent reported in its `initialize` response, falling back to
    /// the agent id when it reported none.
    telemetry_id: SharedString,
    /// Handle used to send requests to the agent.
    connection: ConnectionTo<Agent>,
    /// Sessions opened on this connection, shared with the foreground dispatch
    /// loop and the process-exit watcher.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Auth methods from the `initialize` response (replaced by a synthetic
    /// terminal login method for the Gemini agent).
    auth_methods: Vec<acp::AuthMethod>,
    /// Command the agent server was spawned from.
    command: AgentServerCommand,
    /// Capabilities the agent advertised in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Session mode to request for new sessions, if configured.
    default_mode: Option<acp::SessionModeId>,
    /// Model to request for new sessions, if configured.
    default_model: Option<acp::ModelId>,
    /// Config option defaults, keyed by config option id.
    default_config_options: HashMap<String, String>,
    /// The spawned agent server process.
    child: Child,
    /// Present only when the agent advertised the session-list capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Background task driving the connection's I/O loop.
    _io_task: Task<()>,
    /// Foreground task running work items forwarded by the handlers.
    _dispatch_task: Task<()>,
    /// Waits for the child to exit and reports `LoadError::Exited` to every
    /// session's thread.
    _wait_task: Task<Result<()>>,
    /// Forwards the agent's stderr lines to the log at `warn` level.
    _stderr_task: Task<Result<()>>,
}
  99
/// Shared config-option state for a session plus a unit `watch` channel.
struct ConfigOptions {
    /// The session's config options, shared so they can be updated in place.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sending half of the watch channel.
    // NOTE(review): presumably used to signal that `config_options` changed;
    // the senders/receivers are not visible in this part of the file — confirm.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiving half of the watch channel.
    rx: watch::Receiver<()>,
}
 105
 106impl ConfigOptions {
 107    fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
 108        let (tx, rx) = watch::channel(());
 109        Self {
 110            config_options,
 111            tx: Rc::new(RefCell::new(tx)),
 112            rx,
 113        }
 114    }
 115}
 116
/// Per-session state tracked by an `AcpConnection`.
pub struct AcpSession {
    /// Weak handle to the thread backing this session; used, for example, to
    /// report the agent process exiting.
    thread: WeakEntity<AcpThread>,
    // NOTE(review): the readers/writers of this flag are not visible in this
    // part of the file — confirm its exact meaning before relying on it.
    suppress_abort_err: bool,
    /// Model state shared for in-place updates, when the session has one.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state shared for in-place updates, when the session has one.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config option state, when the session has one.
    config_options: Option<ConfigOptions>,
}
 124
/// Session listing backed by an ACP agent connection, with a channel over
/// which list updates are broadcast to watchers.
pub struct AcpSessionList {
    /// Connection used to send `ListSessionsRequest`s to the agent.
    connection: ConnectionTo<Agent>,
    /// Sending half of the unbounded update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half; cloned and handed out by `watch`.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
 130
 131impl AcpSessionList {
 132    fn new(connection: ConnectionTo<Agent>) -> Self {
 133        let (tx, rx) = smol::channel::unbounded();
 134        Self {
 135            connection,
 136            updates_tx: tx,
 137            updates_rx: rx,
 138        }
 139    }
 140
 141    fn notify_update(&self) {
 142        self.updates_tx
 143            .try_send(acp_thread::SessionListUpdate::Refresh)
 144            .log_err();
 145    }
 146
 147    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
 148        self.updates_tx
 149            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
 150            .log_err();
 151    }
 152}
 153
 154impl AgentSessionList for AcpSessionList {
 155    fn list_sessions(
 156        &self,
 157        request: AgentSessionListRequest,
 158        cx: &mut App,
 159    ) -> Task<Result<AgentSessionListResponse>> {
 160        let conn = self.connection.clone();
 161        cx.foreground_executor().spawn(async move {
 162            let acp_request = acp::ListSessionsRequest::new()
 163                .cwd(request.cwd)
 164                .cursor(request.cursor);
 165            let response = into_foreground_future(conn.send_request(acp_request))
 166                .await
 167                .map_err(map_acp_error)?;
 168            Ok(AgentSessionListResponse {
 169                sessions: response
 170                    .sessions
 171                    .into_iter()
 172                    .map(|s| AgentSessionInfo {
 173                        session_id: s.session_id,
 174                        work_dirs: Some(PathList::new(&[s.cwd])),
 175                        title: s.title.map(Into::into),
 176                        updated_at: s.updated_at.and_then(|date_str| {
 177                            chrono::DateTime::parse_from_rfc3339(&date_str)
 178                                .ok()
 179                                .map(|dt| dt.with_timezone(&chrono::Utc))
 180                        }),
 181                        created_at: None,
 182                        meta: s.meta,
 183                    })
 184                    .collect(),
 185                next_cursor: response.next_cursor,
 186                meta: response.meta,
 187            })
 188        })
 189    }
 190
 191    fn watch(
 192        &self,
 193        _cx: &mut App,
 194    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
 195        Some(self.updates_rx.clone())
 196    }
 197
 198    fn notify_refresh(&self) {
 199        self.notify_update();
 200    }
 201
 202    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
 203        self
 204    }
 205}
 206
 207pub async fn connect(
 208    agent_id: AgentId,
 209    project: Entity<Project>,
 210    command: AgentServerCommand,
 211    default_mode: Option<acp::SessionModeId>,
 212    default_model: Option<acp::ModelId>,
 213    default_config_options: HashMap<String, String>,
 214    cx: &mut AsyncApp,
 215) -> Result<Rc<dyn AgentConnection>> {
 216    let conn = AcpConnection::stdio(
 217        agent_id,
 218        project,
 219        command.clone(),
 220        default_mode,
 221        default_model,
 222        default_config_options,
 223        cx,
 224    )
 225    .await?;
 226    Ok(Rc::new(conn) as _)
 227}
 228
/// Lowest ACP protocol version this client accepts; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the agent reports an older one.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 230
/// Builds an async request-handler closure that forwards the request to the
/// foreground dispatch channel.
///
/// The generated closure takes `(args, responder, _connection)`, boxes a work
/// item that calls `$handler(args, responder, cx, ctx)`, and sends it over a
/// clone of `$dispatch_tx`. The closure itself always returns `Ok(())`.
macro_rules! dispatch_request_handler {
    ($dispatch_tx:expr, $handler:expr) => {{
        // Clone the sender so each registered handler owns its own handle.
        let dispatch_tx = $dispatch_tx.clone();
        async move |args, responder, _connection| {
            // A failed send is only logged.
            // NOTE(review): in that case the boxed work item, including
            // `responder`, is dropped with the error — presumably leaving the
            // request unanswered; confirm this is acceptable.
            dispatch_tx
                .unbounded_send(Box::new(move |cx, ctx| {
                    $handler(args, responder, cx, ctx);
                }))
                .log_err();
            Ok(())
        }
    }};
}
 244
/// Builds an async notification-handler closure that forwards the
/// notification to the foreground dispatch channel.
///
/// The generated closure takes `(notification, _connection)`, boxes a work
/// item that calls `$handler(notification, cx, ctx)`, and sends it over a
/// clone of `$dispatch_tx`. The closure itself always returns `Ok(())`.
macro_rules! dispatch_notification_handler {
    ($dispatch_tx:expr, $handler:expr) => {{
        // Clone the sender so each registered handler owns its own handle.
        let dispatch_tx = $dispatch_tx.clone();
        async move |notification, _connection| {
            // A failed send is only logged; the notification is then dropped.
            dispatch_tx
                .unbounded_send(Box::new(move |cx, ctx| {
                    $handler(notification, cx, ctx);
                }))
                .log_err();
            Ok(())
        }
    }};
}
 258
 259impl AcpConnection {
    /// Spawns the agent server process for `command` and establishes an ACP
    /// connection to it over the child's stdin/stdout.
    ///
    /// The steps, in order:
    /// 1. Build the command with the shell from `TerminalSettings`
    ///    (non-interactive) and spawn it, using the project's first default
    ///    path as the working directory when the project is local.
    /// 2. Wrap stdin/stdout in a line transport that also copies every raw
    ///    line into a channel handed to the `AcpConnectionRegistry`.
    /// 3. Register the agent→client request and notification handlers, all of
    ///    which forward their work to a foreground dispatch loop.
    /// 4. Send `initialize`, then derive the telemetry id, the optional
    ///    session list, and the auth methods from the response.
    ///
    /// # Errors
    ///
    /// Returns an error if the process cannot be spawned, one of its stdio
    /// pipes is missing, the connection handle is never delivered, the
    /// `initialize` request fails, or the agent reports a protocol version
    /// below [`MINIMUM_SUPPORTED_VERSION`] ([`UnsupportedVersion`]).
    pub async fn stdio(
        agent_id: AgentId,
        project: Entity<Project>,
        command: AgentServerCommand,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child =
            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
        child.envs(command.env.iter().flatten());
        // Only local projects contribute a working directory.
        if let Some(cwd) = project.update(cx, |project, cx| {
            if project.is_local() {
                project
                    .default_path_list(cx)
                    .ordered_paths()
                    .next()
                    .cloned()
            } else {
                None
            }
        }) {
            child.current_dir(cwd);
        }
        // From here on `child` is the spawned process, not the command builder.
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        log::debug!(
            "Spawning external agent server: {:?}, {:?}",
            command.path,
            command.args
        );
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // Shared slot for the session list; it stays `None` until the
        // `initialize` response shows that the agent supports listing.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        // Set up the foreground dispatch channel for bridging Send handler
        // closures to the !Send foreground thread.
        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();

        // Build a tapped transport that intercepts raw JSON-RPC lines for
        // the ACP logs panel. Raw lines are sent without parsing — deserialization
        // is deferred until a subscriber is actually listening.
        let (stream_tap_tx, stream_tap_rx) = smol::channel::unbounded::<RawStreamLine>();

        let incoming_lines = futures::io::BufReader::new(stdout).lines();
        let tapped_incoming = incoming_lines.inspect({
            let tap_tx = stream_tap_tx.clone();
            move |result| {
                // Only successfully read lines are mirrored to the tap.
                if let Ok(line) = result {
                    tap_tx
                        .try_send(RawStreamLine {
                            direction: StreamMessageDirection::Incoming,
                            line: Arc::from(line.as_str()),
                        })
                        .log_err();
                }
            }
        });

        let tapped_outgoing = futures::sink::unfold(
            (Box::pin(stdin), stream_tap_tx),
            async move |(mut writer, tap_tx), line: String| {
                use futures::AsyncWriteExt;
                // Mirror the line to the tap before writing it to the agent.
                tap_tx
                    .try_send(RawStreamLine {
                        direction: StreamMessageDirection::Outgoing,
                        line: Arc::from(line.as_str()),
                    })
                    .log_err();
                // Messages are newline-delimited on the wire.
                let mut bytes = line.into_bytes();
                bytes.push(b'\n');
                writer.write_all(&bytes).await?;
                Ok::<_, std::io::Error>((writer, tap_tx))
            },
        );

        let transport = Lines::new(tapped_outgoing, tapped_incoming);

        // Use a oneshot channel to extract the ConnectionTo<Agent> from the
        // connect_with closure.
        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();

        // Build the connection with handler closures.
        // All handlers forward work to the foreground dispatch channel.
        let connection_future = {
            let dispatch_tx = dispatch_tx.clone();
            Client
                .builder()
                .name("zed")
                // --- Request handlers (agent→client) ---
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_request_permission),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_write_text_file),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_read_text_file),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_create_terminal),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_kill_terminal),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_release_terminal),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_terminal_output),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_wait_for_terminal_exit),
                    agent_client_protocol::on_receive_request!(),
                )
                // --- Notification handlers (agent→client) ---
                .on_receive_notification(
                    dispatch_notification_handler!(dispatch_tx, handle_session_notification),
                    agent_client_protocol::on_receive_notification!(),
                )
                .connect_with(
                    transport,
                    move |connection: ConnectionTo<Agent>| async move {
                        if connection_tx.send(connection.clone()).is_err() {
                            log::error!(
                                "failed to send ACP connection handle — receiver was dropped"
                            );
                        }
                        // Keep the connection alive until the transport closes.
                        futures::future::pending::<Result<(), acp::Error>>().await
                    },
                )
        };

        // Spawn the connection loop on a background thread.
        let io_task = cx.background_spawn(async move {
            if let Err(err) = connection_future.await {
                log::error!("ACP connection error: {err}");
            }
        });

        // Wait for the ConnectionTo<Agent> handle.
        let connection: ConnectionTo<Agent> = connection_rx
            .await
            .context("Failed to receive ACP connection handle")?;

        // Set up the foreground dispatch loop to process work items from handlers.
        let dispatch_context = ClientContext {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
        };
        let dispatch_task = cx.spawn({
            let mut dispatch_rx = dispatch_rx;
            async move |cx| {
                while let Some(work) = dispatch_rx.next().await {
                    work(cx, &dispatch_context);
                }
            }
        });

        // Forward the agent's stderr to our log, one line at a time, until
        // EOF (a zero-length read) or a read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // When the process exits, tell every session's thread about it.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        // Hand the raw-line tap to the registry.
        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(agent_id.clone(), stream_tap_rx, cx)
            });
        });

        // NOTE(review): if this request fails (or the version check below
        // rejects the agent) we return without calling `child.kill()`, which
        // otherwise only happens in `Drop for AcpConnection`. Confirm that
        // dropping `Child` cleans up the process.
        let response = into_foreground_future(
            connection.send_request(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapabilities::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            .auth(acp::AuthCapabilities::new().terminal(true))
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            ),
        )
        .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| agent_id.0.to_string().into());

        // Create the session list only if the agent supports listing, and
        // publish it to the dispatch context through the shared slot.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        // TODO: Remove this override once Google team releases their official auth methods
        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
            // Re-run the same command without the ACP flags as the login command.
            let mut args = command.args.clone();
            args.retain(|a| a != "--experimental-acp" && a != "--acp");
            let value = serde_json::json!({
                "label": "gemini /auth",
                "command": command.path.to_string_lossy().into_owned(),
                "args": args,
                "env": command.env.clone().unwrap_or_default(),
            });
            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
            vec![acp::AuthMethod::Agent(
                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                    .description("Login with your Google or Vertex AI account")
                    .meta(meta),
            )]
        } else {
            response.auth_methods
        };
        Ok(Self {
            id: agent_id,
            auth_methods,
            command,
            connection,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _dispatch_task: dispatch_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }
 566
    /// Returns the prompt capabilities the agent advertised in its
    /// `initialize` response.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }
 570
 571    fn apply_default_config_options(
 572        &self,
 573        session_id: &acp::SessionId,
 574        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
 575        cx: &mut AsyncApp,
 576    ) {
 577        let id = self.id.clone();
 578        let defaults_to_apply: Vec<_> = {
 579            let config_opts_ref = config_options.borrow();
 580            config_opts_ref
 581                .iter()
 582                .filter_map(|config_option| {
 583                    let default_value = self.default_config_options.get(&*config_option.id.0)?;
 584
 585                    let is_valid = match &config_option.kind {
 586                        acp::SessionConfigKind::Select(select) => match &select.options {
 587                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
 588                                .iter()
 589                                .any(|opt| &*opt.value.0 == default_value.as_str()),
 590                            acp::SessionConfigSelectOptions::Grouped(groups) => {
 591                                groups.iter().any(|g| {
 592                                    g.options
 593                                        .iter()
 594                                        .any(|opt| &*opt.value.0 == default_value.as_str())
 595                                })
 596                            }
 597                            _ => false,
 598                        },
 599                        _ => false,
 600                    };
 601
 602                    if is_valid {
 603                        let initial_value = match &config_option.kind {
 604                            acp::SessionConfigKind::Select(select) => {
 605                                Some(select.current_value.clone())
 606                            }
 607                            _ => None,
 608                        };
 609                        Some((
 610                            config_option.id.clone(),
 611                            default_value.clone(),
 612                            initial_value,
 613                        ))
 614                    } else {
 615                        log::warn!(
 616                            "`{}` is not a valid value for config option `{}` in {}",
 617                            default_value,
 618                            config_option.id.0,
 619                            id
 620                        );
 621                        None
 622                    }
 623                })
 624                .collect()
 625        };
 626
 627        for (config_id, default_value, initial_value) in defaults_to_apply {
 628            cx.spawn({
 629                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
 630                let session_id = session_id.clone();
 631                let config_id_clone = config_id.clone();
 632                let config_opts = config_options.clone();
 633                let conn = self.connection.clone();
 634                async move |_| {
 635                    let result = into_foreground_future(conn.send_request(
 636                        acp::SetSessionConfigOptionRequest::new(
 637                            session_id,
 638                            config_id_clone.clone(),
 639                            default_value_id,
 640                        ),
 641                    ))
 642                    .await
 643                    .log_err();
 644
 645                    if result.is_none() {
 646                        if let Some(initial) = initial_value {
 647                            let mut opts = config_opts.borrow_mut();
 648                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
 649                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 650                                    select.current_value = initial;
 651                                }
 652                            }
 653                        }
 654                    }
 655                }
 656            })
 657            .detach();
 658
 659            let mut opts = config_options.borrow_mut();
 660            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
 661                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
 662                    select.current_value = acp::SessionConfigValueId::new(default_value);
 663                }
 664            }
 665        }
 666    }
 667}
 668
/// Kills the agent server process when the connection goes away.
impl Drop for AcpConnection {
    fn drop(&mut self) {
        // Best effort: a failure to kill the child is logged, not propagated.
        self.child.kill().log_err();
    }
}
 674
 675fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
 676    format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
 677}
 678
 679fn terminal_auth_task(
 680    command: &AgentServerCommand,
 681    agent_id: &AgentId,
 682    method: &acp::AuthMethodTerminal,
 683) -> SpawnInTerminal {
 684    let mut args = command.args.clone();
 685    args.extend(method.args.clone());
 686
 687    let mut env = command.env.clone().unwrap_or_default();
 688    env.extend(method.env.clone());
 689
 690    acp_thread::build_terminal_auth_task(
 691        terminal_auth_task_id(agent_id, &method.id),
 692        method.name.clone(),
 693        command.path.to_string_lossy().into_owned(),
 694        args,
 695        env,
 696    )
 697}
 698
 699/// Used to support the _meta method prior to stabilization
 700fn meta_terminal_auth_task(
 701    agent_id: &AgentId,
 702    method_id: &acp::AuthMethodId,
 703    method: &acp::AuthMethod,
 704) -> Option<SpawnInTerminal> {
 705    #[derive(Deserialize)]
 706    struct MetaTerminalAuth {
 707        label: String,
 708        command: String,
 709        #[serde(default)]
 710        args: Vec<String>,
 711        #[serde(default)]
 712        env: HashMap<String, String>,
 713    }
 714
 715    let meta = match method {
 716        acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
 717        acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
 718        acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
 719        _ => None,
 720    }?;
 721    let terminal_auth =
 722        serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
 723
 724    Some(acp_thread::build_terminal_auth_task(
 725        terminal_auth_task_id(agent_id, method_id),
 726        terminal_auth.label.clone(),
 727        terminal_auth.command,
 728        terminal_auth.args,
 729        terminal_auth.env,
 730    ))
 731}
 732
 733impl AgentConnection for AcpConnection {
    /// Returns a clone of the id of the agent this connection talks to.
    fn agent_id(&self) -> AgentId {
        self.id.clone()
    }
 737
    /// Returns the telemetry id chosen in `stdio`: the agent-reported name,
    /// or the agent id when the agent reported none.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
 741
 742    fn new_session(
 743        self: Rc<Self>,
 744        project: Entity<Project>,
 745        work_dirs: PathList,
 746        cx: &mut App,
 747    ) -> Task<Result<Entity<AcpThread>>> {
 748        // TODO: remove this once ACP supports multiple working directories
 749        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 750            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 751        };
 752        let name = self.id.0.clone();
 753        let mcp_servers = mcp_servers_for_project(&project, cx);
 754
 755        cx.spawn(async move |cx| {
 756            let response = into_foreground_future(
 757                self.connection
 758                    .send_request(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers)),
 759            )
 760            .await
 761            .map_err(map_acp_error)?;
 762
 763            let (modes, models, config_options) =
 764                config_state(response.modes, response.models, response.config_options);
 765
 766            if let Some(default_mode) = self.default_mode.clone() {
 767                if let Some(modes) = modes.as_ref() {
 768                    let mut modes_ref = modes.borrow_mut();
 769                    let has_mode = modes_ref
 770                        .available_modes
 771                        .iter()
 772                        .any(|mode| mode.id == default_mode);
 773
 774                    if has_mode {
 775                        let initial_mode_id = modes_ref.current_mode_id.clone();
 776
 777                        cx.spawn({
 778                            let default_mode = default_mode.clone();
 779                            let session_id = response.session_id.clone();
 780                            let modes = modes.clone();
 781                            let conn = self.connection.clone();
 782                            async move |_| {
 783                                let result = into_foreground_future(
 784                                    conn.send_request(acp::SetSessionModeRequest::new(
 785                                        session_id,
 786                                        default_mode,
 787                                    )),
 788                                )
 789                                .await
 790                                .log_err();
 791
 792                                if result.is_none() {
 793                                    modes.borrow_mut().current_mode_id = initial_mode_id;
 794                                }
 795                            }
 796                        })
 797                        .detach();
 798
 799                        modes_ref.current_mode_id = default_mode;
 800                    } else {
 801                        let available_modes = modes_ref
 802                            .available_modes
 803                            .iter()
 804                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
 805                            .collect::<Vec<_>>()
 806                            .join("\n");
 807
 808                        log::warn!(
 809                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
 810                        );
 811                    }
 812                }
 813            }
 814
 815            if let Some(default_model) = self.default_model.clone() {
 816                if let Some(models) = models.as_ref() {
 817                    let mut models_ref = models.borrow_mut();
 818                    let has_model = models_ref
 819                        .available_models
 820                        .iter()
 821                        .any(|model| model.model_id == default_model);
 822
 823                    if has_model {
 824                        let initial_model_id = models_ref.current_model_id.clone();
 825
 826                        cx.spawn({
 827                            let default_model = default_model.clone();
 828                            let session_id = response.session_id.clone();
 829                            let models = models.clone();
 830                            let conn = self.connection.clone();
 831                            async move |_| {
 832                                let result = into_foreground_future(
 833                                    conn.send_request(acp::SetSessionModelRequest::new(
 834                                        session_id,
 835                                        default_model,
 836                                    )),
 837                                )
 838                                .await
 839                                .log_err();
 840
 841                                if result.is_none() {
 842                                    models.borrow_mut().current_model_id = initial_model_id;
 843                                }
 844                            }
 845                        })
 846                        .detach();
 847
 848                        models_ref.current_model_id = default_model;
 849                    } else {
 850                        let available_models = models_ref
 851                            .available_models
 852                            .iter()
 853                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
 854                            .collect::<Vec<_>>()
 855                            .join("\n");
 856
 857                        log::warn!(
 858                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
 859                        );
 860                    }
 861                }
 862            }
 863
 864            if let Some(config_opts) = config_options.as_ref() {
 865                self.apply_default_config_options(&response.session_id, config_opts, cx);
 866            }
 867
 868            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 869            let thread: Entity<AcpThread> = cx.new(|cx| {
 870                AcpThread::new(
 871                    None,
 872                    None,
 873                    Some(work_dirs),
 874                    self.clone(),
 875                    project,
 876                    action_log,
 877                    response.session_id.clone(),
 878                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
 879                    watch::Receiver::constant(
 880                        self.agent_capabilities.prompt_capabilities.clone(),
 881                    ),
 882                    cx,
 883                )
 884            });
 885
 886            self.sessions.borrow_mut().insert(
 887                response.session_id,
 888                AcpSession {
 889                    thread: thread.downgrade(),
 890                    suppress_abort_err: false,
 891                    session_modes: modes,
 892                    models,
 893                    config_options: config_options.map(ConfigOptions::new),
 894                },
 895            );
 896
 897            Ok(thread)
 898        })
 899    }
 900
 901    fn supports_load_session(&self) -> bool {
 902        self.agent_capabilities.load_session
 903    }
 904
 905    fn supports_resume_session(&self) -> bool {
 906        self.agent_capabilities
 907            .session_capabilities
 908            .resume
 909            .is_some()
 910    }
 911
 912    fn load_session(
 913        self: Rc<Self>,
 914        session_id: acp::SessionId,
 915        project: Entity<Project>,
 916        work_dirs: PathList,
 917        title: Option<SharedString>,
 918        cx: &mut App,
 919    ) -> Task<Result<Entity<AcpThread>>> {
 920        if !self.agent_capabilities.load_session {
 921            return Task::ready(Err(anyhow!(LoadError::Other(
 922                "Loading sessions is not supported by this agent.".into()
 923            ))));
 924        }
 925        // TODO: remove this once ACP supports multiple working directories
 926        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
 927            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
 928        };
 929
 930        let mcp_servers = mcp_servers_for_project(&project, cx);
 931        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 932        let thread: Entity<AcpThread> = cx.new(|cx| {
 933            AcpThread::new(
 934                None,
 935                title,
 936                Some(work_dirs.clone()),
 937                self.clone(),
 938                project,
 939                action_log,
 940                session_id.clone(),
 941                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
 942                cx,
 943            )
 944        });
 945
 946        self.sessions.borrow_mut().insert(
 947            session_id.clone(),
 948            AcpSession {
 949                thread: thread.downgrade(),
 950                suppress_abort_err: false,
 951                session_modes: None,
 952                models: None,
 953                config_options: None,
 954            },
 955        );
 956
 957        cx.spawn(async move |cx| {
 958            let response = match into_foreground_future(self.connection.send_request(
 959                acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
 960            ))
 961            .await
 962            {
 963                Ok(response) => response,
 964                Err(err) => {
 965                    self.sessions.borrow_mut().remove(&session_id);
 966                    return Err(map_acp_error(err));
 967                }
 968            };
 969
 970            let (modes, models, config_options) =
 971                config_state(response.modes, response.models, response.config_options);
 972
 973            if let Some(config_opts) = config_options.as_ref() {
 974                self.apply_default_config_options(&session_id, config_opts, cx);
 975            }
 976
 977            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
 978                session.session_modes = modes;
 979                session.models = models;
 980                session.config_options = config_options.map(ConfigOptions::new);
 981            }
 982
 983            Ok(thread)
 984        })
 985    }
 986
 987    fn resume_session(
 988        self: Rc<Self>,
 989        session_id: acp::SessionId,
 990        project: Entity<Project>,
 991        work_dirs: PathList,
 992        title: Option<SharedString>,
 993        cx: &mut App,
 994    ) -> Task<Result<Entity<AcpThread>>> {
 995        if self
 996            .agent_capabilities
 997            .session_capabilities
 998            .resume
 999            .is_none()
1000        {
1001            return Task::ready(Err(anyhow!(LoadError::Other(
1002                "Resuming sessions is not supported by this agent.".into()
1003            ))));
1004        }
1005        // TODO: remove this once ACP supports multiple working directories
1006        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
1007            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
1008        };
1009
1010        let mcp_servers = mcp_servers_for_project(&project, cx);
1011        let action_log = cx.new(|_| ActionLog::new(project.clone()));
1012        let thread: Entity<AcpThread> = cx.new(|cx| {
1013            AcpThread::new(
1014                None,
1015                title,
1016                Some(work_dirs),
1017                self.clone(),
1018                project,
1019                action_log,
1020                session_id.clone(),
1021                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
1022                cx,
1023            )
1024        });
1025
1026        self.sessions.borrow_mut().insert(
1027            session_id.clone(),
1028            AcpSession {
1029                thread: thread.downgrade(),
1030                suppress_abort_err: false,
1031                session_modes: None,
1032                models: None,
1033                config_options: None,
1034            },
1035        );
1036
1037        cx.spawn(async move |cx| {
1038            let response = match into_foreground_future(self.connection.send_request(
1039                acp::ResumeSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
1040            ))
1041            .await
1042            {
1043                Ok(response) => response,
1044                Err(err) => {
1045                    self.sessions.borrow_mut().remove(&session_id);
1046                    return Err(map_acp_error(err));
1047                }
1048            };
1049
1050            let (modes, models, config_options) =
1051                config_state(response.modes, response.models, response.config_options);
1052
1053            if let Some(config_opts) = config_options.as_ref() {
1054                self.apply_default_config_options(&session_id, config_opts, cx);
1055            }
1056
1057            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
1058                session.session_modes = modes;
1059                session.models = models;
1060                session.config_options = config_options.map(ConfigOptions::new);
1061            }
1062
1063            Ok(thread)
1064        })
1065    }
1066
1067    fn supports_close_session(&self) -> bool {
1068        self.agent_capabilities.session_capabilities.close.is_some()
1069    }
1070
1071    fn close_session(
1072        self: Rc<Self>,
1073        session_id: &acp::SessionId,
1074        cx: &mut App,
1075    ) -> Task<Result<()>> {
1076        if !self.supports_close_session() {
1077            return Task::ready(Err(anyhow!(LoadError::Other(
1078                "Closing sessions is not supported by this agent.".into()
1079            ))));
1080        }
1081
1082        let conn = self.connection.clone();
1083        let session_id = session_id.clone();
1084        cx.foreground_executor().spawn(async move {
1085            into_foreground_future(
1086                conn.send_request(acp::CloseSessionRequest::new(session_id.clone())),
1087            )
1088            .await?;
1089            self.sessions.borrow_mut().remove(&session_id);
1090            Ok(())
1091        })
1092    }
1093
1094    fn auth_methods(&self) -> &[acp::AuthMethod] {
1095        &self.auth_methods
1096    }
1097
1098    fn terminal_auth_task(
1099        &self,
1100        method_id: &acp::AuthMethodId,
1101        cx: &App,
1102    ) -> Option<SpawnInTerminal> {
1103        let method = self
1104            .auth_methods
1105            .iter()
1106            .find(|method| method.id() == method_id)?;
1107
1108        match method {
1109            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1110                Some(terminal_auth_task(&self.command, &self.id, terminal))
1111            }
1112            _ => meta_terminal_auth_task(&self.id, method_id, method),
1113        }
1114    }
1115
1116    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1117        let conn = self.connection.clone();
1118        cx.foreground_executor().spawn(async move {
1119            into_foreground_future(conn.send_request(acp::AuthenticateRequest::new(method_id)))
1120                .await?;
1121            Ok(())
1122        })
1123    }
1124
1125    fn prompt(
1126        &self,
1127        _id: Option<acp_thread::UserMessageId>,
1128        params: acp::PromptRequest,
1129        cx: &mut App,
1130    ) -> Task<Result<acp::PromptResponse>> {
1131        let conn = self.connection.clone();
1132        let sessions = self.sessions.clone();
1133        let session_id = params.session_id.clone();
1134        cx.foreground_executor().spawn(async move {
1135            let result = into_foreground_future(conn.send_request(params)).await;
1136
1137            let mut suppress_abort_err = false;
1138
1139            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1140                suppress_abort_err = session.suppress_abort_err;
1141                session.suppress_abort_err = false;
1142            }
1143
1144            match result {
1145                Ok(response) => Ok(response),
1146                Err(err) => {
1147                    if err.code == acp::ErrorCode::AuthRequired {
1148                        return Err(anyhow!(acp::Error::auth_required()));
1149                    }
1150
1151                    if err.code != ErrorCode::InternalError {
1152                        anyhow::bail!(err)
1153                    }
1154
1155                    let Some(data) = &err.data else {
1156                        anyhow::bail!(err)
1157                    };
1158
1159                    // Temporary workaround until the following PR is generally available:
1160                    // https://github.com/google-gemini/gemini-cli/pull/6656
1161
1162                    #[derive(Deserialize)]
1163                    #[serde(deny_unknown_fields)]
1164                    struct ErrorDetails {
1165                        details: Box<str>,
1166                    }
1167
1168                    match serde_json::from_value(data.clone()) {
1169                        Ok(ErrorDetails { details }) => {
1170                            if suppress_abort_err
1171                                && (details.contains("This operation was aborted")
1172                                    || details.contains("The user aborted a request"))
1173                            {
1174                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1175                            } else {
1176                                Err(anyhow!(details))
1177                            }
1178                        }
1179                        Err(_) => Err(anyhow!(err)),
1180                    }
1181                }
1182            }
1183        })
1184    }
1185
1186    fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) {
1187        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1188            session.suppress_abort_err = true;
1189        }
1190        let params = acp::CancelNotification::new(session_id.clone());
1191        self.connection.send_notification(params).log_err();
1192    }
1193
1194    fn session_modes(
1195        &self,
1196        session_id: &acp::SessionId,
1197        _cx: &App,
1198    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1199        let sessions = self.sessions.clone();
1200        let sessions_ref = sessions.borrow();
1201        let Some(session) = sessions_ref.get(session_id) else {
1202            return None;
1203        };
1204
1205        if let Some(modes) = session.session_modes.as_ref() {
1206            Some(Rc::new(AcpSessionModes {
1207                connection: self.connection.clone(),
1208                session_id: session_id.clone(),
1209                state: modes.clone(),
1210            }) as _)
1211        } else {
1212            None
1213        }
1214    }
1215
1216    fn model_selector(
1217        &self,
1218        session_id: &acp::SessionId,
1219    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1220        let sessions = self.sessions.clone();
1221        let sessions_ref = sessions.borrow();
1222        let Some(session) = sessions_ref.get(session_id) else {
1223            return None;
1224        };
1225
1226        if let Some(models) = session.models.as_ref() {
1227            Some(Rc::new(AcpModelSelector::new(
1228                session_id.clone(),
1229                self.connection.clone(),
1230                models.clone(),
1231            )) as _)
1232        } else {
1233            None
1234        }
1235    }
1236
1237    fn session_config_options(
1238        &self,
1239        session_id: &acp::SessionId,
1240        _cx: &App,
1241    ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1242        let sessions = self.sessions.borrow();
1243        let session = sessions.get(session_id)?;
1244
1245        let config_opts = session.config_options.as_ref()?;
1246
1247        Some(Rc::new(AcpSessionConfigOptions {
1248            session_id: session_id.clone(),
1249            connection: self.connection.clone(),
1250            state: config_opts.config_options.clone(),
1251            watch_tx: config_opts.tx.clone(),
1252            watch_rx: config_opts.rx.clone(),
1253        }) as _)
1254    }
1255
1256    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1257        self.session_list.clone().map(|s| s as _)
1258    }
1259
1260    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1261        self
1262    }
1263}
1264
1265fn map_acp_error(err: acp::Error) -> anyhow::Error {
1266    if err.code == acp::ErrorCode::AuthRequired {
1267        let mut error = AuthRequired::new();
1268
1269        if err.message != acp::ErrorCode::AuthRequired.to_string() {
1270            error = error.with_description(err.message);
1271        }
1272
1273        anyhow!(error)
1274    } else {
1275        anyhow!(err)
1276    }
1277}
1278
#[cfg(test)]
mod tests {
    use super::*;

    /// The first-class terminal auth task reuses the agent command and layers
    /// the method's args and env on top of it.
    #[test]
    fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
        let base_env = HashMap::from_iter([
            ("BASE".into(), "1".into()),
            ("SHARED".into(), "base".into()),
        ]);
        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into()],
            env: Some(base_env),
        };
        let method_env = std::collections::HashMap::from_iter([
            ("EXTRA".into(), "2".into()),
            ("SHARED".into(), "override".into()),
        ]);
        let method = acp::AuthMethodTerminal::new("login", "Login")
            .args(vec!["/auth".into()])
            .env(method_env);

        let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(task.label, "Login");
        assert_eq!(task.command_label, "Login");
    }

    /// A complete legacy `terminal-auth` meta entry is turned into a task.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["auth", "--interactive"],
                "env": {
                    "AUTH_MODE": "interactive",
                },
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(legacy_meta),
        );

        let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
            .expect("expected legacy terminal auth task");

        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
        assert_eq!(task.args, vec!["auth", "--interactive"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(task.label, "legacy /auth");
    }

    /// A legacy meta entry that only carries a label yields no task.
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let incomplete_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(incomplete_meta),
        );

        let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method);

        assert!(task.is_none());
    }

    /// When a terminal method also carries legacy meta, the task built from the
    /// first-class fields ignores the legacy values.
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let method_id = acp::AuthMethodId::new("login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["legacy-auth"],
                "env": {
                    "AUTH_MODE": "legacy",
                },
            }),
        )]);
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(legacy_meta),
        );

        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into()],
            env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
        };

        let acp::AuthMethod::Terminal(terminal) = &method else {
            unreachable!()
        };
        let task = terminal_auth_task(&command, &AgentId::new("test-agent"), terminal);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(task.label, "Login");
    }
}
1421
1422fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1423    let context_server_store = project.read(cx).context_server_store().read(cx);
1424    let is_local = project.read(cx).is_local();
1425    context_server_store
1426        .configured_server_ids()
1427        .iter()
1428        .filter_map(|id| {
1429            let configuration = context_server_store.configuration_for_server(id)?;
1430            match &*configuration {
1431                project::context_server_store::ContextServerConfiguration::Custom {
1432                    command,
1433                    remote,
1434                    ..
1435                }
1436                | project::context_server_store::ContextServerConfiguration::Extension {
1437                    command,
1438                    remote,
1439                    ..
1440                } if is_local || *remote => Some(acp::McpServer::Stdio(
1441                    acp::McpServerStdio::new(id.0.to_string(), &command.path)
1442                        .args(command.args.clone())
1443                        .env(if let Some(env) = command.env.as_ref() {
1444                            env.iter()
1445                                .map(|(name, value)| acp::EnvVariable::new(name, value))
1446                                .collect()
1447                        } else {
1448                            vec![]
1449                        }),
1450                )),
1451                project::context_server_store::ContextServerConfiguration::Http {
1452                    url,
1453                    headers,
1454                    timeout: _,
1455                } => Some(acp::McpServer::Http(
1456                    acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1457                        headers
1458                            .iter()
1459                            .map(|(name, value)| acp::HttpHeader::new(name, value))
1460                            .collect(),
1461                    ),
1462                )),
1463                _ => None,
1464            }
1465        })
1466        .collect()
1467}
1468
1469fn config_state(
1470    modes: Option<acp::SessionModeState>,
1471    models: Option<acp::SessionModelState>,
1472    config_options: Option<Vec<acp::SessionConfigOption>>,
1473) -> (
1474    Option<Rc<RefCell<acp::SessionModeState>>>,
1475    Option<Rc<RefCell<acp::SessionModelState>>>,
1476    Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1477) {
1478    if let Some(opts) = config_options {
1479        return (None, None, Some(Rc::new(RefCell::new(opts))));
1480    }
1481
1482    let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1483    let models = models.map(|models| Rc::new(RefCell::new(models)));
1484    (modes, models, None)
1485}
1486
1487struct AcpSessionModes {
1488    session_id: acp::SessionId,
1489    connection: ConnectionTo<Agent>,
1490    state: Rc<RefCell<acp::SessionModeState>>,
1491}
1492
1493impl acp_thread::AgentSessionModes for AcpSessionModes {
1494    fn current_mode(&self) -> acp::SessionModeId {
1495        self.state.borrow().current_mode_id.clone()
1496    }
1497
1498    fn all_modes(&self) -> Vec<acp::SessionMode> {
1499        self.state.borrow().available_modes.clone()
1500    }
1501
1502    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1503        let connection = self.connection.clone();
1504        let session_id = self.session_id.clone();
1505        let old_mode_id;
1506        {
1507            let mut state = self.state.borrow_mut();
1508            old_mode_id = state.current_mode_id.clone();
1509            state.current_mode_id = mode_id.clone();
1510        };
1511        let state = self.state.clone();
1512        cx.foreground_executor().spawn(async move {
1513            let result = into_foreground_future(
1514                connection.send_request(acp::SetSessionModeRequest::new(session_id, mode_id)),
1515            )
1516            .await;
1517
1518            if result.is_err() {
1519                state.borrow_mut().current_mode_id = old_mode_id;
1520            }
1521
1522            result?;
1523
1524            Ok(())
1525        })
1526    }
1527}
1528
1529struct AcpModelSelector {
1530    session_id: acp::SessionId,
1531    connection: ConnectionTo<Agent>,
1532    state: Rc<RefCell<acp::SessionModelState>>,
1533}
1534
1535impl AcpModelSelector {
1536    fn new(
1537        session_id: acp::SessionId,
1538        connection: ConnectionTo<Agent>,
1539        state: Rc<RefCell<acp::SessionModelState>>,
1540    ) -> Self {
1541        Self {
1542            session_id,
1543            connection,
1544            state,
1545        }
1546    }
1547}
1548
1549impl acp_thread::AgentModelSelector for AcpModelSelector {
1550    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1551        Task::ready(Ok(acp_thread::AgentModelList::Flat(
1552            self.state
1553                .borrow()
1554                .available_models
1555                .clone()
1556                .into_iter()
1557                .map(acp_thread::AgentModelInfo::from)
1558                .collect(),
1559        )))
1560    }
1561
1562    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1563        let connection = self.connection.clone();
1564        let session_id = self.session_id.clone();
1565        let old_model_id;
1566        {
1567            let mut state = self.state.borrow_mut();
1568            old_model_id = state.current_model_id.clone();
1569            state.current_model_id = model_id.clone();
1570        };
1571        let state = self.state.clone();
1572        cx.foreground_executor().spawn(async move {
1573            let result = into_foreground_future(
1574                connection.send_request(acp::SetSessionModelRequest::new(session_id, model_id)),
1575            )
1576            .await;
1577
1578            if result.is_err() {
1579                state.borrow_mut().current_model_id = old_model_id;
1580            }
1581
1582            result?;
1583
1584            Ok(())
1585        })
1586    }
1587
1588    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1589        let state = self.state.borrow();
1590        Task::ready(
1591            state
1592                .available_models
1593                .iter()
1594                .find(|m| m.model_id == state.current_model_id)
1595                .cloned()
1596                .map(acp_thread::AgentModelInfo::from)
1597                .ok_or_else(|| anyhow::anyhow!("Model not found")),
1598        )
1599    }
1600}
1601
/// Config options of a single ACP session, read from shared state and changed
/// by sending `SetSessionConfigOptionRequest`s over the agent connection.
struct AcpSessionConfigOptions {
    /// Session the config option requests are issued for.
    session_id: acp::SessionId,
    /// Connection used to send config option requests to the agent.
    connection: ConnectionTo<Agent>,
    /// Last known config options; overwritten with the agent's response after
    /// each successful `set_config_option`.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender signalled after `state` has been overwritten.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver that is cloned and handed out by `watch`.
    watch_rx: watch::Receiver<()>,
}
1609
1610impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1611    fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1612        self.state.borrow().clone()
1613    }
1614
1615    fn set_config_option(
1616        &self,
1617        config_id: acp::SessionConfigId,
1618        value: acp::SessionConfigValueId,
1619        cx: &mut App,
1620    ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1621        let connection = self.connection.clone();
1622        let session_id = self.session_id.clone();
1623        let state = self.state.clone();
1624
1625        let watch_tx = self.watch_tx.clone();
1626
1627        cx.foreground_executor().spawn(async move {
1628            let response = into_foreground_future(connection.send_request(
1629                acp::SetSessionConfigOptionRequest::new(session_id, config_id, value),
1630            ))
1631            .await?;
1632
1633            *state.borrow_mut() = response.config_options.clone();
1634            watch_tx.borrow_mut().send(()).ok();
1635            Ok(response.config_options)
1636        })
1637    }
1638
1639    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1640        Some(self.watch_rx.clone())
1641    }
1642}
1643
1644// ---------------------------------------------------------------------------
1645// Handler functions dispatched from background handler closures to the
1646// foreground thread via the ForegroundWork channel.
1647// ---------------------------------------------------------------------------
1648
1649fn session_thread(
1650    ctx: &ClientContext,
1651    session_id: &acp::SessionId,
1652) -> Result<WeakEntity<AcpThread>, acp::Error> {
1653    let sessions = ctx.sessions.borrow();
1654    sessions
1655        .get(session_id)
1656        .map(|session| session.thread.clone())
1657        .ok_or_else(|| acp::Error::internal_error().data(format!("unknown session: {session_id}")))
1658}
1659
1660fn respond_err<T: JsonRpcResponse + Send + 'static>(responder: Responder<T>, err: acp::Error) {
1661    responder.respond_with_error(err).log_err();
1662}
1663
1664fn handle_request_permission(
1665    args: acp::RequestPermissionRequest,
1666    responder: Responder<acp::RequestPermissionResponse>,
1667    cx: &mut AsyncApp,
1668    ctx: &ClientContext,
1669) {
1670    let thread = match session_thread(ctx, &args.session_id) {
1671        Ok(t) => t,
1672        Err(e) => return respond_err(responder, e),
1673    };
1674
1675    cx.spawn(async move |cx| {
1676        let result: Result<_, acp::Error> = async {
1677            let task = thread
1678                .update(cx, |thread, cx| {
1679                    thread.request_tool_call_authorization(
1680                        args.tool_call,
1681                        acp_thread::PermissionOptions::Flat(args.options),
1682                        cx,
1683                    )
1684                })
1685                .map_err(acp::Error::from)?
1686                .map_err(acp::Error::from)?;
1687            Ok(task.await)
1688        }
1689        .await;
1690
1691        match result {
1692            Ok(outcome) => {
1693                responder
1694                    .respond(acp::RequestPermissionResponse::new(outcome.into()))
1695                    .log_err();
1696            }
1697            Err(e) => respond_err(responder, e),
1698        }
1699    })
1700    .detach();
1701}
1702
1703fn handle_write_text_file(
1704    args: acp::WriteTextFileRequest,
1705    responder: Responder<acp::WriteTextFileResponse>,
1706    cx: &mut AsyncApp,
1707    ctx: &ClientContext,
1708) {
1709    let thread = match session_thread(ctx, &args.session_id) {
1710        Ok(t) => t,
1711        Err(e) => return respond_err(responder, e),
1712    };
1713
1714    cx.spawn(async move |cx| {
1715        let result: Result<_, acp::Error> = async {
1716            thread
1717                .update(cx, |thread, cx| {
1718                    thread.write_text_file(args.path, args.content, cx)
1719                })
1720                .map_err(acp::Error::from)?
1721                .await
1722                .map_err(acp::Error::from)?;
1723            Ok(())
1724        }
1725        .await;
1726
1727        match result {
1728            Ok(()) => {
1729                responder
1730                    .respond(acp::WriteTextFileResponse::default())
1731                    .log_err();
1732            }
1733            Err(e) => respond_err(responder, e),
1734        }
1735    })
1736    .detach();
1737}
1738
1739fn handle_read_text_file(
1740    args: acp::ReadTextFileRequest,
1741    responder: Responder<acp::ReadTextFileResponse>,
1742    cx: &mut AsyncApp,
1743    ctx: &ClientContext,
1744) {
1745    let thread = match session_thread(ctx, &args.session_id) {
1746        Ok(t) => t,
1747        Err(e) => return respond_err(responder, e),
1748    };
1749
1750    cx.spawn(async move |cx| {
1751        let result: Result<_, acp::Error> = async {
1752            thread
1753                .update(cx, |thread, cx| {
1754                    thread.read_text_file(args.path, args.line, args.limit, false, cx)
1755                })
1756                .map_err(acp::Error::from)?
1757                .await
1758        }
1759        .await;
1760
1761        match result {
1762            Ok(content) => {
1763                responder
1764                    .respond(acp::ReadTextFileResponse::new(content))
1765                    .log_err();
1766            }
1767            Err(e) => respond_err(responder, e),
1768        }
1769    })
1770    .detach();
1771}
1772
1773fn handle_session_notification(
1774    notification: acp::SessionNotification,
1775    cx: &mut AsyncApp,
1776    ctx: &ClientContext,
1777) {
1778    // Extract everything we need from the session while briefly borrowing.
1779    let (thread, session_modes, config_opts_data) = {
1780        let sessions = ctx.sessions.borrow();
1781        let Some(session) = sessions.get(&notification.session_id) else {
1782            log::warn!(
1783                "Received session notification for unknown session: {:?}",
1784                notification.session_id
1785            );
1786            return;
1787        };
1788        (
1789            session.thread.clone(),
1790            session.session_modes.clone(),
1791            session
1792                .config_options
1793                .as_ref()
1794                .map(|opts| (opts.config_options.clone(), opts.tx.clone())),
1795        )
1796    };
1797    // Borrow is dropped here.
1798
1799    // Apply mode/config/session_list updates without holding the borrow.
1800    if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1801        current_mode_id, ..
1802    }) = &notification.update
1803    {
1804        if let Some(session_modes) = &session_modes {
1805            session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1806        }
1807    }
1808
1809    if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1810        config_options, ..
1811    }) = &notification.update
1812    {
1813        if let Some((config_opts_cell, tx_cell)) = &config_opts_data {
1814            *config_opts_cell.borrow_mut() = config_options.clone();
1815            tx_cell.borrow_mut().send(()).ok();
1816        }
1817    }
1818
1819    if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
1820        && let Some(session_list) = ctx.session_list.borrow().as_ref()
1821    {
1822        session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1823    }
1824
1825    // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1826    if let acp::SessionUpdate::ToolCall(tc) = &notification.update {
1827        if let Some(meta) = &tc.meta {
1828            if let Some(terminal_info) = meta.get("terminal_info") {
1829                if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) {
1830                    let terminal_id = acp::TerminalId::new(id_str);
1831                    let cwd = terminal_info
1832                        .get("cwd")
1833                        .and_then(|v| v.as_str().map(PathBuf::from));
1834
1835                    thread
1836                        .update(cx, |thread, cx| {
1837                            let builder = TerminalBuilder::new_display_only(
1838                                CursorShape::default(),
1839                                AlternateScroll::On,
1840                                None,
1841                                0,
1842                                cx.background_executor(),
1843                                thread.project().read(cx).path_style(cx),
1844                            )?;
1845                            let lower = cx.new(|cx| builder.subscribe(cx));
1846                            thread.on_terminal_provider_event(
1847                                TerminalProviderEvent::Created {
1848                                    terminal_id,
1849                                    label: tc.title.clone(),
1850                                    cwd,
1851                                    output_byte_limit: None,
1852                                    terminal: lower,
1853                                },
1854                                cx,
1855                            );
1856                            anyhow::Ok(())
1857                        })
1858                        .log_err();
1859                }
1860            }
1861        }
1862    }
1863
1864    // Forward the update to the acp_thread as usual.
1865    if let Err(err) = thread
1866        .update(cx, |thread, cx| {
1867            thread.handle_session_update(notification.update.clone(), cx)
1868        })
1869        .and_then(|inner| inner.map_err(anyhow::Error::from))
1870    {
1871        log::error!(
1872            "Failed to handle session update for {:?}: {err:?}",
1873            notification.session_id
1874        );
1875    }
1876
1877    // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1878    if let acp::SessionUpdate::ToolCallUpdate(tcu) = &notification.update {
1879        if let Some(meta) = &tcu.meta {
1880            if let Some(term_out) = meta.get("terminal_output") {
1881                if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1882                    let terminal_id = acp::TerminalId::new(id_str);
1883                    if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1884                        let data = s.as_bytes().to_vec();
1885                        thread
1886                            .update(cx, |thread, cx| {
1887                                thread.on_terminal_provider_event(
1888                                    TerminalProviderEvent::Output { terminal_id, data },
1889                                    cx,
1890                                );
1891                            })
1892                            .log_err();
1893                    }
1894                }
1895            }
1896
1897            if let Some(term_exit) = meta.get("terminal_exit") {
1898                if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1899                    let terminal_id = acp::TerminalId::new(id_str);
1900                    let status = acp::TerminalExitStatus::new()
1901                        .exit_code(
1902                            term_exit
1903                                .get("exit_code")
1904                                .and_then(|v| v.as_u64())
1905                                .map(|i| i as u32),
1906                        )
1907                        .signal(
1908                            term_exit
1909                                .get("signal")
1910                                .and_then(|v| v.as_str().map(|s| s.to_string())),
1911                        );
1912
1913                    thread
1914                        .update(cx, |thread, cx| {
1915                            thread.on_terminal_provider_event(
1916                                TerminalProviderEvent::Exit {
1917                                    terminal_id,
1918                                    status,
1919                                },
1920                                cx,
1921                            );
1922                        })
1923                        .log_err();
1924                }
1925            }
1926        }
1927    }
1928}
1929
1930fn handle_create_terminal(
1931    args: acp::CreateTerminalRequest,
1932    responder: Responder<acp::CreateTerminalResponse>,
1933    cx: &mut AsyncApp,
1934    ctx: &ClientContext,
1935) {
1936    let thread = match session_thread(ctx, &args.session_id) {
1937        Ok(t) => t,
1938        Err(e) => return respond_err(responder, e),
1939    };
1940    let project = match thread
1941        .read_with(cx, |thread, _cx| thread.project().clone())
1942        .map_err(acp::Error::from)
1943    {
1944        Ok(p) => p,
1945        Err(e) => return respond_err(responder, e),
1946    };
1947
1948    cx.spawn(async move |cx| {
1949        let result: Result<_, acp::Error> = async {
1950            let terminal_entity = acp_thread::create_terminal_entity(
1951                args.command.clone(),
1952                &args.args,
1953                args.env
1954                    .into_iter()
1955                    .map(|env| (env.name, env.value))
1956                    .collect(),
1957                args.cwd.clone(),
1958                &project,
1959                cx,
1960            )
1961            .await
1962            .map_err(acp::Error::from)?;
1963
1964            let terminal_entity = thread
1965                .update(cx, |thread, cx| {
1966                    thread.register_terminal_created(
1967                        acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1968                        format!("{} {}", args.command, args.args.join(" ")),
1969                        args.cwd.clone(),
1970                        args.output_byte_limit,
1971                        terminal_entity,
1972                        cx,
1973                    )
1974                })
1975                .map_err(acp::Error::from)?;
1976            let terminal_id = terminal_entity.read_with(cx, |terminal, _| terminal.id().clone());
1977            Ok(terminal_id)
1978        }
1979        .await;
1980
1981        match result {
1982            Ok(terminal_id) => {
1983                responder
1984                    .respond(acp::CreateTerminalResponse::new(terminal_id))
1985                    .log_err();
1986            }
1987            Err(e) => respond_err(responder, e),
1988        }
1989    })
1990    .detach();
1991}
1992
1993fn handle_kill_terminal(
1994    args: acp::KillTerminalRequest,
1995    responder: Responder<acp::KillTerminalResponse>,
1996    cx: &mut AsyncApp,
1997    ctx: &ClientContext,
1998) {
1999    let thread = match session_thread(ctx, &args.session_id) {
2000        Ok(t) => t,
2001        Err(e) => return respond_err(responder, e),
2002    };
2003
2004    match thread
2005        .update(cx, |thread, cx| thread.kill_terminal(args.terminal_id, cx))
2006        .map_err(acp::Error::from)
2007        .and_then(|r| r.map_err(acp::Error::from))
2008    {
2009        Ok(()) => {
2010            responder
2011                .respond(acp::KillTerminalResponse::default())
2012                .log_err();
2013        }
2014        Err(e) => respond_err(responder, e),
2015    }
2016}
2017
2018fn handle_release_terminal(
2019    args: acp::ReleaseTerminalRequest,
2020    responder: Responder<acp::ReleaseTerminalResponse>,
2021    cx: &mut AsyncApp,
2022    ctx: &ClientContext,
2023) {
2024    let thread = match session_thread(ctx, &args.session_id) {
2025        Ok(t) => t,
2026        Err(e) => return respond_err(responder, e),
2027    };
2028
2029    match thread
2030        .update(cx, |thread, cx| {
2031            thread.release_terminal(args.terminal_id, cx)
2032        })
2033        .map_err(acp::Error::from)
2034        .and_then(|r| r.map_err(acp::Error::from))
2035    {
2036        Ok(()) => {
2037            responder
2038                .respond(acp::ReleaseTerminalResponse::default())
2039                .log_err();
2040        }
2041        Err(e) => respond_err(responder, e),
2042    }
2043}
2044
2045fn handle_terminal_output(
2046    args: acp::TerminalOutputRequest,
2047    responder: Responder<acp::TerminalOutputResponse>,
2048    cx: &mut AsyncApp,
2049    ctx: &ClientContext,
2050) {
2051    let thread = match session_thread(ctx, &args.session_id) {
2052        Ok(t) => t,
2053        Err(e) => return respond_err(responder, e),
2054    };
2055
2056    match thread
2057        .read_with(cx, |thread, cx| -> Result<_, anyhow::Error> {
2058            let out = thread
2059                .terminal(args.terminal_id)?
2060                .read(cx)
2061                .current_output(cx);
2062            Ok(out)
2063        })
2064        .map_err(acp::Error::from)
2065        .and_then(|r| r.map_err(acp::Error::from))
2066    {
2067        Ok(output) => {
2068            responder.respond(output).log_err();
2069        }
2070        Err(e) => respond_err(responder, e),
2071    }
2072}
2073
2074fn handle_wait_for_terminal_exit(
2075    args: acp::WaitForTerminalExitRequest,
2076    responder: Responder<acp::WaitForTerminalExitResponse>,
2077    cx: &mut AsyncApp,
2078    ctx: &ClientContext,
2079) {
2080    let thread = match session_thread(ctx, &args.session_id) {
2081        Ok(t) => t,
2082        Err(e) => return respond_err(responder, e),
2083    };
2084
2085    cx.spawn(async move |cx| {
2086        let result: Result<_, acp::Error> = async {
2087            let exit_status = thread
2088                .update(cx, |thread, cx| {
2089                    anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
2090                })
2091                .map_err(acp::Error::from)?
2092                .map_err(acp::Error::from)?
2093                .await;
2094            Ok(exit_status)
2095        }
2096        .await;
2097
2098        match result {
2099            Ok(exit_status) => {
2100                responder
2101                    .respond(acp::WaitForTerminalExitResponse::new(exit_status))
2102                    .log_err();
2103            }
2104            Err(e) => respond_err(responder, e),
2105        }
2106    })
2107    .detach();
2108}