1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::schema::{self as acp, ErrorCode};
8use agent_client_protocol::{
9 Agent, Client, ConnectionTo, JsonRpcResponse, Lines, Responder, SentRequest,
10};
11use anyhow::anyhow;
12use collections::HashMap;
13use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
14use futures::channel::mpsc;
15use futures::future::Shared;
16use futures::io::BufReader;
17use futures::{AsyncBufReadExt as _, Future, FutureExt as _, StreamExt as _};
18use project::agent_server_store::{AgentServerCommand, AgentServerStore};
19use project::{AgentId, Project};
20use remote::remote_client::Interactive;
21use serde::Deserialize;
22use std::path::PathBuf;
23use std::process::Stdio;
24use std::rc::Rc;
25use std::sync::Arc;
26use std::{any::Any, cell::RefCell};
27use task::{Shell, ShellBuilder, SpawnInTerminal};
28use thiserror::Error;
29use util::ResultExt as _;
30use util::path_list::PathList;
31use util::process::Child;
32
33use anyhow::{Context as _, Result};
34use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
35
36use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
37use terminal::TerminalBuilder;
38use terminal::terminal_settings::{AlternateScroll, CursorShape};
39
40use crate::GEMINI_ID;
41
/// Identifier of the synthetic "Login" auth method that `AcpConnection::stdio`
/// substitutes for the agent-reported auth methods when the agent id equals
/// `GEMINI_ID`.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
43
44/// Awaits the response to an ACP request from a GPUI foreground task.
45///
46/// The ACP SDK offers two ways to consume a [`SentRequest`]:
47/// - [`SentRequest::block_task`]: linear `.await` inside a spawned task.
48/// - [`SentRequest::on_receiving_result`]: a callback invoked when the
49/// response arrives, with the guarantee that no other inbound messages
50/// are processed while the callback runs. This is the recommended form
51/// inside SDK handler callbacks, where [`block_task`] would deadlock.
52///
53/// We use `on_receiving_result` with a oneshot bridge here (rather than
54/// [`block_task`]) so that our handler-side code paths can share a single
55/// request-awaiting helper. The SDK callback itself is trivial (one channel
56/// send) so the extra ordering guarantee it imposes on the dispatch loop is
57/// negligible.
58fn into_foreground_future<T: JsonRpcResponse>(
59 sent: SentRequest<T>,
60) -> impl Future<Output = Result<T, acp::Error>> {
61 let (tx, rx) = futures::channel::oneshot::channel();
62 let spawn_result = sent.on_receiving_result(async move |result| {
63 tx.send(result).ok();
64 Ok(())
65 });
66 async move {
67 spawn_result?;
68 rx.await.map_err(|_| {
69 acp::Error::internal_error()
70 .data("response channel cancelled — connection may have dropped")
71 })?
72 }
73}
74
/// Error returned by `AcpConnection::stdio` when the agent's `initialize`
/// response reports a protocol version lower than
/// `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
78
/// Helper for flattening the nested `Result` shapes that come out of
/// `entity.update(cx, |_, cx| fallible_op(cx))` into a single `Result<T,
/// acp::Error>`.
///
/// `anyhow::Error` values get converted via `acp::Error::from`, which
/// downcasts an `acp::Error` back out of `anyhow` when present, so typed
/// errors like auth-required survive the trip.
///
/// Implemented in this file for two shapes: an outer `anyhow::Error` wrapping
/// either an inner `anyhow::Error` or an inner `acp::Error`.
trait FlattenAcpResult<T> {
    /// Collapses both `Result` layers, converting any `anyhow::Error` into an
    /// `acp::Error`.
    fn flatten_acp(self) -> Result<T, acp::Error>;
}
89
90impl<T> FlattenAcpResult<T> for Result<Result<T, anyhow::Error>, anyhow::Error> {
91 fn flatten_acp(self) -> Result<T, acp::Error> {
92 match self {
93 Ok(Ok(value)) => Ok(value),
94 Ok(Err(err)) => Err(err.into()),
95 Err(err) => Err(err.into()),
96 }
97 }
98}
99
100impl<T> FlattenAcpResult<T> for Result<Result<T, acp::Error>, anyhow::Error> {
101 fn flatten_acp(self) -> Result<T, acp::Error> {
102 match self {
103 Ok(Ok(value)) => Ok(value),
104 Ok(Err(err)) => Err(err),
105 Err(err) => Err(err.into()),
106 }
107 }
108}
109
/// Holds state needed by foreground work dispatched from background handler closures.
///
/// A reference to this context is passed to every `ForegroundWorkItem::run`
/// call made by the dispatch loop that `AcpConnection::stdio` spawns.
struct ClientContext {
    /// Sessions known to the connection, keyed by session id. This is the same
    /// `Rc` that `AcpConnection` stores in its `sessions` field.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Session list handle. Starts out as `None` and is filled in by
    /// `AcpConnection::stdio` after `initialize` when the agent advertises the
    /// session-list capability.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
}
115
116fn dispatch_queue_closed_error() -> acp::Error {
117 acp::Error::internal_error().data("ACP foreground dispatch queue closed")
118}
119
/// Work items sent from `Send` handler closures to the `!Send` foreground thread.
trait ForegroundWorkItem: Send {
    /// Executes the work item. Called by the foreground dispatch loop with the
    /// async app context and the shared client state.
    fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext);
    /// Disposes of the work item without running it. Called by the `enqueue_*`
    /// helpers when sending onto the dispatch queue fails.
    fn reject(self: Box<Self>);
}
125
/// Boxed, type-erased work item; the element type of the foreground dispatch queue.
type ForegroundWork = Box<dyn ForegroundWorkItem>;
127
/// An inbound agent→client request waiting to be handled on the foreground
/// thread, bundled with the responder used to answer it.
struct RequestForegroundWork<Req, Res>
where
    Req: Send + 'static,
    Res: JsonRpcResponse + Send + 'static,
{
    /// The request payload received from the agent.
    request: Req,
    /// Handle used to send the response (or an error) back to the agent.
    responder: Responder<Res>,
    /// Foreground handler; it receives ownership of both the request and the
    /// responder.
    handler: fn(Req, Responder<Res>, &mut AsyncApp, &ClientContext),
}
137
138impl<Req, Res> ForegroundWorkItem for RequestForegroundWork<Req, Res>
139where
140 Req: Send + 'static,
141 Res: JsonRpcResponse + Send + 'static,
142{
143 fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext) {
144 let Self {
145 request,
146 responder,
147 handler,
148 } = *self;
149 handler(request, responder, cx, ctx);
150 }
151
152 fn reject(self: Box<Self>) {
153 let Self { responder, .. } = *self;
154 log::error!("ACP foreground dispatch queue closed while handling inbound request");
155 responder
156 .respond_with_error(dispatch_queue_closed_error())
157 .log_err();
158 }
159}
160
/// An inbound agent→client notification waiting to be handled on the
/// foreground thread.
struct NotificationForegroundWork<Notif>
where
    Notif: Send + 'static,
{
    /// The notification payload received from the agent.
    notification: Notif,
    /// Connection the notification arrived on; only used by `reject` to send
    /// an error notification back.
    connection: ConnectionTo<Agent>,
    /// Foreground handler that consumes the notification.
    handler: fn(Notif, &mut AsyncApp, &ClientContext),
}
169
170impl<Notif> ForegroundWorkItem for NotificationForegroundWork<Notif>
171where
172 Notif: Send + 'static,
173{
174 fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext) {
175 let Self {
176 notification,
177 handler,
178 ..
179 } = *self;
180 handler(notification, cx, ctx);
181 }
182
183 fn reject(self: Box<Self>) {
184 let Self { connection, .. } = *self;
185 log::error!("ACP foreground dispatch queue closed while handling inbound notification");
186 connection
187 .send_error_notification(dispatch_queue_closed_error())
188 .log_err();
189 }
190}
191
192fn enqueue_request<Req, Res>(
193 dispatch_tx: &mpsc::UnboundedSender<ForegroundWork>,
194 request: Req,
195 responder: Responder<Res>,
196 handler: fn(Req, Responder<Res>, &mut AsyncApp, &ClientContext),
197) where
198 Req: Send + 'static,
199 Res: JsonRpcResponse + Send + 'static,
200{
201 let work: ForegroundWork = Box::new(RequestForegroundWork {
202 request,
203 responder,
204 handler,
205 });
206 if let Err(err) = dispatch_tx.unbounded_send(work) {
207 err.into_inner().reject();
208 }
209}
210
211fn enqueue_notification<Notif>(
212 dispatch_tx: &mpsc::UnboundedSender<ForegroundWork>,
213 notification: Notif,
214 connection: ConnectionTo<Agent>,
215 handler: fn(Notif, &mut AsyncApp, &ClientContext),
216) where
217 Notif: Send + 'static,
218{
219 let work: ForegroundWork = Box::new(NotificationForegroundWork {
220 notification,
221 connection,
222 handler,
223 });
224 if let Err(err) = dispatch_tx.unbounded_send(work) {
225 err.into_inner().reject();
226 }
227}
228
/// A live ACP connection to an external agent, together with the per-session
/// state and the background tasks that keep the connection running.
pub struct AcpConnection {
    /// Id of the agent this connection belongs to.
    id: AgentId,
    /// Name used for telemetry: the name from the agent's `initialize`
    /// response when it provides one, otherwise the agent id.
    telemetry_id: SharedString,
    /// Handle used to send requests to the agent.
    connection: ConnectionTo<Agent>,
    /// Registered sessions keyed by session id; shared with the foreground
    /// dispatch loop and with the task that waits for the child to exit.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Session opens/loads that are still in flight, so concurrent callers can
    /// await the same task.
    pending_sessions: Rc<RefCell<HashMap<acp::SessionId, PendingAcpSession>>>,
    /// Auth methods from the `initialize` response (replaced by a synthetic
    /// terminal login method for Gemini).
    auth_methods: Vec<acp::AuthMethod>,
    /// Weak handle to the agent server store, as passed to the constructor.
    agent_server_store: WeakEntity<AgentServerStore>,
    /// Capabilities reported by the agent in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Default session mode, as passed to the constructor.
    default_mode: Option<acp::SessionModeId>,
    /// Default model, as passed to the constructor.
    default_model: Option<acp::ModelId>,
    /// Default values for session config options, keyed by config option id;
    /// consulted by `apply_default_config_options`.
    default_config_options: HashMap<String, String>,
    /// The spawned agent process (`None` for test connections). Killed when
    /// the connection is dropped.
    child: Option<Child>,
    /// Present only when the agent advertises the session-list capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Drives the ACP connection future.
    _io_task: Task<()>,
    /// Runs queued foreground work items.
    _dispatch_task: Task<()>,
    /// Awaits the child's exit status and reports it to all sessions.
    _wait_task: Task<Result<()>>,
    /// Forwards the child's stderr lines to the log and the ACP log tap.
    _stderr_task: Task<Result<()>>,
}
248
/// Bookkeeping for a session whose open/load RPC has not completed yet.
struct PendingAcpSession {
    /// Shared handle to the in-flight task; every concurrent caller awaits a
    /// clone of it. The error is wrapped in `Arc` so the shared output can be
    /// cloned.
    task: Shared<Task<Result<Entity<AcpThread>, Arc<anyhow::Error>>>>,
    /// Number of callers that requested this session while it was pending;
    /// transferred to the session's `ref_count` once loading finishes.
    ref_count: usize,
}
253
/// The configuration-related parts of a session RPC response, as produced by
/// the `rpc_call` closure given to `AcpConnection::open_or_create_session`.
struct SessionConfigResponse {
    /// Session mode state, if the response carried one.
    modes: Option<acp::SessionModeState>,
    /// Session model state, if the response carried one.
    models: Option<acp::SessionModelState>,
    /// Session config options, if the response carried any.
    config_options: Option<Vec<acp::SessionConfigOption>>,
}
259
/// A session's config options plus a `watch` channel created alongside them.
// NOTE(review): the channel carries `()`, so it presumably only signals "the
// options changed" to observers — confirm against the code that uses `tx`/`rx`.
#[derive(Clone)]
struct ConfigOptions {
    /// The option list; shared so it can be mutated in place.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sending half of the watch channel.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiving half of the watch channel.
    rx: watch::Receiver<()>,
}
266
267impl ConfigOptions {
268 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
269 let (tx, rx) = watch::channel(());
270 Self {
271 config_options,
272 tx: Rc::new(RefCell::new(tx)),
273 rx,
274 }
275 }
276}
277
/// Per-session state tracked by an `AcpConnection`.
pub struct AcpSession {
    /// Weak handle to the thread that displays this session.
    thread: WeakEntity<AcpThread>,
    /// Initialized to `false` when a session is registered.
    // NOTE(review): presumably set while a cancellation is in progress so the
    // resulting abort error is not surfaced — confirm where it is written.
    suppress_abort_err: bool,
    /// Model state; `None` until the session RPC response provides it.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state; `None` until the session RPC response provides it.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options; `None` until the session RPC response provides them.
    config_options: Option<ConfigOptions>,
    /// Number of outstanding opens of this session.
    ref_count: usize,
}
286
/// Session-list provider backed by the agent's ACP list-sessions request, plus
/// a channel over which list updates are broadcast to watchers.
pub struct AcpSessionList {
    /// Connection used to send list-sessions requests.
    connection: ConnectionTo<Agent>,
    /// Sending half of the (unbounded) update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; cloned for each watcher.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
292
293impl AcpSessionList {
294 fn new(connection: ConnectionTo<Agent>) -> Self {
295 let (tx, rx) = smol::channel::unbounded();
296 Self {
297 connection,
298 updates_tx: tx,
299 updates_rx: rx,
300 }
301 }
302
303 fn notify_update(&self) {
304 self.updates_tx
305 .try_send(acp_thread::SessionListUpdate::Refresh)
306 .log_err();
307 }
308
309 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
310 self.updates_tx
311 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
312 .log_err();
313 }
314}
315
316impl AgentSessionList for AcpSessionList {
317 fn list_sessions(
318 &self,
319 request: AgentSessionListRequest,
320 cx: &mut App,
321 ) -> Task<Result<AgentSessionListResponse>> {
322 let conn = self.connection.clone();
323 cx.foreground_executor().spawn(async move {
324 let acp_request = acp::ListSessionsRequest::new()
325 .cwd(request.cwd)
326 .cursor(request.cursor);
327 let response = into_foreground_future(conn.send_request(acp_request))
328 .await
329 .map_err(map_acp_error)?;
330 Ok(AgentSessionListResponse {
331 sessions: response
332 .sessions
333 .into_iter()
334 .map(|s| AgentSessionInfo {
335 session_id: s.session_id,
336 work_dirs: Some(PathList::new(&[s.cwd])),
337 title: s.title.map(Into::into),
338 updated_at: s.updated_at.and_then(|date_str| {
339 chrono::DateTime::parse_from_rfc3339(&date_str)
340 .ok()
341 .map(|dt| dt.with_timezone(&chrono::Utc))
342 }),
343 created_at: None,
344 meta: s.meta,
345 })
346 .collect(),
347 next_cursor: response.next_cursor,
348 meta: response.meta,
349 })
350 })
351 }
352
353 fn watch(
354 &self,
355 _cx: &mut App,
356 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
357 Some(self.updates_rx.clone())
358 }
359
360 fn notify_refresh(&self) {
361 self.notify_update();
362 }
363
364 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
365 self
366 }
367}
368
369pub async fn connect(
370 agent_id: AgentId,
371 project: Entity<Project>,
372 command: AgentServerCommand,
373 agent_server_store: WeakEntity<AgentServerStore>,
374 default_mode: Option<acp::SessionModeId>,
375 default_model: Option<acp::ModelId>,
376 default_config_options: HashMap<String, String>,
377 cx: &mut AsyncApp,
378) -> Result<Rc<dyn AgentConnection>> {
379 let conn = AcpConnection::stdio(
380 agent_id,
381 project,
382 command.clone(),
383 agent_server_store,
384 default_mode,
385 default_model,
386 default_config_options,
387 cx,
388 )
389 .await?;
390 Ok(Rc::new(conn) as _)
391}
392
/// Lowest ACP protocol version this client accepts; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the agent reports anything lower.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
394
/// Build a `Client` connection over `transport` with Zed's full
/// agent→client handler set wired up.
///
/// All incoming requests and notifications are forwarded to the foreground
/// dispatch queue via `dispatch_tx`, where they are handled by the
/// `handle_*` functions on a GPUI context. The returned future drives the
/// connection and completes when the transport closes; callers are expected
/// to spawn it on a background executor and hold the task for the lifetime
/// of the connection. The `connection_tx` oneshot receives the
/// `ConnectionTo<Agent>` handle as soon as the builder runs its `main_fn`.
///
/// Arguments:
/// - `name`: name given to the client builder.
/// - `transport`: the transport the client connects over.
/// - `dispatch_tx`: sender for the foreground dispatch queue; a clone is
///   moved into every handler closure.
/// - `connection_tx`: oneshot sender that is handed the connection handle.
fn connect_client_future(
    name: &'static str,
    transport: impl agent_client_protocol::ConnectTo<Client> + 'static,
    dispatch_tx: mpsc::UnboundedSender<ForegroundWork>,
    connection_tx: futures::channel::oneshot::Sender<ConnectionTo<Agent>>,
) -> impl Future<Output = Result<(), acp::Error>> {
    // Each handler forwards its inputs onto the foreground dispatch queue.
    // The SDK requires the closure to be `Send`, so we move a clone of
    // `dispatch_tx` into each one.
    //
    // `on_request!(h)` expands to a closure that enqueues the request and its
    // responder for foreground handler `h` and returns `Ok(())` right away;
    // the connection argument the SDK passes is ignored.
    macro_rules! on_request {
        ($handler:ident) => {{
            let dispatch_tx = dispatch_tx.clone();
            async move |req, responder, _connection| {
                enqueue_request(&dispatch_tx, req, responder, $handler);
                Ok(())
            }
        }};
    }
    // `on_notification!(h)` is the notification counterpart; the connection is
    // forwarded so a rejected notification can be reported back to the agent.
    macro_rules! on_notification {
        ($handler:ident) => {{
            let dispatch_tx = dispatch_tx.clone();
            async move |notif, connection| {
                enqueue_notification(&dispatch_tx, notif, connection, $handler);
                Ok(())
            }
        }};
    }

    Client
        .builder()
        .name(name)
        // --- Request handlers (agent→client) ---
        .on_receive_request(
            on_request!(handle_request_permission),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_write_text_file),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_read_text_file),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_create_terminal),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_kill_terminal),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_release_terminal),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_terminal_output),
            agent_client_protocol::on_receive_request!(),
        )
        .on_receive_request(
            on_request!(handle_wait_for_terminal_exit),
            agent_client_protocol::on_receive_request!(),
        )
        // --- Notification handlers (agent→client) ---
        .on_receive_notification(
            on_notification!(handle_session_notification),
            agent_client_protocol::on_receive_notification!(),
        )
        .connect_with(
            transport,
            move |connection: ConnectionTo<Agent>| async move {
                // A dropped receiver is only logged; the connection keeps
                // running either way.
                if connection_tx.send(connection).is_err() {
                    log::error!("failed to send ACP connection handle — receiver was dropped");
                }
                // Keep the connection alive until the transport closes.
                futures::future::pending::<Result<(), acp::Error>>().await
            },
        )
}
485
486impl AcpConnection {
/// Spawns the agent server described by `command` as a child process with
/// piped stdin/stdout/stderr, runs an ACP client over its stdin/stdout,
/// performs the `initialize` handshake and returns the resulting connection.
///
/// Along the way it starts four tasks that are stored on the returned value:
/// the I/O task driving the connection future, the foreground dispatch loop,
/// a task forwarding the child's stderr to the log, and a task that waits for
/// the child to exit and reports `LoadError::Exited` to every session.
///
/// # Errors
///
/// Fails if the child cannot be spawned or one of its stdio pipes cannot be
/// taken, if the connection handle is never delivered, if the `initialize`
/// request fails, or — with [`UnsupportedVersion`] — if the agent reports a
/// protocol version below `MINIMUM_SUPPORTED_VERSION`.
pub async fn stdio(
    agent_id: AgentId,
    project: Entity<Project>,
    command: AgentServerCommand,
    agent_server_store: WeakEntity<AgentServerStore>,
    default_mode: Option<acp::SessionModeId>,
    default_model: Option<acp::ModelId>,
    default_config_options: HashMap<String, String>,
    cx: &mut AsyncApp,
) -> Result<Self> {
    // The first path of the project's default path list, if there is one.
    let root_dir = project.read_with(cx, |project, cx| {
        project
            .default_path_list(cx)
            .ordered_paths()
            .next()
            .cloned()
    });
    // `command` is consumed below; keep a copy for the Gemini auth override.
    let original_command = command.clone();
    // With a remote client, let it build the command to run (passing
    // `root_dir` along); otherwise, or if that fails (the failure is logged),
    // run the command as given.
    let (path, args, env) = project
        .read_with(cx, |project, cx| {
            project.remote_client().and_then(|client| {
                let template = client
                    .read(cx)
                    .build_command_with_options(
                        Some(command.path.display().to_string()),
                        &command.args,
                        &command.env.clone().into_iter().flatten().collect(),
                        root_dir.as_ref().map(|path| path.display().to_string()),
                        None,
                        Interactive::No,
                    )
                    .log_err()?;
                Some((template.program, template.args, template.env))
            })
        })
        .unwrap_or_else(|| {
            (
                command.path.display().to_string(),
                command.args,
                command.env.unwrap_or_default(),
            )
        });

    let builder = ShellBuilder::new(&Shell::System, cfg!(windows)).non_interactive();
    let mut child = builder.build_std_command(Some(path.clone()), &args);
    child.envs(env.clone());
    // The working directory is only set for local projects.
    if let Some(cwd) = project.read_with(cx, |project, _cx| {
        if project.is_local() {
            root_dir.as_ref()
        } else {
            None
        }
    }) {
        child.current_dir(cwd);
    }
    let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

    let stdout = child.stdout.take().context("Failed to take stdout")?;
    let stdin = child.stdin.take().context("Failed to take stdin")?;
    let stderr = child.stderr.take().context("Failed to take stderr")?;
    // Note: despite its wording, this is logged after the process was spawned.
    log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
    log::trace!("Spawned (pid: {})", child.id());

    let sessions = Rc::new(RefCell::new(HashMap::default()));

    // Captured now; sent to the agent as client info in `initialize` below.
    let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
        (
            release_channel::ReleaseChannel::try_global(cx)
                .map(|release_channel| release_channel.display_name()),
            release_channel::AppVersion::global(cx).to_string(),
        )
    });

    // Shared with the dispatch context; filled in after `initialize` if the
    // agent supports listing sessions.
    let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
        Rc::new(RefCell::new(None));

    // Set up the foreground dispatch channel for bridging Send handler
    // closures to the !Send foreground thread.
    let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();

    // Register this connection with the logs panel registry. The
    // returned tap is opt-in: until someone subscribes to the ACP logs
    // panel, `emit_*` calls below are ~free (atomic load + return).
    let log_tap = cx.update(|cx| {
        AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
            registry.set_active_connection(agent_id.clone(), cx)
        })
    });

    // Incoming side: the child's stdout, split into lines, with every line
    // mirrored to the log tap.
    let incoming_lines = futures::io::BufReader::new(stdout).lines();
    let tapped_incoming = incoming_lines.inspect({
        let log_tap = log_tap.clone();
        move |result| match result {
            Ok(line) => log_tap.emit_incoming(line),
            Err(err) => {
                // I/O errors on the transport are fatal for the SDK, but
                // without logging them the ACP logs panel shows no trace
                // of why the connection died.
                log::warn!("ACP transport read error: {err}");
            }
        }
    });

    // Outgoing side: each line is mirrored to the log tap, then written to
    // the child's stdin followed by a newline.
    let tapped_outgoing = futures::sink::unfold(
        (Box::pin(stdin), log_tap.clone()),
        async move |(mut writer, log_tap), line: String| {
            use futures::AsyncWriteExt;
            log_tap.emit_outgoing(&line);
            let mut bytes = line.into_bytes();
            bytes.push(b'\n');
            writer.write_all(&bytes).await?;
            Ok::<_, std::io::Error>((writer, log_tap))
        },
    );

    let transport = Lines::new(tapped_outgoing, tapped_incoming);

    // `connect_client_future` installs the production handler set and
    // returns the connection future, which is run on a background
    // executor. The `ConnectionTo<Agent>` handle arrives separately, through
    // the oneshot channel created here.
    let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
    let connection_future =
        connect_client_future("zed", transport, dispatch_tx.clone(), connection_tx);
    let io_task = cx.background_spawn(async move {
        if let Err(err) = connection_future.await {
            log::error!("ACP connection error: {err}");
        }
    });

    let connection: ConnectionTo<Agent> = connection_rx
        .await
        .context("Failed to receive ACP connection handle")?;

    // Set up the foreground dispatch loop to process work items from handlers.
    let dispatch_context = ClientContext {
        sessions: sessions.clone(),
        session_list: client_session_list.clone(),
    };
    let dispatch_task = cx.spawn({
        let mut dispatch_rx = dispatch_rx;
        async move |cx| {
            while let Some(work) = dispatch_rx.next().await {
                work.run(cx, &dispatch_context);
            }
        }
    });

    // Forward the child's stderr, line by line, to the log and the log tap.
    // The loop ends on EOF (zero bytes read) or on the first read error.
    let stderr_task = cx.background_spawn({
        let log_tap = log_tap.clone();
        async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                let trimmed = line.trim_end_matches(['\n', '\r']);
                log::warn!("agent stderr: {trimmed}");
                log_tap.emit_stderr(trimmed);
                line.clear();
            }
            Ok(())
        }
    });

    // When the child exits, tell every registered session about it.
    let wait_task = cx.spawn({
        let sessions = sessions.clone();
        let status_fut = child.status();
        async move |cx| {
            let status = status_fut.await?;
            emit_load_error_to_all_sessions(&sessions, LoadError::Exited { status }, cx);
            anyhow::Ok(())
        }
    });

    // ACP handshake: advertise file-system, terminal and terminal-auth
    // support, plus client name/version.
    // NOTE(review): if this request fails, or the version check below rejects
    // the agent, `child` is dropped without going through
    // `AcpConnection::drop` (which kills it). Whether the process is
    // terminated then depends on `Child`'s own drop behavior — confirm.
    let response = into_foreground_future(
        connection.send_request(
            acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                .client_capabilities(
                    acp::ClientCapabilities::new()
                        .fs(acp::FileSystemCapabilities::new()
                            .read_text_file(true)
                            .write_text_file(true))
                        .terminal(true)
                        .auth(acp::AuthCapabilities::new().terminal(true))
                        .meta(acp::Meta::from_iter([
                            ("terminal_output".into(), true.into()),
                            ("terminal-auth".into(), true.into()),
                        ])),
                )
                .client_info(
                    acp::Implementation::new("zed", version)
                        .title(release_channel.map(ToOwned::to_owned)),
                ),
        ),
    )
    .await?;

    if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
        return Err(UnsupportedVersion.into());
    }

    let telemetry_id = response
        .agent_info
        // Use the one the agent provides if we have one
        .map(|info| info.name.into())
        // Otherwise, just use the name
        .unwrap_or_else(|| agent_id.0.clone());

    // Only create a session list when the agent advertises the capability;
    // the dispatch context shares the same instance.
    let session_list = if response
        .agent_capabilities
        .session_capabilities
        .list
        .is_some()
    {
        let list = Rc::new(AcpSessionList::new(connection.clone()));
        *client_session_list.borrow_mut() = Some(list.clone());
        Some(list)
    } else {
        None
    };

    // TODO: Remove this override once Google team releases their official auth methods
    // For Gemini, the agent-reported auth methods are replaced by one method
    // whose `terminal-auth` meta re-runs the original command without the
    // ACP flags.
    let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
        let mut gemini_args = original_command.args.clone();
        gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
        let value = serde_json::json!({
            "label": "gemini /auth",
            "command": original_command.path.to_string_lossy(),
            "args": gemini_args,
            "env": original_command.env.unwrap_or_default(),
        });
        let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
        vec![acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                .description("Login with your Google or Vertex AI account")
                .meta(meta),
        )]
    } else {
        response.auth_methods
    };
    Ok(Self {
        id: agent_id,
        auth_methods,
        agent_server_store,
        connection,
        telemetry_id,
        sessions,
        pending_sessions: Rc::new(RefCell::new(HashMap::default())),
        agent_capabilities: response.agent_capabilities,
        default_mode,
        default_model,
        default_config_options,
        session_list,
        _io_task: io_task,
        _dispatch_task: dispatch_task,
        _wait_task: wait_task,
        _stderr_task: stderr_task,
        child: Some(child),
    })
}
748
749 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
750 &self.agent_capabilities.prompt_capabilities
751 }
752
/// Test-only constructor: assembles a connection from pre-built parts without
/// spawning a child process.
///
/// The agent id and telemetry id are both `"test"`, there are no auth methods,
/// defaults or session list, and the wait/stderr tasks are already-completed
/// placeholders.
#[cfg(any(test, feature = "test-support"))]
fn new_for_test(
    connection: ConnectionTo<Agent>,
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    agent_capabilities: acp::AgentCapabilities,
    agent_server_store: WeakEntity<AgentServerStore>,
    io_task: Task<()>,
    dispatch_task: Task<()>,
    _cx: &mut App,
) -> Self {
    let pending_sessions = Rc::new(RefCell::new(HashMap::default()));
    let finished_wait_task = Task::ready(Ok(()));
    let finished_stderr_task = Task::ready(Ok(()));

    Self {
        // Identity
        id: AgentId::new("test"),
        telemetry_id: "test".into(),
        // Caller-supplied parts
        connection,
        sessions,
        agent_capabilities,
        agent_server_store,
        _io_task: io_task,
        _dispatch_task: dispatch_task,
        // Empty / absent state
        pending_sessions,
        auth_methods: vec![],
        default_mode: None,
        default_model: None,
        default_config_options: HashMap::default(),
        child: None,
        session_list: None,
        _wait_task: finished_wait_task,
        _stderr_task: finished_stderr_task,
    }
}
783
/// Returns the thread for `session_id`, creating and registering it through
/// `rpc_call` if it is not already open.
///
/// Resolution order:
/// 1. If a load for this session is already in flight, bump its ref count and
///    await the same shared task.
/// 2. If the session is registered and its thread is still alive, bump its
///    ref count and return the thread immediately.
/// 3. Otherwise create a new `AcpThread`, register the session, invoke
///    `rpc_call` with the connection, the session id and the first working
///    directory, and fill in modes/models/config options from the response.
///
/// `rpc_call` performs the actual session RPC and yields the configuration
/// parts of its response.
///
/// The returned task fails if `work_dirs` is empty, if `rpc_call` fails (the
/// session registration is rolled back in that case), or if the session was
/// removed from `sessions` while the RPC was in flight.
fn open_or_create_session(
    self: Rc<Self>,
    session_id: acp::SessionId,
    project: Entity<Project>,
    work_dirs: PathList,
    title: Option<SharedString>,
    rpc_call: impl FnOnce(
        ConnectionTo<Agent>,
        acp::SessionId,
        PathBuf,
    )
        -> futures::future::LocalBoxFuture<'static, Result<SessionConfigResponse>>
    + 'static,
    cx: &mut App,
) -> Task<Result<Entity<AcpThread>>> {
    // Check `pending_sessions` before `sessions` because the session is now
    // inserted into `sessions` before the load RPC completes (so that
    // notifications dispatched during history replay can find the thread).
    // Concurrent loads should still wait for the in-flight task so that
    // ref-counting happens in one place and the caller sees a fully loaded
    // session.
    if let Some(pending) = self.pending_sessions.borrow_mut().get_mut(&session_id) {
        pending.ref_count += 1;
        let task = pending.task.clone();
        return cx
            .foreground_executor()
            .spawn(async move { task.await.map_err(|err| anyhow!(err)) });
    }

    // NOTE(review): the ref count is bumped even when the thread can no
    // longer be upgraded; in that case execution falls through and the entry
    // is replaced below with a fresh one whose `ref_count` is 1, so the bump
    // is discarded — confirm this is intended.
    if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
        session.ref_count += 1;
        if let Some(thread) = session.thread.upgrade() {
            return Task::ready(Ok(thread));
        }
    }

    // TODO: remove this once ACP supports multiple working directories
    let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
        return Task::ready(Err(anyhow!("Working directory cannot be empty")));
    };

    let shared_task = cx
        .spawn({
            let session_id = session_id.clone();
            let this = self.clone();
            async move |cx| {
                let action_log = cx.new(|_| ActionLog::new(project.clone()));
                let thread: Entity<AcpThread> = cx.new(|cx| {
                    AcpThread::new(
                        None,
                        title,
                        Some(work_dirs),
                        this.clone(),
                        project,
                        action_log,
                        session_id.clone(),
                        watch::Receiver::constant(
                            this.agent_capabilities.prompt_capabilities.clone(),
                        ),
                        cx,
                    )
                });

                // Register the session before awaiting the RPC so that any
                // `session/update` notifications that arrive during the call
                // (e.g. history replay during `session/load`) can find the thread.
                // Modes/models/config are filled in once the response arrives.
                this.sessions.borrow_mut().insert(
                    session_id.clone(),
                    AcpSession {
                        thread: thread.downgrade(),
                        suppress_abort_err: false,
                        session_modes: None,
                        models: None,
                        config_options: None,
                        ref_count: 1,
                    },
                );

                let response =
                    match rpc_call(this.connection.clone(), session_id.clone(), cwd).await {
                        Ok(response) => response,
                        Err(err) => {
                            // Roll back both registrations so that a later
                            // attempt starts from a clean slate.
                            this.sessions.borrow_mut().remove(&session_id);
                            this.pending_sessions.borrow_mut().remove(&session_id);
                            return Err(Arc::new(err));
                        }
                    };

                let (modes, models, config_options) =
                    config_state(response.modes, response.models, response.config_options);

                if let Some(config_opts) = config_options.as_ref() {
                    this.apply_default_config_options(&session_id, config_opts, cx);
                }

                // Take over the ref count accumulated by concurrent callers
                // while the load was pending (1 if the pending entry is gone).
                let ref_count = this
                    .pending_sessions
                    .borrow_mut()
                    .remove(&session_id)
                    .map_or(1, |pending| pending.ref_count);

                // If `close_session` ran to completion while the load RPC was in
                // flight, it will have removed both the pending entry and the
                // sessions entry (and dispatched the ACP close RPC). In that case
                // the thread has no live session to attach to, so fail the load
                // instead of handing back an orphaned thread.
                {
                    let mut sessions = this.sessions.borrow_mut();
                    let Some(session) = sessions.get_mut(&session_id) else {
                        return Err(Arc::new(anyhow!(
                            "session was closed before load completed"
                        )));
                    };
                    session.session_modes = modes;
                    session.models = models;
                    session.config_options = config_options.map(ConfigOptions::new);
                    session.ref_count = ref_count;
                }

                Ok(thread)
            }
        })
        .shared();

    // Publish the in-flight task so that concurrent callers can join it.
    // NOTE(review): this insert happens after `cx.spawn`; it relies on the
    // spawned task not reaching its `pending_sessions.remove` calls before
    // this line runs — confirm against the executor's scheduling guarantees.
    self.pending_sessions.borrow_mut().insert(
        session_id,
        PendingAcpSession {
            task: shared_task.clone(),
            ref_count: 1,
        },
    );

    cx.foreground_executor()
        .spawn(async move { shared_task.await.map_err(|err| anyhow!(err)) })
}
920
921 fn apply_default_config_options(
922 &self,
923 session_id: &acp::SessionId,
924 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
925 cx: &mut AsyncApp,
926 ) {
927 let id = self.id.clone();
928 let defaults_to_apply: Vec<_> = {
929 let config_opts_ref = config_options.borrow();
930 config_opts_ref
931 .iter()
932 .filter_map(|config_option| {
933 let default_value = self.default_config_options.get(&*config_option.id.0)?;
934
935 let is_valid = match &config_option.kind {
936 acp::SessionConfigKind::Select(select) => match &select.options {
937 acp::SessionConfigSelectOptions::Ungrouped(options) => options
938 .iter()
939 .any(|opt| &*opt.value.0 == default_value.as_str()),
940 acp::SessionConfigSelectOptions::Grouped(groups) => {
941 groups.iter().any(|g| {
942 g.options
943 .iter()
944 .any(|opt| &*opt.value.0 == default_value.as_str())
945 })
946 }
947 _ => false,
948 },
949 _ => false,
950 };
951
952 if is_valid {
953 let initial_value = match &config_option.kind {
954 acp::SessionConfigKind::Select(select) => {
955 Some(select.current_value.clone())
956 }
957 _ => None,
958 };
959 Some((
960 config_option.id.clone(),
961 default_value.clone(),
962 initial_value,
963 ))
964 } else {
965 log::warn!(
966 "`{}` is not a valid value for config option `{}` in {}",
967 default_value,
968 config_option.id.0,
969 id
970 );
971 None
972 }
973 })
974 .collect()
975 };
976
977 for (config_id, default_value, initial_value) in defaults_to_apply {
978 cx.spawn({
979 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
980 let session_id = session_id.clone();
981 let config_id_clone = config_id.clone();
982 let config_opts = config_options.clone();
983 let conn = self.connection.clone();
984 async move |_| {
985 let result = into_foreground_future(conn.send_request(
986 acp::SetSessionConfigOptionRequest::new(
987 session_id,
988 config_id_clone.clone(),
989 default_value_id,
990 ),
991 ))
992 .await
993 .log_err();
994
995 if result.is_none() {
996 if let Some(initial) = initial_value {
997 let mut opts = config_opts.borrow_mut();
998 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
999 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
1000 select.current_value = initial;
1001 }
1002 }
1003 }
1004 }
1005 }
1006 })
1007 .detach();
1008
1009 let mut opts = config_options.borrow_mut();
1010 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
1011 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
1012 select.current_value = acp::SessionConfigValueId::new(default_value);
1013 }
1014 }
1015 }
1016 }
1017}
1018
1019fn emit_load_error_to_all_sessions(
1020 sessions: &Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1021 error: LoadError,
1022 cx: &mut AsyncApp,
1023) {
1024 let threads: Vec<_> = sessions
1025 .borrow()
1026 .values()
1027 .map(|session| session.thread.clone())
1028 .collect();
1029
1030 for thread in threads {
1031 thread
1032 .update(cx, |thread, cx| thread.emit_load_error(error.clone(), cx))
1033 .ok();
1034 }
1035}
1036
1037impl Drop for AcpConnection {
1038 fn drop(&mut self) {
1039 if let Some(ref mut child) = self.child {
1040 child.kill().log_err();
1041 }
1042 }
1043}
1044
1045fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
1046 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
1047}
1048
1049fn terminal_auth_task(
1050 command: &AgentServerCommand,
1051 agent_id: &AgentId,
1052 method: &acp::AuthMethodTerminal,
1053) -> SpawnInTerminal {
1054 acp_thread::build_terminal_auth_task(
1055 terminal_auth_task_id(agent_id, &method.id),
1056 method.name.clone(),
1057 command.path.to_string_lossy().into_owned(),
1058 command.args.clone(),
1059 command.env.clone().unwrap_or_default(),
1060 )
1061}
1062
1063/// Used to support the _meta method prior to stabilization
1064fn meta_terminal_auth_task(
1065 agent_id: &AgentId,
1066 method_id: &acp::AuthMethodId,
1067 method: &acp::AuthMethod,
1068) -> Option<SpawnInTerminal> {
1069 #[derive(Deserialize)]
1070 struct MetaTerminalAuth {
1071 label: String,
1072 command: String,
1073 #[serde(default)]
1074 args: Vec<String>,
1075 #[serde(default)]
1076 env: HashMap<String, String>,
1077 }
1078
1079 let meta = match method {
1080 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
1081 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
1082 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
1083 _ => None,
1084 }?;
1085 let terminal_auth =
1086 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
1087
1088 Some(acp_thread::build_terminal_auth_task(
1089 terminal_auth_task_id(agent_id, method_id),
1090 terminal_auth.label.clone(),
1091 terminal_auth.command,
1092 terminal_auth.args,
1093 terminal_auth.env,
1094 ))
1095}
1096
1097impl AgentConnection for AcpConnection {
    /// Returns a clone of the agent id stored on this connection.
    fn agent_id(&self) -> AgentId {
        self.id.clone()
    }
1101
    /// Returns a clone of the telemetry id stored on this connection.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
1105
1106 fn new_session(
1107 self: Rc<Self>,
1108 project: Entity<Project>,
1109 work_dirs: PathList,
1110 cx: &mut App,
1111 ) -> Task<Result<Entity<AcpThread>>> {
1112 // TODO: remove this once ACP supports multiple working directories
1113 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
1114 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
1115 };
1116 let name = self.id.0.clone();
1117 let mcp_servers = mcp_servers_for_project(&project, cx);
1118
1119 cx.spawn(async move |cx| {
1120 let response = into_foreground_future(
1121 self.connection
1122 .send_request(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers)),
1123 )
1124 .await
1125 .map_err(map_acp_error)?;
1126
1127 let (modes, models, config_options) =
1128 config_state(response.modes, response.models, response.config_options);
1129
1130 if let Some(default_mode) = self.default_mode.clone() {
1131 if let Some(modes) = modes.as_ref() {
1132 let mut modes_ref = modes.borrow_mut();
1133 let has_mode = modes_ref
1134 .available_modes
1135 .iter()
1136 .any(|mode| mode.id == default_mode);
1137
1138 if has_mode {
1139 let initial_mode_id = modes_ref.current_mode_id.clone();
1140
1141 cx.spawn({
1142 let default_mode = default_mode.clone();
1143 let session_id = response.session_id.clone();
1144 let modes = modes.clone();
1145 let conn = self.connection.clone();
1146 async move |_| {
1147 let result = into_foreground_future(
1148 conn.send_request(acp::SetSessionModeRequest::new(
1149 session_id,
1150 default_mode,
1151 )),
1152 )
1153 .await
1154 .log_err();
1155
1156 if result.is_none() {
1157 modes.borrow_mut().current_mode_id = initial_mode_id;
1158 }
1159 }
1160 })
1161 .detach();
1162
1163 modes_ref.current_mode_id = default_mode;
1164 } else {
1165 let available_modes = modes_ref
1166 .available_modes
1167 .iter()
1168 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
1169 .collect::<Vec<_>>()
1170 .join("\n");
1171
1172 log::warn!(
1173 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
1174 );
1175 }
1176 }
1177 }
1178
1179 if let Some(default_model) = self.default_model.clone() {
1180 if let Some(models) = models.as_ref() {
1181 let mut models_ref = models.borrow_mut();
1182 let has_model = models_ref
1183 .available_models
1184 .iter()
1185 .any(|model| model.model_id == default_model);
1186
1187 if has_model {
1188 let initial_model_id = models_ref.current_model_id.clone();
1189
1190 cx.spawn({
1191 let default_model = default_model.clone();
1192 let session_id = response.session_id.clone();
1193 let models = models.clone();
1194 let conn = self.connection.clone();
1195 async move |_| {
1196 let result = into_foreground_future(
1197 conn.send_request(acp::SetSessionModelRequest::new(
1198 session_id,
1199 default_model,
1200 )),
1201 )
1202 .await
1203 .log_err();
1204
1205 if result.is_none() {
1206 models.borrow_mut().current_model_id = initial_model_id;
1207 }
1208 }
1209 })
1210 .detach();
1211
1212 models_ref.current_model_id = default_model;
1213 } else {
1214 let available_models = models_ref
1215 .available_models
1216 .iter()
1217 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
1218 .collect::<Vec<_>>()
1219 .join("\n");
1220
1221 log::warn!(
1222 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
1223 );
1224 }
1225 }
1226 }
1227
1228 if let Some(config_opts) = config_options.as_ref() {
1229 self.apply_default_config_options(&response.session_id, config_opts, cx);
1230 }
1231
1232 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1233 let thread: Entity<AcpThread> = cx.new(|cx| {
1234 AcpThread::new(
1235 None,
1236 None,
1237 Some(work_dirs),
1238 self.clone(),
1239 project,
1240 action_log,
1241 response.session_id.clone(),
1242 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
1243 watch::Receiver::constant(
1244 self.agent_capabilities.prompt_capabilities.clone(),
1245 ),
1246 cx,
1247 )
1248 });
1249
1250 self.sessions.borrow_mut().insert(
1251 response.session_id,
1252 AcpSession {
1253 thread: thread.downgrade(),
1254 suppress_abort_err: false,
1255 session_modes: modes,
1256 models,
1257 config_options: config_options.map(ConfigOptions::new),
1258 ref_count: 1,
1259 },
1260 );
1261
1262 Ok(thread)
1263 })
1264 }
1265
    /// Reports the `load_session` flag from the agent's capabilities.
    fn supports_load_session(&self) -> bool {
        self.agent_capabilities.load_session
    }
1269
    /// Returns `true` when the agent's session capabilities include a `resume` entry.
    fn supports_resume_session(&self) -> bool {
        self.agent_capabilities
            .session_capabilities
            .resume
            .is_some()
    }
1276
1277 fn load_session(
1278 self: Rc<Self>,
1279 session_id: acp::SessionId,
1280 project: Entity<Project>,
1281 work_dirs: PathList,
1282 title: Option<SharedString>,
1283 cx: &mut App,
1284 ) -> Task<Result<Entity<AcpThread>>> {
1285 if !self.agent_capabilities.load_session {
1286 return Task::ready(Err(anyhow!(LoadError::Other(
1287 "Loading sessions is not supported by this agent.".into()
1288 ))));
1289 }
1290
1291 let mcp_servers = mcp_servers_for_project(&project, cx);
1292 self.open_or_create_session(
1293 session_id,
1294 project,
1295 work_dirs,
1296 title,
1297 move |connection, session_id, cwd| {
1298 Box::pin(async move {
1299 let response = into_foreground_future(
1300 connection.send_request(
1301 acp::LoadSessionRequest::new(session_id.clone(), cwd)
1302 .mcp_servers(mcp_servers),
1303 ),
1304 )
1305 .await
1306 .map_err(map_acp_error)?;
1307 Ok(SessionConfigResponse {
1308 modes: response.modes,
1309 models: response.models,
1310 config_options: response.config_options,
1311 })
1312 })
1313 },
1314 cx,
1315 )
1316 }
1317
1318 fn resume_session(
1319 self: Rc<Self>,
1320 session_id: acp::SessionId,
1321 project: Entity<Project>,
1322 work_dirs: PathList,
1323 title: Option<SharedString>,
1324 cx: &mut App,
1325 ) -> Task<Result<Entity<AcpThread>>> {
1326 if self
1327 .agent_capabilities
1328 .session_capabilities
1329 .resume
1330 .is_none()
1331 {
1332 return Task::ready(Err(anyhow!(LoadError::Other(
1333 "Resuming sessions is not supported by this agent.".into()
1334 ))));
1335 }
1336
1337 let mcp_servers = mcp_servers_for_project(&project, cx);
1338 self.open_or_create_session(
1339 session_id,
1340 project,
1341 work_dirs,
1342 title,
1343 move |connection, session_id, cwd| {
1344 Box::pin(async move {
1345 let response = into_foreground_future(
1346 connection.send_request(
1347 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
1348 .mcp_servers(mcp_servers),
1349 ),
1350 )
1351 .await
1352 .map_err(map_acp_error)?;
1353 Ok(SessionConfigResponse {
1354 modes: response.modes,
1355 models: response.models,
1356 config_options: response.config_options,
1357 })
1358 })
1359 },
1360 cx,
1361 )
1362 }
1363
    /// Returns `true` when the agent's session capabilities include a `close` entry.
    fn supports_close_session(&self) -> bool {
        self.agent_capabilities.session_capabilities.close.is_some()
    }
1367
1368 fn close_session(
1369 self: Rc<Self>,
1370 session_id: &acp::SessionId,
1371 cx: &mut App,
1372 ) -> Task<Result<()>> {
1373 if !self.supports_close_session() {
1374 return Task::ready(Err(anyhow!(LoadError::Other(
1375 "Closing sessions is not supported by this agent.".into()
1376 ))));
1377 }
1378
1379 // If a load is still in flight, decrement its ref count. The pending
1380 // entry is the source of truth for how many handles exist during a
1381 // load, so we must tick it down here as well as the `sessions` entry
1382 // that was pre-registered to receive history-replay notifications.
1383 // Only once the pending ref count hits zero do we actually close the
1384 // session; the load task will observe the missing sessions entry and
1385 // fail with "session was closed before load completed".
1386 let pending_ref_count = {
1387 let mut pending_sessions = self.pending_sessions.borrow_mut();
1388 pending_sessions.get_mut(session_id).map(|pending| {
1389 pending.ref_count = pending.ref_count.saturating_sub(1);
1390 pending.ref_count
1391 })
1392 };
1393 match pending_ref_count {
1394 Some(0) => {
1395 self.pending_sessions.borrow_mut().remove(session_id);
1396 self.sessions.borrow_mut().remove(session_id);
1397
1398 let conn = self.connection.clone();
1399 let session_id = session_id.clone();
1400 return cx.foreground_executor().spawn(async move {
1401 into_foreground_future(
1402 conn.send_request(acp::CloseSessionRequest::new(session_id)),
1403 )
1404 .await?;
1405 Ok(())
1406 });
1407 }
1408 Some(_) => return Task::ready(Ok(())),
1409 None => {}
1410 }
1411
1412 let mut sessions = self.sessions.borrow_mut();
1413 let Some(session) = sessions.get_mut(session_id) else {
1414 return Task::ready(Ok(()));
1415 };
1416
1417 session.ref_count = session.ref_count.saturating_sub(1);
1418 if session.ref_count > 0 {
1419 return Task::ready(Ok(()));
1420 }
1421
1422 sessions.remove(session_id);
1423 drop(sessions);
1424
1425 let conn = self.connection.clone();
1426 let session_id = session_id.clone();
1427 cx.foreground_executor().spawn(async move {
1428 into_foreground_future(
1429 conn.send_request(acp::CloseSessionRequest::new(session_id.clone())),
1430 )
1431 .await?;
1432 Ok(())
1433 })
1434 }
1435
    /// Returns the auth methods stored on this connection.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
1439
1440 fn terminal_auth_task(
1441 &self,
1442 method_id: &acp::AuthMethodId,
1443 cx: &App,
1444 ) -> Option<Task<Result<SpawnInTerminal>>> {
1445 let method = self
1446 .auth_methods
1447 .iter()
1448 .find(|method| method.id() == method_id)?;
1449
1450 match method {
1451 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1452 let agent_id = self.id.clone();
1453 let terminal = terminal.clone();
1454 let store = self.agent_server_store.clone();
1455 Some(cx.spawn(async move |cx| {
1456 let command = store
1457 .update(cx, |store, cx| {
1458 let agent = store
1459 .get_external_agent(&agent_id)
1460 .context("Agent server not found")?;
1461 anyhow::Ok(agent.get_command(
1462 terminal.args.clone(),
1463 HashMap::from_iter(terminal.env.clone()),
1464 &mut cx.to_async(),
1465 ))
1466 })?
1467 .context("Failed to get agent command")?
1468 .await?;
1469 Ok(terminal_auth_task(&command, &agent_id, &terminal))
1470 }))
1471 }
1472 _ => meta_terminal_auth_task(&self.id, method_id, method)
1473 .map(|task| Task::ready(Ok(task))),
1474 }
1475 }
1476
    /// Sends an `AuthenticateRequest` for `method_id` and resolves once the
    /// agent has answered. The response payload is discarded; a request error
    /// is returned to the caller.
    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
        let conn = self.connection.clone();
        cx.foreground_executor().spawn(async move {
            into_foreground_future(conn.send_request(acp::AuthenticateRequest::new(method_id)))
                .await?;
            Ok(())
        })
    }
1485
1486 fn prompt(
1487 &self,
1488 _id: acp_thread::UserMessageId,
1489 params: acp::PromptRequest,
1490 cx: &mut App,
1491 ) -> Task<Result<acp::PromptResponse>> {
1492 let conn = self.connection.clone();
1493 let sessions = self.sessions.clone();
1494 let session_id = params.session_id.clone();
1495 cx.foreground_executor().spawn(async move {
1496 let result = into_foreground_future(conn.send_request(params)).await;
1497
1498 let mut suppress_abort_err = false;
1499
1500 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1501 suppress_abort_err = session.suppress_abort_err;
1502 session.suppress_abort_err = false;
1503 }
1504
1505 match result {
1506 Ok(response) => Ok(response),
1507 Err(err) => {
1508 if err.code == acp::ErrorCode::AuthRequired {
1509 return Err(anyhow!(acp::Error::auth_required()));
1510 }
1511
1512 if err.code != ErrorCode::InternalError {
1513 anyhow::bail!(err)
1514 }
1515
1516 let Some(data) = &err.data else {
1517 anyhow::bail!(err)
1518 };
1519
1520 // Temporary workaround until the following PR is generally available:
1521 // https://github.com/google-gemini/gemini-cli/pull/6656
1522
1523 #[derive(Deserialize)]
1524 #[serde(deny_unknown_fields)]
1525 struct ErrorDetails {
1526 details: Box<str>,
1527 }
1528
1529 match serde_json::from_value(data.clone()) {
1530 Ok(ErrorDetails { details }) => {
1531 if suppress_abort_err
1532 && (details.contains("This operation was aborted")
1533 || details.contains("The user aborted a request"))
1534 {
1535 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1536 } else {
1537 Err(anyhow!(details))
1538 }
1539 }
1540 Err(_) => Err(anyhow!(err)),
1541 }
1542 }
1543 }
1544 })
1545 }
1546
1547 fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) {
1548 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1549 session.suppress_abort_err = true;
1550 }
1551 let params = acp::CancelNotification::new(session_id.clone());
1552 self.connection.send_notification(params).log_err();
1553 }
1554
1555 fn session_modes(
1556 &self,
1557 session_id: &acp::SessionId,
1558 _cx: &App,
1559 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1560 let sessions = self.sessions.clone();
1561 let sessions_ref = sessions.borrow();
1562 let Some(session) = sessions_ref.get(session_id) else {
1563 return None;
1564 };
1565
1566 if let Some(modes) = session.session_modes.as_ref() {
1567 Some(Rc::new(AcpSessionModes {
1568 connection: self.connection.clone(),
1569 session_id: session_id.clone(),
1570 state: modes.clone(),
1571 }) as _)
1572 } else {
1573 None
1574 }
1575 }
1576
1577 fn model_selector(
1578 &self,
1579 session_id: &acp::SessionId,
1580 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1581 let sessions = self.sessions.clone();
1582 let sessions_ref = sessions.borrow();
1583 let Some(session) = sessions_ref.get(session_id) else {
1584 return None;
1585 };
1586
1587 if let Some(models) = session.models.as_ref() {
1588 Some(Rc::new(AcpModelSelector::new(
1589 session_id.clone(),
1590 self.connection.clone(),
1591 models.clone(),
1592 )) as _)
1593 } else {
1594 None
1595 }
1596 }
1597
1598 fn session_config_options(
1599 &self,
1600 session_id: &acp::SessionId,
1601 _cx: &App,
1602 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1603 let sessions = self.sessions.borrow();
1604 let session = sessions.get(session_id)?;
1605
1606 let config_opts = session.config_options.as_ref()?;
1607
1608 Some(Rc::new(AcpSessionConfigOptions {
1609 session_id: session_id.clone(),
1610 connection: self.connection.clone(),
1611 state: config_opts.config_options.clone(),
1612 watch_tx: config_opts.tx.clone(),
1613 watch_rx: config_opts.rx.clone(),
1614 }) as _)
1615 }
1616
    /// Returns the connection's session list, if it has one, as a trait object.
    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
        self.session_list.clone().map(|s| s as _)
    }
1620
    /// Converts the connection into an `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
1624}
1625
1626fn map_acp_error(err: acp::Error) -> anyhow::Error {
1627 if err.code == acp::ErrorCode::AuthRequired {
1628 let mut error = AuthRequired::new();
1629
1630 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1631 error = error.with_description(err.message);
1632 }
1633
1634 anyhow!(error)
1635 } else {
1636 anyhow!(err)
1637 }
1638}
1639
1640#[cfg(any(test, feature = "test-support"))]
1641pub mod test_support {
1642 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1643
1644 use acp_thread::{
1645 AgentModelSelector, AgentSessionConfigOptions, AgentSessionModes, AgentSessionRetry,
1646 AgentSessionSetTitle, AgentSessionTruncate, AgentTelemetry, UserMessageId,
1647 };
1648
1649 use super::*;
1650
    /// Agent server for tests whose connections talk to an in-process fake agent.
    ///
    /// All state sits behind `Arc`s, so clones of the server share the same
    /// counters and flags.
    #[derive(Clone, Default)]
    pub struct FakeAcpAgentServer {
        // Incremented by the fake agent for every `LoadSessionRequest` it receives.
        load_session_count: Arc<AtomicUsize>,
        // Incremented by the fake agent for every `CloseSessionRequest` it receives.
        close_session_count: Arc<AtomicUsize>,
        // When set, the next prompt is answered with an internal error and the flag is cleared.
        fail_next_prompt: Arc<AtomicBool>,
        // Filled in by `connect`; used by `simulate_server_exit` to deliver an exit status.
        exit_status_sender:
            Arc<std::sync::Mutex<Option<smol::channel::Sender<std::process::ExitStatus>>>>,
    }
1659
1660 impl FakeAcpAgentServer {
1661 pub fn new() -> Self {
1662 Self::default()
1663 }
1664
1665 pub fn load_session_count(&self) -> Arc<AtomicUsize> {
1666 self.load_session_count.clone()
1667 }
1668
1669 pub fn close_session_count(&self) -> Arc<AtomicUsize> {
1670 self.close_session_count.clone()
1671 }
1672
1673 pub fn simulate_server_exit(&self) {
1674 let sender = self
1675 .exit_status_sender
1676 .lock()
1677 .expect("exit status sender lock should not be poisoned")
1678 .clone()
1679 .expect("fake ACP server must be connected before simulating exit");
1680 sender
1681 .try_send(std::process::ExitStatus::default())
1682 .expect("fake ACP server exit receiver should still be alive");
1683 }
1684
1685 pub fn fail_next_prompt(&self) {
1686 self.fail_next_prompt.store(true, Ordering::SeqCst);
1687 }
1688 }
1689
    /// `AgentServer` implementation backed by the in-process fake agent.
    impl crate::AgentServer for FakeAcpAgentServer {
        /// Uses the `ZedAgent` icon.
        fn logo(&self) -> ui::IconName {
            ui::IconName::ZedAgent
        }

        /// The fake agent always identifies itself as `"Test"`.
        fn agent_id(&self) -> AgentId {
            AgentId::new("Test")
        }

        /// Builds a fake ACP connection that shares this server's counters and
        /// prompt-failure flag, and installs the exit-status channel used by
        /// `simulate_server_exit`.
        fn connect(
            &self,
            _delegate: crate::AgentServerDelegate,
            project: Entity<Project>,
            cx: &mut App,
        ) -> Task<anyhow::Result<Rc<dyn AgentConnection>>> {
            let load_session_count = self.load_session_count.clone();
            let close_session_count = self.close_session_count.clone();
            let fail_next_prompt = self.fail_next_prompt.clone();
            let exit_status_sender = self.exit_status_sender.clone();
            cx.spawn(async move |cx| {
                let harness = build_fake_acp_connection(
                    project,
                    load_session_count,
                    close_session_count,
                    fail_next_prompt,
                    cx,
                )
                .await?;
                // Publish the sender; any sender from an earlier `connect` is replaced.
                let (exit_tx, exit_rx) = smol::channel::bounded(1);
                *exit_status_sender
                    .lock()
                    .expect("exit status sender lock should not be poisoned") = Some(exit_tx);
                let connection = harness.connection.clone();
                // Each simulated exit status is reported to every session as `LoadError::Exited`.
                let simulate_exit_task = cx.spawn(async move |cx| {
                    while let Ok(status) = exit_rx.recv().await {
                        emit_load_error_to_all_sessions(
                            &connection.sessions,
                            LoadError::Exited { status },
                            cx,
                        );
                    }
                    Ok(())
                });
                // The wrapper stores both tasks alongside the connection.
                Ok(Rc::new(FakeAcpAgentConnection {
                    inner: harness.connection,
                    _keep_agent_alive: harness.keep_agent_alive,
                    _simulate_exit_task: simulate_exit_task,
                }) as Rc<dyn AgentConnection>)
            })
        }

        /// Converts the server into an `Rc<dyn Any>`.
        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
            self
        }
    }
1745
    /// Everything a test needs to drive a fake ACP connection.
    pub struct FakeAcpConnectionHarness {
        /// Client-side connection wired to the in-process fake agent.
        pub connection: Rc<AcpConnection>,
        /// Incremented by the fake agent for every `LoadSessionRequest`.
        pub load_session_count: Arc<AtomicUsize>,
        /// Incremented by the fake agent for every `CloseSessionRequest`.
        pub close_session_count: Arc<AtomicUsize>,
        /// Task that awaits the fake agent's I/O task.
        pub keep_agent_alive: Task<anyhow::Result<()>>,
    }
1752
    /// Wrapper that forwards every `AgentConnection` call to an inner
    /// `AcpConnection` while owning the tasks that back the fake agent.
    struct FakeAcpAgentConnection {
        inner: Rc<AcpConnection>,
        // Presumably stored so the tasks live as long as the connection;
        // neither field is read anywhere in this module.
        _keep_agent_alive: Task<anyhow::Result<()>>,
        _simulate_exit_task: Task<anyhow::Result<()>>,
    }
1758
    /// Pure delegation: every method forwards to `self.inner`.
    ///
    /// Methods that take `self: Rc<Self>` clone the inner `Rc` first because
    /// the inner call consumes its receiver. `AcpConnection` does not override
    /// `retry`, `truncate`, `set_title` or `telemetry`, so those calls reach
    /// the trait's default implementations.
    impl AgentConnection for FakeAcpAgentConnection {
        fn agent_id(&self) -> AgentId {
            self.inner.agent_id()
        }

        fn telemetry_id(&self) -> SharedString {
            self.inner.telemetry_id()
        }

        fn new_session(
            self: Rc<Self>,
            project: Entity<Project>,
            work_dirs: PathList,
            cx: &mut App,
        ) -> Task<Result<Entity<AcpThread>>> {
            self.inner.clone().new_session(project, work_dirs, cx)
        }

        fn supports_load_session(&self) -> bool {
            self.inner.supports_load_session()
        }

        fn load_session(
            self: Rc<Self>,
            session_id: acp::SessionId,
            project: Entity<Project>,
            work_dirs: PathList,
            title: Option<SharedString>,
            cx: &mut App,
        ) -> Task<Result<Entity<AcpThread>>> {
            self.inner
                .clone()
                .load_session(session_id, project, work_dirs, title, cx)
        }

        fn supports_close_session(&self) -> bool {
            self.inner.supports_close_session()
        }

        fn close_session(
            self: Rc<Self>,
            session_id: &acp::SessionId,
            cx: &mut App,
        ) -> Task<Result<()>> {
            self.inner.clone().close_session(session_id, cx)
        }

        fn supports_resume_session(&self) -> bool {
            self.inner.supports_resume_session()
        }

        fn resume_session(
            self: Rc<Self>,
            session_id: acp::SessionId,
            project: Entity<Project>,
            work_dirs: PathList,
            title: Option<SharedString>,
            cx: &mut App,
        ) -> Task<Result<Entity<AcpThread>>> {
            self.inner
                .clone()
                .resume_session(session_id, project, work_dirs, title, cx)
        }

        fn auth_methods(&self) -> &[acp::AuthMethod] {
            self.inner.auth_methods()
        }

        fn terminal_auth_task(
            &self,
            method: &acp::AuthMethodId,
            cx: &App,
        ) -> Option<Task<Result<SpawnInTerminal>>> {
            self.inner.terminal_auth_task(method, cx)
        }

        fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
            self.inner.authenticate(method, cx)
        }

        fn prompt(
            &self,
            user_message_id: UserMessageId,
            params: acp::PromptRequest,
            cx: &mut App,
        ) -> Task<Result<acp::PromptResponse>> {
            self.inner.prompt(user_message_id, params, cx)
        }

        fn retry(
            &self,
            session_id: &acp::SessionId,
            cx: &App,
        ) -> Option<Rc<dyn AgentSessionRetry>> {
            self.inner.retry(session_id, cx)
        }

        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
            self.inner.cancel(session_id, cx)
        }

        fn truncate(
            &self,
            session_id: &acp::SessionId,
            cx: &App,
        ) -> Option<Rc<dyn AgentSessionTruncate>> {
            self.inner.truncate(session_id, cx)
        }

        fn set_title(
            &self,
            session_id: &acp::SessionId,
            cx: &App,
        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
            self.inner.set_title(session_id, cx)
        }

        fn model_selector(
            &self,
            session_id: &acp::SessionId,
        ) -> Option<Rc<dyn AgentModelSelector>> {
            self.inner.model_selector(session_id)
        }

        fn telemetry(&self) -> Option<Rc<dyn AgentTelemetry>> {
            self.inner.telemetry()
        }

        fn session_modes(
            &self,
            session_id: &acp::SessionId,
            cx: &App,
        ) -> Option<Rc<dyn AgentSessionModes>> {
            self.inner.session_modes(session_id, cx)
        }

        fn session_config_options(
            &self,
            session_id: &acp::SessionId,
            cx: &App,
        ) -> Option<Rc<dyn AgentSessionConfigOptions>> {
            self.inner.session_config_options(session_id, cx)
        }

        fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
            self.inner.session_list(cx)
        }

        // Returns the wrapper itself, not the inner connection.
        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
            self
        }
    }
1911
    /// Wires an in-process fake agent to a client `AcpConnection` over a duplex
    /// channel and returns the resulting harness.
    ///
    /// The fake agent advertises the load-session and close-session
    /// capabilities, counts load and close requests in the given counters, and
    /// fails one prompt with an internal error each time `fail_next_prompt` is
    /// set.
    ///
    /// # Errors
    ///
    /// Fails when the client connection handle is never delivered or when the
    /// initialize request fails.
    async fn build_fake_acp_connection(
        project: Entity<Project>,
        load_session_count: Arc<AtomicUsize>,
        close_session_count: Arc<AtomicUsize>,
        fail_next_prompt: Arc<AtomicBool>,
        cx: &mut AsyncApp,
    ) -> Result<FakeAcpConnectionHarness> {
        let (client_transport, agent_transport) = agent_client_protocol::Channel::duplex();

        let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
            Rc::new(RefCell::new(HashMap::default()));
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        // Agent side: one handler per request/notification type the client sends.
        let agent_future = Agent
            .builder()
            .name("fake-agent")
            .on_receive_request(
                // Echo the requested protocol version and advertise load + close support.
                async move |req: acp::InitializeRequest, responder, _cx| {
                    responder.respond(
                        acp::InitializeResponse::new(req.protocol_version).agent_capabilities(
                            acp::AgentCapabilities::default()
                                .load_session(true)
                                .session_capabilities(
                                    acp::SessionCapabilities::default()
                                        .close(acp::SessionCloseCapabilities::new()),
                                ),
                        ),
                    )
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                async move |_req: acp::AuthenticateRequest, responder, _cx| {
                    responder.respond(Default::default())
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                // Every new session gets the same fixed id.
                async move |_req: acp::NewSessionRequest, responder, _cx| {
                    responder.respond(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                {
                    let fail_next_prompt = fail_next_prompt.clone();
                    async move |_req: acp::PromptRequest, responder, _cx| {
                        // `swap` clears the flag, so only one prompt fails per request to fail.
                        if fail_next_prompt.swap(false, Ordering::SeqCst) {
                            responder.respond_with_error(acp::ErrorCode::InternalError.into())
                        } else {
                            responder.respond(acp::PromptResponse::new(acp::StopReason::EndTurn))
                        }
                    }
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                {
                    let load_session_count = load_session_count.clone();
                    async move |_req: acp::LoadSessionRequest, responder, _cx| {
                        load_session_count.fetch_add(1, Ordering::SeqCst);
                        responder.respond(acp::LoadSessionResponse::new())
                    }
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                {
                    let close_session_count = close_session_count.clone();
                    async move |_req: acp::CloseSessionRequest, responder, _cx| {
                        close_session_count.fetch_add(1, Ordering::SeqCst);
                        responder.respond(acp::CloseSessionResponse::new())
                    }
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_notification(
                // Cancel notifications are accepted and otherwise ignored.
                async move |_notif: acp::CancelNotification, _cx| Ok(()),
                agent_client_protocol::on_receive_notification!(),
            )
            .connect_to(agent_transport);

        let agent_io_task = cx.background_spawn(agent_future);

        // Wire the production handler set into the fake client so inbound
        // requests/notifications from the fake agent are dispatched the
        // same way the real `stdio` path does.
        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();

        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
        let client_future = connect_client_future(
            "zed-test",
            client_transport,
            dispatch_tx.clone(),
            connection_tx,
        );
        let client_io_task = cx.background_spawn(async move {
            client_future.await.ok();
        });

        // The client future hands the connection handle back through the oneshot channel.
        let client_conn: ConnectionTo<Agent> = connection_rx
            .await
            .context("failed to receive fake ACP connection handle")?;

        // Initialize handshake; the reply carries the capabilities advertised above.
        let response = into_foreground_future(
            client_conn.send_request(acp::InitializeRequest::new(acp::ProtocolVersion::V1)),
        )
        .await?;

        let agent_capabilities = response.agent_capabilities;

        let dispatch_context = ClientContext {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
        };
        // Runs each queued piece of work against the shared dispatch context.
        let dispatch_task = cx.spawn({
            let mut dispatch_rx = dispatch_rx;
            async move |cx| {
                while let Some(work) = dispatch_rx.next().await {
                    work.run(cx, &dispatch_context);
                }
            }
        });

        let agent_server_store =
            project.read_with(cx, |project, _| project.agent_server_store().downgrade());

        let connection = cx.update(|cx| {
            AcpConnection::new_for_test(
                client_conn,
                sessions,
                agent_capabilities,
                agent_server_store,
                client_io_task,
                dispatch_task,
                cx,
            )
        });

        // Awaits the agent's I/O task and discards its result.
        let keep_agent_alive = cx.background_spawn(async move {
            agent_io_task.await.ok();
            anyhow::Ok(())
        });

        Ok(FakeAcpConnectionHarness {
            connection: Rc::new(connection),
            load_session_count,
            close_session_count,
            keep_agent_alive,
        })
    }
2064
2065 pub async fn connect_fake_acp_connection(
2066 project: Entity<Project>,
2067 cx: &mut gpui::TestAppContext,
2068 ) -> FakeAcpConnectionHarness {
2069 cx.update(|cx| {
2070 let store = settings::SettingsStore::test(cx);
2071 cx.set_global(store);
2072 });
2073
2074 build_fake_acp_connection(
2075 project,
2076 Arc::new(AtomicUsize::new(0)),
2077 Arc::new(AtomicUsize::new(0)),
2078 Arc::new(AtomicBool::new(false)),
2079 &mut cx.to_async(),
2080 )
2081 .await
2082 .expect("failed to initialize ACP connection")
2083 }
2084}
2085
2086#[cfg(test)]
2087mod tests {
2088 use std::sync::atomic::{AtomicUsize, Ordering};
2089
2090 use super::*;
2091
2092 #[test]
2093 fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
2094 let command = AgentServerCommand {
2095 path: "/path/to/agent".into(),
2096 args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
2097 env: Some(HashMap::from_iter([
2098 ("BASE".into(), "1".into()),
2099 ("SHARED".into(), "override".into()),
2100 ("EXTRA".into(), "2".into()),
2101 ])),
2102 };
2103 let method = acp::AuthMethodTerminal::new("login", "Login");
2104
2105 let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
2106
2107 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
2108 assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
2109 assert_eq!(
2110 task.env,
2111 HashMap::from_iter([
2112 ("BASE".into(), "1".into()),
2113 ("SHARED".into(), "override".into()),
2114 ("EXTRA".into(), "2".into()),
2115 ])
2116 );
2117 assert_eq!(task.label, "Login");
2118 assert_eq!(task.command_label, "Login");
2119 }
2120
2121 #[test]
2122 fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
2123 let method_id = acp::AuthMethodId::new("legacy-login");
2124 let method = acp::AuthMethod::Agent(
2125 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
2126 "terminal-auth".to_string(),
2127 serde_json::json!({
2128 "label": "legacy /auth",
2129 "command": "legacy-agent",
2130 "args": ["auth", "--interactive"],
2131 "env": {
2132 "AUTH_MODE": "interactive",
2133 },
2134 }),
2135 )])),
2136 );
2137
2138 let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
2139 .expect("expected legacy terminal auth task");
2140
2141 assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
2142 assert_eq!(task.command.as_deref(), Some("legacy-agent"));
2143 assert_eq!(task.args, vec!["auth", "--interactive"]);
2144 assert_eq!(
2145 task.env,
2146 HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
2147 );
2148 assert_eq!(task.label, "legacy /auth");
2149 }
2150
2151 #[test]
2152 fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
2153 let method_id = acp::AuthMethodId::new("legacy-login");
2154 let method = acp::AuthMethod::Agent(
2155 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
2156 "terminal-auth".to_string(),
2157 serde_json::json!({
2158 "label": "legacy /auth",
2159 }),
2160 )])),
2161 );
2162
2163 assert!(
2164 meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
2165 );
2166 }
2167
2168 #[test]
2169 fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
2170 let method_id = acp::AuthMethodId::new("login");
2171 let method = acp::AuthMethod::Terminal(
2172 acp::AuthMethodTerminal::new(method_id, "Login")
2173 .args(vec!["/auth".into()])
2174 .env(std::collections::HashMap::from_iter([(
2175 "AUTH_MODE".into(),
2176 "first-class".into(),
2177 )]))
2178 .meta(acp::Meta::from_iter([(
2179 "terminal-auth".to_string(),
2180 serde_json::json!({
2181 "label": "legacy /auth",
2182 "command": "legacy-agent",
2183 "args": ["legacy-auth"],
2184 "env": {
2185 "AUTH_MODE": "legacy",
2186 },
2187 }),
2188 )])),
2189 );
2190
2191 let command = AgentServerCommand {
2192 path: "/path/to/agent".into(),
2193 args: vec!["--acp".into(), "/auth".into()],
2194 env: Some(HashMap::from_iter([
2195 ("BASE".into(), "1".into()),
2196 ("AUTH_MODE".into(), "first-class".into()),
2197 ])),
2198 };
2199
2200 let task = match &method {
2201 acp::AuthMethod::Terminal(terminal) => {
2202 terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
2203 }
2204 _ => unreachable!(),
2205 };
2206
2207 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
2208 assert_eq!(task.args, vec!["--acp", "/auth"]);
2209 assert_eq!(
2210 task.env,
2211 HashMap::from_iter([
2212 ("BASE".into(), "1".into()),
2213 ("AUTH_MODE".into(), "first-class".into()),
2214 ])
2215 );
2216 assert_eq!(task.label, "Login");
2217 }
2218
    /// Connects an `AcpConnection` to an in-process fake agent over a duplex
    /// channel and returns the handles tests use to drive and observe it.
    ///
    /// The returned tuple is, in order:
    /// 1. the connection under test,
    /// 2. a test project rooted at `/a` on a fake filesystem,
    /// 3. a counter bumped for every `LoadSessionRequest` the agent receives,
    /// 4. a counter bumped for every `CloseSessionRequest` the agent receives,
    /// 5. a queue of session updates; the next load handler invocation drains
    ///    it and sends each update as a notification before responding,
    /// 6. an optional gate; when set, the next load handler invocation takes
    ///    it and waits on it before responding,
    /// 7. a task that awaits the fake agent's I/O future.
    async fn connect_fake_agent(
        cx: &mut gpui::TestAppContext,
    ) -> (
        Rc<AcpConnection>,
        Entity<project::Project>,
        Arc<AtomicUsize>,
        Arc<AtomicUsize>,
        Arc<std::sync::Mutex<Vec<acp::SessionUpdate>>>,
        Arc<std::sync::Mutex<Option<smol::channel::Receiver<()>>>>,
        Task<anyhow::Result<()>>,
    ) {
        // Install a test settings store as a global before building the project.
        cx.update(|cx| {
            let store = settings::SettingsStore::test(cx);
            cx.set_global(store);
        });

        let fs = fs::FakeFs::new(cx.executor());
        fs.insert_tree("/", serde_json::json!({ "a": {} })).await;
        let project = project::Project::test(fs, [std::path::Path::new("/a")], cx).await;

        // Shared state through which tests observe and steer the fake agent.
        let load_count = Arc::new(AtomicUsize::new(0));
        let close_count = Arc::new(AtomicUsize::new(0));
        let load_session_updates: Arc<std::sync::Mutex<Vec<acp::SessionUpdate>>> =
            Arc::new(std::sync::Mutex::new(Vec::new()));
        let load_session_gate: Arc<std::sync::Mutex<Option<smol::channel::Receiver<()>>>> =
            Arc::new(std::sync::Mutex::new(None));

        let (client_transport, agent_transport) = agent_client_protocol::Channel::duplex();

        // Client-side state shared between the dispatch loop and the
        // connection handed back to the test.
        let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
            Rc::new(RefCell::new(HashMap::default()));
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        // Build the fake agent side. It handles the requests issued by
        // `AcpConnection` during the test and tracks load/close counts.
        let agent_future = Agent
            .builder()
            .name("fake-agent")
            .on_receive_request(
                async move |req: acp::InitializeRequest, responder, _cx| {
                    // Advertise session loading and session close support.
                    responder.respond(
                        acp::InitializeResponse::new(req.protocol_version).agent_capabilities(
                            acp::AgentCapabilities::default()
                                .load_session(true)
                                .session_capabilities(
                                    acp::SessionCapabilities::default()
                                        .close(acp::SessionCloseCapabilities::new()),
                                ),
                        ),
                    )
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                async move |_req: acp::AuthenticateRequest, responder, _cx| {
                    responder.respond(Default::default())
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                async move |_req: acp::NewSessionRequest, responder, _cx| {
                    responder.respond(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                async move |_req: acp::PromptRequest, responder, _cx| {
                    responder.respond(acp::PromptResponse::new(acp::StopReason::EndTurn))
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                {
                    let load_count = load_count.clone();
                    let load_session_updates = load_session_updates.clone();
                    let load_session_gate = load_session_gate.clone();
                    async move |req: acp::LoadSessionRequest, responder, cx| {
                        load_count.fetch_add(1, Ordering::SeqCst);

                        // Simulate spec-compliant history replay: send
                        // notifications to the client before responding to the
                        // load request.
                        let updates = std::mem::take(
                            &mut *load_session_updates
                                .lock()
                                .expect("load_session_updates mutex poisoned"),
                        );
                        for update in updates {
                            cx.send_notification(acp::SessionNotification::new(
                                req.session_id.clone(),
                                update,
                            ))?;
                        }

                        // If a gate was installed, park on it before responding
                        // so tests can interleave other work (e.g.
                        // `close_session`) with an in-flight load.
                        let gate = load_session_gate
                            .lock()
                            .expect("load_session_gate mutex poisoned")
                            .take();
                        if let Some(gate) = gate {
                            gate.recv().await.ok();
                        }

                        responder.respond(acp::LoadSessionResponse::new())
                    }
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                {
                    let close_count = close_count.clone();
                    async move |_req: acp::CloseSessionRequest, responder, _cx| {
                        close_count.fetch_add(1, Ordering::SeqCst);
                        responder.respond(acp::CloseSessionResponse::new())
                    }
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_notification(
                async move |_notif: acp::CancelNotification, _cx| Ok(()),
                agent_client_protocol::on_receive_notification!(),
            )
            .connect_to(agent_transport);

        let agent_io_task = cx.background_spawn(agent_future);

        // Wire the production handler set into the fake client so inbound
        // requests/notifications from the fake agent reach the same
        // dispatcher that the real `stdio` path uses.
        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();

        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
        let client_future = connect_client_future(
            "zed-test",
            client_transport,
            dispatch_tx.clone(),
            connection_tx,
        );
        // Any error from the client I/O future is discarded here.
        let client_io_task = cx.background_spawn(async move {
            client_future.await.ok();
        });

        let client_conn: ConnectionTo<Agent> = connection_rx
            .await
            .expect("failed to receive ACP connection handle");

        // Perform the initialize handshake to learn the agent's capabilities.
        let response = into_foreground_future(
            client_conn.send_request(acp::InitializeRequest::new(acp::ProtocolVersion::V1)),
        )
        .await
        .expect("failed to initialize ACP connection");

        let agent_capabilities = response.agent_capabilities;

        let dispatch_context = ClientContext {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
        };
        // `TestAppContext::spawn` hands out an `AsyncApp` by value, whereas the
        // production path uses `Context::spawn` which hands out `&mut AsyncApp`.
        // Bind the value-form to a local and take `&mut` of it to reuse the
        // same dispatch loop shape.
        let dispatch_task = cx.spawn({
            let mut dispatch_rx = dispatch_rx;
            move |cx| async move {
                let mut cx = cx;
                while let Some(work) = dispatch_rx.next().await {
                    work.run(&mut cx, &dispatch_context);
                }
            }
        });

        let agent_server_store =
            project.read_with(cx, |project, _| project.agent_server_store().downgrade());

        let connection = cx.update(|cx| {
            AcpConnection::new_for_test(
                client_conn,
                sessions,
                agent_capabilities,
                agent_server_store,
                client_io_task,
                dispatch_task,
                cx,
            )
        });

        // Wrap the agent I/O task so callers get a single handle to hold;
        // its result is discarded and the wrapper always resolves to `Ok(())`.
        let keep_agent_alive = cx.background_spawn(async move {
            agent_io_task.await.ok();
            anyhow::Ok(())
        });

        (
            Rc::new(connection),
            project,
            load_count,
            close_count,
            load_session_updates,
            load_session_gate,
            keep_agent_alive,
        )
    }
2424
    // Two concurrent loads of the same session must share one thread and one
    // `load_session` RPC, and the ACP `close_session` RPC must only be sent
    // once the second of the two matching closes has been issued.
    #[gpui::test]
    async fn test_loaded_sessions_keep_state_until_last_close(cx: &mut gpui::TestAppContext) {
        let (
            connection,
            project,
            load_count,
            close_count,
            _load_session_updates,
            _load_session_gate,
            _keep_agent_alive,
        ) = connect_fake_agent(cx).await;

        let session_id = acp::SessionId::new("session-1");
        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);

        // Load the same session twice concurrently — the second call should join
        // the pending task rather than issuing a second ACP load_session RPC.
        let first_load = cx.update(|cx| {
            connection.clone().load_session(
                session_id.clone(),
                project.clone(),
                work_dirs.clone(),
                None,
                cx,
            )
        });
        let second_load = cx.update(|cx| {
            connection.clone().load_session(
                session_id.clone(),
                project.clone(),
                work_dirs.clone(),
                None,
                cx,
            )
        });

        let first_thread = first_load.await.expect("first load failed");
        let second_thread = second_load.await.expect("second load failed");
        cx.run_until_parked();

        assert_eq!(
            first_thread.entity_id(),
            second_thread.entity_id(),
            "concurrent loads for the same session should share one AcpThread"
        );
        assert_eq!(
            load_count.load(Ordering::SeqCst),
            1,
            "underlying ACP load_session should be called exactly once for concurrent loads"
        );

        // The session has ref_count 2. The first close should not send the ACP
        // close_session RPC — the session is still referenced.
        cx.update(|cx| connection.clone().close_session(&session_id, cx))
            .await
            .expect("first close failed");

        assert_eq!(
            close_count.load(Ordering::SeqCst),
            0,
            "ACP close_session should not be sent while ref_count > 0"
        );
        assert!(
            connection.sessions.borrow().contains_key(&session_id),
            "session should still be tracked after first close"
        );

        // The second close drops ref_count to 0 — now the ACP RPC must be sent.
        cx.update(|cx| connection.clone().close_session(&session_id, cx))
            .await
            .expect("second close failed");
        cx.run_until_parked();

        assert_eq!(
            close_count.load(Ordering::SeqCst),
            1,
            "ACP close_session should be sent exactly once when ref_count reaches 0"
        );
        assert!(
            !connection.sessions.borrow().contains_key(&session_id),
            "session should be removed after final close"
        );
    }
2508
    // Regression test: per the ACP spec, an agent replays the entire conversation
    // history as `session/update` notifications *before* responding to the
    // `session/load` request. These notifications must be applied to the
    // reconstructed thread, not dropped because the session hasn't been
    // registered yet.
    //
    // The fake agent is primed with one user chunk and one agent chunk; the
    // loaded thread must end up with exactly those two entries, in order.
    #[gpui::test]
    async fn test_load_session_replays_notifications_sent_before_response(
        cx: &mut gpui::TestAppContext,
    ) {
        let (
            connection,
            project,
            _load_count,
            _close_count,
            load_session_updates,
            _load_session_gate,
            _keep_agent_alive,
        ) = connect_fake_agent(cx).await;

        // Queue up some history updates that the fake agent will stream to
        // the client during the `load_session` call, before responding.
        *load_session_updates
            .lock()
            .expect("load_session_updates mutex poisoned") = vec![
            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(acp::ContentBlock::Text(
                acp::TextContent::new(String::from("hello agent")),
            ))),
            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(acp::ContentBlock::Text(
                acp::TextContent::new(String::from("hi user")),
            ))),
        ];

        let session_id = acp::SessionId::new("session-replay");
        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);

        let thread = cx
            .update(|cx| {
                connection.clone().load_session(
                    session_id.clone(),
                    project.clone(),
                    work_dirs,
                    None,
                    cx,
                )
            })
            .await
            .expect("load_session failed");
        cx.run_until_parked();

        // Reduce each thread entry to a label for its kind so the assertion
        // below only checks the sequence of entry kinds, not their content.
        let entries = thread.read_with(cx, |thread, _| {
            thread
                .entries()
                .iter()
                .map(|entry| match entry {
                    acp_thread::AgentThreadEntry::UserMessage(_) => "user",
                    acp_thread::AgentThreadEntry::AssistantMessage(_) => "assistant",
                    acp_thread::AgentThreadEntry::ToolCall(_) => "tool_call",
                    acp_thread::AgentThreadEntry::CompletedPlan(_) => "plan",
                })
                .collect::<Vec<_>>()
        });

        assert_eq!(
            entries,
            vec!["user", "assistant"],
            "replayed notifications should be applied to the thread"
        );
    }
2577
    // Regression test: if `close_session` is issued while a `load_session`
    // RPC is still in flight, the close must take effect cleanly — the load
    // must fail with a recognizable error (not return an orphaned thread),
    // no entry must remain in `sessions` or `pending_sessions`, and the ACP
    // `close_session` RPC must be dispatched.
    #[gpui::test]
    async fn test_close_session_during_in_flight_load(cx: &mut gpui::TestAppContext) {
        let (
            connection,
            project,
            load_count,
            close_count,
            _load_session_updates,
            load_session_gate,
            _keep_agent_alive,
        ) = connect_fake_agent(cx).await;

        // Install a gate so the fake agent's `load_session` handler parks
        // before sending its response. We'll close the session while the
        // load is parked.
        let (gate_tx, gate_rx) = smol::channel::bounded::<()>(1);
        *load_session_gate
            .lock()
            .expect("load_session_gate mutex poisoned") = Some(gate_rx);

        let session_id = acp::SessionId::new("session-close-during-load");
        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);

        // Start the load but do not await it yet; it is awaited only after
        // the close has been issued and the gate released.
        let load_task = cx.update(|cx| {
            connection.clone().load_session(
                session_id.clone(),
                project.clone(),
                work_dirs,
                None,
                cx,
            )
        });

        // Let the load RPC reach the agent and park on the gate.
        cx.run_until_parked();
        assert_eq!(
            load_count.load(Ordering::SeqCst),
            1,
            "load_session RPC should have been dispatched"
        );
        assert!(
            connection
                .pending_sessions
                .borrow()
                .contains_key(&session_id),
            "pending_sessions entry should exist while load is in flight"
        );
        assert!(
            connection.sessions.borrow().contains_key(&session_id),
            "sessions entry should be pre-registered to receive replay notifications"
        );

        // Close the session while the load is still parked. This should take
        // the pending path and dispatch the ACP close RPC.
        let close_task = cx.update(|cx| connection.clone().close_session(&session_id, cx));

        // Release the gate so the load RPC can finally respond.
        gate_tx.send(()).await.expect("gate send failed");
        drop(gate_tx);

        // Capture the load result first and unwrap it only after the close
        // has completed, so both tasks get a chance to finish.
        let load_result = load_task.await;
        close_task.await.expect("close failed");
        cx.run_until_parked();

        let err = load_result.expect_err("load should fail after close-during-load");
        assert!(
            err.to_string()
                .contains("session was closed before load completed"),
            "expected close-during-load error, got: {err}"
        );

        assert_eq!(
            close_count.load(Ordering::SeqCst),
            1,
            "ACP close_session should be sent exactly once"
        );
        assert!(
            !connection.sessions.borrow().contains_key(&session_id),
            "sessions entry should be removed after close-during-load"
        );
        assert!(
            !connection
                .pending_sessions
                .borrow()
                .contains_key(&session_id),
            "pending_sessions entry should be removed after close-during-load"
        );
    }
2671
    // Regression test: when two concurrent `load_session` calls share a pending
    // task and one of them issues `close_session` before the load RPC
    // resolves, the remaining load must still succeed and the session must
    // stay live. If `close_session` incorrectly short-circuits via the
    // `sessions` path (removing the entry while a load is still in flight),
    // the pending task will fail and both concurrent loaders will lose
    // their handle.
    #[gpui::test]
    async fn test_close_during_load_preserves_other_concurrent_loader(
        cx: &mut gpui::TestAppContext,
    ) {
        let (
            connection,
            project,
            load_count,
            close_count,
            _load_session_updates,
            load_session_gate,
            _keep_agent_alive,
        ) = connect_fake_agent(cx).await;

        // Install a gate so the fake agent's load handler waits before
        // responding, keeping the shared load in flight.
        let (gate_tx, gate_rx) = smol::channel::bounded::<()>(1);
        *load_session_gate
            .lock()
            .expect("load_session_gate mutex poisoned") = Some(gate_rx);

        let session_id = acp::SessionId::new("session-concurrent-close");
        let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);

        // Kick off two concurrent loads; the second must join the first's pending
        // task rather than issuing a second RPC.
        let first_load = cx.update(|cx| {
            connection.clone().load_session(
                session_id.clone(),
                project.clone(),
                work_dirs.clone(),
                None,
                cx,
            )
        });
        let second_load = cx.update(|cx| {
            connection.clone().load_session(
                session_id.clone(),
                project.clone(),
                work_dirs.clone(),
                None,
                cx,
            )
        });

        cx.run_until_parked();
        assert_eq!(
            load_count.load(Ordering::SeqCst),
            1,
            "load_session RPC should only be dispatched once for concurrent loads"
        );

        // Close one of the two handles while the shared load is still parked.
        // Because a second loader still holds a pending ref, this should be a
        // no-op on the wire.
        cx.update(|cx| connection.clone().close_session(&session_id, cx))
            .await
            .expect("close during load failed");
        assert_eq!(
            close_count.load(Ordering::SeqCst),
            0,
            "close_session RPC must not be dispatched while another load handle remains"
        );

        // Release the gate so the load RPC can finally respond.
        gate_tx.send(()).await.expect("gate send failed");
        drop(gate_tx);

        // Both load tasks are expected to resolve successfully despite the
        // intervening close.
        let first_thread = first_load.await.expect("first load should still succeed");
        let second_thread = second_load.await.expect("second load should still succeed");
        cx.run_until_parked();

        assert_eq!(
            first_thread.entity_id(),
            second_thread.entity_id(),
            "concurrent loads should share one AcpThread"
        );
        assert!(
            connection.sessions.borrow().contains_key(&session_id),
            "session must remain tracked while a load handle is still outstanding"
        );
        assert!(
            !connection
                .pending_sessions
                .borrow()
                .contains_key(&session_id),
            "pending_sessions entry should be cleared once the load resolves"
        );

        // Final close drops ref_count to 0 and dispatches the ACP close RPC.
        cx.update(|cx| connection.clone().close_session(&session_id, cx))
            .await
            .expect("final close failed");
        cx.run_until_parked();
        assert_eq!(
            close_count.load(Ordering::SeqCst),
            1,
            "close_session RPC should fire exactly once when the last handle is released"
        );
        assert!(
            !connection.sessions.borrow().contains_key(&session_id),
            "session should be removed after final close"
        );
    }
2781}
2782
2783fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
2784 let context_server_store = project.read(cx).context_server_store().read(cx);
2785 let is_local = project.read(cx).is_local();
2786 context_server_store
2787 .configured_server_ids()
2788 .iter()
2789 .filter_map(|id| {
2790 let configuration = context_server_store.configuration_for_server(id)?;
2791 match &*configuration {
2792 project::context_server_store::ContextServerConfiguration::Custom {
2793 command,
2794 remote,
2795 ..
2796 }
2797 | project::context_server_store::ContextServerConfiguration::Extension {
2798 command,
2799 remote,
2800 ..
2801 } if is_local || *remote => Some(acp::McpServer::Stdio(
2802 acp::McpServerStdio::new(id.0.to_string(), &command.path)
2803 .args(command.args.clone())
2804 .env(if let Some(env) = command.env.as_ref() {
2805 env.iter()
2806 .map(|(name, value)| acp::EnvVariable::new(name, value))
2807 .collect()
2808 } else {
2809 vec![]
2810 }),
2811 )),
2812 project::context_server_store::ContextServerConfiguration::Http {
2813 url,
2814 headers,
2815 timeout: _,
2816 } => Some(acp::McpServer::Http(
2817 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
2818 headers
2819 .iter()
2820 .map(|(name, value)| acp::HttpHeader::new(name, value))
2821 .collect(),
2822 ),
2823 )),
2824 _ => None,
2825 }
2826 })
2827 .collect()
2828}
2829
2830fn config_state(
2831 modes: Option<acp::SessionModeState>,
2832 models: Option<acp::SessionModelState>,
2833 config_options: Option<Vec<acp::SessionConfigOption>>,
2834) -> (
2835 Option<Rc<RefCell<acp::SessionModeState>>>,
2836 Option<Rc<RefCell<acp::SessionModelState>>>,
2837 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
2838) {
2839 if let Some(opts) = config_options {
2840 return (None, None, Some(Rc::new(RefCell::new(opts))));
2841 }
2842
2843 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
2844 let models = models.map(|models| Rc::new(RefCell::new(models)));
2845 (modes, models, None)
2846}
2847
2848struct AcpSessionModes {
2849 session_id: acp::SessionId,
2850 connection: ConnectionTo<Agent>,
2851 state: Rc<RefCell<acp::SessionModeState>>,
2852}
2853
2854impl acp_thread::AgentSessionModes for AcpSessionModes {
2855 fn current_mode(&self) -> acp::SessionModeId {
2856 self.state.borrow().current_mode_id.clone()
2857 }
2858
2859 fn all_modes(&self) -> Vec<acp::SessionMode> {
2860 self.state.borrow().available_modes.clone()
2861 }
2862
2863 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
2864 let connection = self.connection.clone();
2865 let session_id = self.session_id.clone();
2866 let old_mode_id;
2867 {
2868 let mut state = self.state.borrow_mut();
2869 old_mode_id = state.current_mode_id.clone();
2870 state.current_mode_id = mode_id.clone();
2871 };
2872 let state = self.state.clone();
2873 cx.foreground_executor().spawn(async move {
2874 let result = into_foreground_future(
2875 connection.send_request(acp::SetSessionModeRequest::new(session_id, mode_id)),
2876 )
2877 .await;
2878
2879 if result.is_err() {
2880 state.borrow_mut().current_mode_id = old_mode_id;
2881 }
2882
2883 result?;
2884
2885 Ok(())
2886 })
2887 }
2888}
2889
2890struct AcpModelSelector {
2891 session_id: acp::SessionId,
2892 connection: ConnectionTo<Agent>,
2893 state: Rc<RefCell<acp::SessionModelState>>,
2894}
2895
2896impl AcpModelSelector {
2897 fn new(
2898 session_id: acp::SessionId,
2899 connection: ConnectionTo<Agent>,
2900 state: Rc<RefCell<acp::SessionModelState>>,
2901 ) -> Self {
2902 Self {
2903 session_id,
2904 connection,
2905 state,
2906 }
2907 }
2908}
2909
2910impl acp_thread::AgentModelSelector for AcpModelSelector {
2911 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
2912 Task::ready(Ok(acp_thread::AgentModelList::Flat(
2913 self.state
2914 .borrow()
2915 .available_models
2916 .clone()
2917 .into_iter()
2918 .map(acp_thread::AgentModelInfo::from)
2919 .collect(),
2920 )))
2921 }
2922
2923 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
2924 let connection = self.connection.clone();
2925 let session_id = self.session_id.clone();
2926 let old_model_id;
2927 {
2928 let mut state = self.state.borrow_mut();
2929 old_model_id = state.current_model_id.clone();
2930 state.current_model_id = model_id.clone();
2931 };
2932 let state = self.state.clone();
2933 cx.foreground_executor().spawn(async move {
2934 let result = into_foreground_future(
2935 connection.send_request(acp::SetSessionModelRequest::new(session_id, model_id)),
2936 )
2937 .await;
2938
2939 if result.is_err() {
2940 state.borrow_mut().current_model_id = old_model_id;
2941 }
2942
2943 result?;
2944
2945 Ok(())
2946 })
2947 }
2948
2949 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
2950 let state = self.state.borrow();
2951 Task::ready(
2952 state
2953 .available_models
2954 .iter()
2955 .find(|m| m.model_id == state.current_model_id)
2956 .cloned()
2957 .map(acp_thread::AgentModelInfo::from)
2958 .ok_or_else(|| anyhow::anyhow!("Model not found")),
2959 )
2960 }
2961}
2962
2963struct AcpSessionConfigOptions {
2964 session_id: acp::SessionId,
2965 connection: ConnectionTo<Agent>,
2966 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
2967 watch_tx: Rc<RefCell<watch::Sender<()>>>,
2968 watch_rx: watch::Receiver<()>,
2969}
2970
2971impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
2972 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
2973 self.state.borrow().clone()
2974 }
2975
2976 fn set_config_option(
2977 &self,
2978 config_id: acp::SessionConfigId,
2979 value: acp::SessionConfigValueId,
2980 cx: &mut App,
2981 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
2982 let connection = self.connection.clone();
2983 let session_id = self.session_id.clone();
2984 let state = self.state.clone();
2985
2986 let watch_tx = self.watch_tx.clone();
2987
2988 cx.foreground_executor().spawn(async move {
2989 let response = into_foreground_future(connection.send_request(
2990 acp::SetSessionConfigOptionRequest::new(session_id, config_id, value),
2991 ))
2992 .await?;
2993
2994 *state.borrow_mut() = response.config_options.clone();
2995 watch_tx.borrow_mut().send(()).ok();
2996 Ok(response.config_options)
2997 })
2998 }
2999
3000 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
3001 Some(self.watch_rx.clone())
3002 }
3003}
3004
3005// ---------------------------------------------------------------------------
3006// Handler functions dispatched from background handler closures to the
3007// foreground thread via the ForegroundWork channel.
3008// ---------------------------------------------------------------------------
3009
3010fn session_thread(
3011 ctx: &ClientContext,
3012 session_id: &acp::SessionId,
3013) -> Result<WeakEntity<AcpThread>, acp::Error> {
3014 let sessions = ctx.sessions.borrow();
3015 sessions
3016 .get(session_id)
3017 .map(|session| session.thread.clone())
3018 .ok_or_else(|| acp::Error::internal_error().data(format!("unknown session: {session_id}")))
3019}
3020
3021fn respond_err<T: JsonRpcResponse>(responder: Responder<T>, err: acp::Error) {
3022 // Log the actual error we're returning — otherwise agents that hit an
3023 // error path (e.g. unknown session) would see only the generic internal
3024 // error returned over the wire with no trace of why on the client side.
3025 log::warn!(
3026 "Responding to ACP request `{method}` with error: {err:?}",
3027 method = responder.method()
3028 );
3029 responder.respond_with_error(err).log_err();
3030}
3031
3032fn handle_request_permission(
3033 args: acp::RequestPermissionRequest,
3034 responder: Responder<acp::RequestPermissionResponse>,
3035 cx: &mut AsyncApp,
3036 ctx: &ClientContext,
3037) {
3038 let thread = match session_thread(ctx, &args.session_id) {
3039 Ok(t) => t,
3040 Err(e) => return respond_err(responder, e),
3041 };
3042
3043 cx.spawn(async move |cx| {
3044 let result: Result<_, acp::Error> = async {
3045 let task = thread
3046 .update(cx, |thread, cx| {
3047 thread.request_tool_call_authorization(
3048 args.tool_call,
3049 acp_thread::PermissionOptions::Flat(args.options),
3050 cx,
3051 )
3052 })
3053 .flatten_acp()?;
3054 Ok(task.await)
3055 }
3056 .await;
3057
3058 match result {
3059 Ok(outcome) => {
3060 responder
3061 .respond(acp::RequestPermissionResponse::new(outcome.into()))
3062 .log_err();
3063 }
3064 Err(e) => respond_err(responder, e),
3065 }
3066 })
3067 .detach();
3068}
3069
3070fn handle_write_text_file(
3071 args: acp::WriteTextFileRequest,
3072 responder: Responder<acp::WriteTextFileResponse>,
3073 cx: &mut AsyncApp,
3074 ctx: &ClientContext,
3075) {
3076 let thread = match session_thread(ctx, &args.session_id) {
3077 Ok(t) => t,
3078 Err(e) => return respond_err(responder, e),
3079 };
3080
3081 cx.spawn(async move |cx| {
3082 let result: Result<_, acp::Error> = async {
3083 thread
3084 .update(cx, |thread, cx| {
3085 thread.write_text_file(args.path, args.content, cx)
3086 })
3087 .map_err(acp::Error::from)?
3088 .await?;
3089 Ok(())
3090 }
3091 .await;
3092
3093 match result {
3094 Ok(()) => {
3095 responder
3096 .respond(acp::WriteTextFileResponse::default())
3097 .log_err();
3098 }
3099 Err(e) => respond_err(responder, e),
3100 }
3101 })
3102 .detach();
3103}
3104
3105fn handle_read_text_file(
3106 args: acp::ReadTextFileRequest,
3107 responder: Responder<acp::ReadTextFileResponse>,
3108 cx: &mut AsyncApp,
3109 ctx: &ClientContext,
3110) {
3111 let thread = match session_thread(ctx, &args.session_id) {
3112 Ok(t) => t,
3113 Err(e) => return respond_err(responder, e),
3114 };
3115
3116 cx.spawn(async move |cx| {
3117 let result: Result<_, acp::Error> = async {
3118 thread
3119 .update(cx, |thread, cx| {
3120 thread.read_text_file(args.path, args.line, args.limit, false, cx)
3121 })
3122 .map_err(acp::Error::from)?
3123 .await
3124 }
3125 .await;
3126
3127 match result {
3128 Ok(content) => {
3129 responder
3130 .respond(acp::ReadTextFileResponse::new(content))
3131 .log_err();
3132 }
3133 Err(e) => respond_err(responder, e),
3134 }
3135 })
3136 .detach();
3137}
3138
3139fn handle_session_notification(
3140 notification: acp::SessionNotification,
3141 cx: &mut AsyncApp,
3142 ctx: &ClientContext,
3143) {
3144 // Extract everything we need from the session while briefly borrowing.
3145 let (thread, session_modes, config_opts_data) = {
3146 let sessions = ctx.sessions.borrow();
3147 let Some(session) = sessions.get(¬ification.session_id) else {
3148 log::warn!(
3149 "Received session notification for unknown session: {:?}",
3150 notification.session_id
3151 );
3152 return;
3153 };
3154 (
3155 session.thread.clone(),
3156 session.session_modes.clone(),
3157 session
3158 .config_options
3159 .as_ref()
3160 .map(|opts| (opts.config_options.clone(), opts.tx.clone())),
3161 )
3162 };
3163 // Borrow is dropped here.
3164
3165 // Apply mode/config/session_list updates without holding the borrow.
3166 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
3167 current_mode_id, ..
3168 }) = ¬ification.update
3169 {
3170 if let Some(session_modes) = &session_modes {
3171 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
3172 }
3173 }
3174
3175 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
3176 config_options, ..
3177 }) = ¬ification.update
3178 {
3179 if let Some((config_opts_cell, tx_cell)) = &config_opts_data {
3180 *config_opts_cell.borrow_mut() = config_options.clone();
3181 tx_cell.borrow_mut().send(()).ok();
3182 }
3183 }
3184
3185 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
3186 && let Some(session_list) = ctx.session_list.borrow().as_ref()
3187 {
3188 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
3189 }
3190
3191 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
3192 if let acp::SessionUpdate::ToolCall(tc) = ¬ification.update {
3193 if let Some(meta) = &tc.meta {
3194 if let Some(terminal_info) = meta.get("terminal_info") {
3195 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) {
3196 let terminal_id = acp::TerminalId::new(id_str);
3197 let cwd = terminal_info
3198 .get("cwd")
3199 .and_then(|v| v.as_str().map(PathBuf::from));
3200
3201 thread
3202 .update(cx, |thread, cx| {
3203 let builder = TerminalBuilder::new_display_only(
3204 CursorShape::default(),
3205 AlternateScroll::On,
3206 None,
3207 0,
3208 cx.background_executor(),
3209 thread.project().read(cx).path_style(cx),
3210 )?;
3211 let lower = cx.new(|cx| builder.subscribe(cx));
3212 thread.on_terminal_provider_event(
3213 TerminalProviderEvent::Created {
3214 terminal_id,
3215 label: tc.title.clone(),
3216 cwd,
3217 output_byte_limit: None,
3218 terminal: lower,
3219 },
3220 cx,
3221 );
3222 anyhow::Ok(())
3223 })
3224 .log_err();
3225 }
3226 }
3227 }
3228 }
3229
3230 // Forward the update to the acp_thread as usual.
3231 if let Err(err) = thread
3232 .update(cx, |thread, cx| {
3233 thread.handle_session_update(notification.update.clone(), cx)
3234 })
3235 .flatten_acp()
3236 {
3237 log::error!(
3238 "Failed to handle session update for {:?}: {err:?}",
3239 notification.session_id
3240 );
3241 }
3242
3243 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
3244 if let acp::SessionUpdate::ToolCallUpdate(tcu) = ¬ification.update {
3245 if let Some(meta) = &tcu.meta {
3246 if let Some(term_out) = meta.get("terminal_output") {
3247 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
3248 let terminal_id = acp::TerminalId::new(id_str);
3249 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
3250 let data = s.as_bytes().to_vec();
3251 thread
3252 .update(cx, |thread, cx| {
3253 thread.on_terminal_provider_event(
3254 TerminalProviderEvent::Output { terminal_id, data },
3255 cx,
3256 );
3257 })
3258 .log_err();
3259 }
3260 }
3261 }
3262
3263 if let Some(term_exit) = meta.get("terminal_exit") {
3264 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
3265 let terminal_id = acp::TerminalId::new(id_str);
3266 let status = acp::TerminalExitStatus::new()
3267 .exit_code(
3268 term_exit
3269 .get("exit_code")
3270 .and_then(|v| v.as_u64())
3271 .map(|i| i as u32),
3272 )
3273 .signal(
3274 term_exit
3275 .get("signal")
3276 .and_then(|v| v.as_str().map(|s| s.to_string())),
3277 );
3278
3279 thread
3280 .update(cx, |thread, cx| {
3281 thread.on_terminal_provider_event(
3282 TerminalProviderEvent::Exit {
3283 terminal_id,
3284 status,
3285 },
3286 cx,
3287 );
3288 })
3289 .log_err();
3290 }
3291 }
3292 }
3293 }
3294}
3295
3296fn handle_create_terminal(
3297 args: acp::CreateTerminalRequest,
3298 responder: Responder<acp::CreateTerminalResponse>,
3299 cx: &mut AsyncApp,
3300 ctx: &ClientContext,
3301) {
3302 let thread = match session_thread(ctx, &args.session_id) {
3303 Ok(t) => t,
3304 Err(e) => return respond_err(responder, e),
3305 };
3306 let project = match thread
3307 .read_with(cx, |thread, _cx| thread.project().clone())
3308 .map_err(acp::Error::from)
3309 {
3310 Ok(p) => p,
3311 Err(e) => return respond_err(responder, e),
3312 };
3313
3314 cx.spawn(async move |cx| {
3315 let result: Result<_, acp::Error> = async {
3316 let terminal_entity = acp_thread::create_terminal_entity(
3317 args.command.clone(),
3318 &args.args,
3319 args.env
3320 .into_iter()
3321 .map(|env| (env.name, env.value))
3322 .collect(),
3323 args.cwd.clone(),
3324 &project,
3325 cx,
3326 )
3327 .await?;
3328
3329 let terminal_entity = thread.update(cx, |thread, cx| {
3330 thread.register_terminal_created(
3331 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
3332 format!("{} {}", args.command, args.args.join(" ")),
3333 args.cwd.clone(),
3334 args.output_byte_limit,
3335 terminal_entity,
3336 cx,
3337 )
3338 })?;
3339 let terminal_id = terminal_entity.read_with(cx, |terminal, _| terminal.id().clone());
3340 Ok(terminal_id)
3341 }
3342 .await;
3343
3344 match result {
3345 Ok(terminal_id) => {
3346 responder
3347 .respond(acp::CreateTerminalResponse::new(terminal_id))
3348 .log_err();
3349 }
3350 Err(e) => respond_err(responder, e),
3351 }
3352 })
3353 .detach();
3354}
3355
3356fn handle_kill_terminal(
3357 args: acp::KillTerminalRequest,
3358 responder: Responder<acp::KillTerminalResponse>,
3359 cx: &mut AsyncApp,
3360 ctx: &ClientContext,
3361) {
3362 let thread = match session_thread(ctx, &args.session_id) {
3363 Ok(t) => t,
3364 Err(e) => return respond_err(responder, e),
3365 };
3366
3367 match thread
3368 .update(cx, |thread, cx| thread.kill_terminal(args.terminal_id, cx))
3369 .flatten_acp()
3370 {
3371 Ok(()) => {
3372 responder
3373 .respond(acp::KillTerminalResponse::default())
3374 .log_err();
3375 }
3376 Err(e) => respond_err(responder, e),
3377 }
3378}
3379
3380fn handle_release_terminal(
3381 args: acp::ReleaseTerminalRequest,
3382 responder: Responder<acp::ReleaseTerminalResponse>,
3383 cx: &mut AsyncApp,
3384 ctx: &ClientContext,
3385) {
3386 let thread = match session_thread(ctx, &args.session_id) {
3387 Ok(t) => t,
3388 Err(e) => return respond_err(responder, e),
3389 };
3390
3391 match thread
3392 .update(cx, |thread, cx| {
3393 thread.release_terminal(args.terminal_id, cx)
3394 })
3395 .flatten_acp()
3396 {
3397 Ok(()) => {
3398 responder
3399 .respond(acp::ReleaseTerminalResponse::default())
3400 .log_err();
3401 }
3402 Err(e) => respond_err(responder, e),
3403 }
3404}
3405
3406fn handle_terminal_output(
3407 args: acp::TerminalOutputRequest,
3408 responder: Responder<acp::TerminalOutputResponse>,
3409 cx: &mut AsyncApp,
3410 ctx: &ClientContext,
3411) {
3412 let thread = match session_thread(ctx, &args.session_id) {
3413 Ok(t) => t,
3414 Err(e) => return respond_err(responder, e),
3415 };
3416
3417 match thread
3418 .read_with(cx, |thread, cx| -> anyhow::Result<_> {
3419 let out = thread
3420 .terminal(args.terminal_id)?
3421 .read(cx)
3422 .current_output(cx);
3423 Ok(out)
3424 })
3425 .flatten_acp()
3426 {
3427 Ok(output) => {
3428 responder.respond(output).log_err();
3429 }
3430 Err(e) => respond_err(responder, e),
3431 }
3432}
3433
3434fn handle_wait_for_terminal_exit(
3435 args: acp::WaitForTerminalExitRequest,
3436 responder: Responder<acp::WaitForTerminalExitResponse>,
3437 cx: &mut AsyncApp,
3438 ctx: &ClientContext,
3439) {
3440 let thread = match session_thread(ctx, &args.session_id) {
3441 Ok(t) => t,
3442 Err(e) => return respond_err(responder, e),
3443 };
3444
3445 cx.spawn(async move |cx| {
3446 let result: Result<_, acp::Error> = async {
3447 let exit_status = thread
3448 .update(cx, |thread, cx| {
3449 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
3450 })
3451 .flatten_acp()?
3452 .await;
3453 Ok(exit_status)
3454 }
3455 .await;
3456
3457 match result {
3458 Ok(exit_status) => {
3459 responder
3460 .respond(acp::WaitForTerminalExitResponse::new(exit_status))
3461 .log_err();
3462 }
3463 Err(e) => respond_err(responder, e),
3464 }
3465 })
3466 .detach();
3467}