1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::{AcpConnectionRegistry, RawStreamLine, StreamMessageDirection};
6use action_log::ActionLog;
7use agent_client_protocol::schema::{self as acp, ErrorCode};
8use agent_client_protocol::{
9 Agent, Client, ConnectionTo, JsonRpcResponse, Lines, Responder, SentRequest,
10};
11use anyhow::anyhow;
12use collections::HashMap;
13use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
14use futures::channel::mpsc;
15use futures::io::BufReader;
16use futures::{AsyncBufReadExt as _, Future, StreamExt as _};
17use project::agent_server_store::AgentServerCommand;
18use project::{AgentId, Project};
19use serde::Deserialize;
20use settings::Settings as _;
21use task::{ShellBuilder, SpawnInTerminal};
22use util::ResultExt as _;
23use util::path_list::PathList;
24use util::process::Child;
25
26use std::path::PathBuf;
27use std::process::Stdio;
28use std::rc::Rc;
29use std::sync::Arc;
30use std::{any::Any, cell::RefCell};
31use thiserror::Error;
32
33use anyhow::{Context as _, Result};
34use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
35
36use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
37use terminal::TerminalBuilder;
38use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
39
40use crate::GEMINI_ID;
41
42pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
43
44/// Converts a [`SentRequest`] into a `Future` that can be safely awaited from
45/// the GPUI foreground thread.
46///
47/// Unlike [`SentRequest::block_task`], which is only safe inside
48/// [`ConnectionTo::spawn`] tasks, this uses [`SentRequest::on_receiving_result`]
49/// to bridge the response through a oneshot channel. The SDK callback is trivial
50/// (just a channel send), so it doesn't meaningfully block the dispatch loop.
51fn into_foreground_future<T: JsonRpcResponse + Send + 'static>(
52 sent: SentRequest<T>,
53) -> impl Future<Output = Result<T, acp::Error>> {
54 let (tx, rx) = futures::channel::oneshot::channel();
55 let spawn_result = sent.on_receiving_result(async move |result| {
56 tx.send(result).ok();
57 Ok(())
58 });
59 async move {
60 spawn_result?;
61 rx.await.map_err(|_| {
62 acp::Error::internal_error()
63 .data("response channel cancelled — connection may have dropped")
64 })?
65 }
66}
67
/// Error returned when the agent reports a protocol version older than the
/// minimum version this client accepts.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
71
/// Holds state needed by foreground work dispatched from background handler closures.
///
/// Both fields are reference-counted handles, so the dispatch loop sees the
/// same session map and session list as the connection that created it.
struct ClientContext {
    /// Known sessions, keyed by session id.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Session list handle; `None` until one has been installed.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
}
77
/// Work items sent from `Send` handler closures to the `!Send` foreground thread.
///
/// Each item is a boxed one-shot closure that is handed the async app context
/// and the shared [`ClientContext`] when the dispatch loop runs it.
type ForegroundWork = Box<dyn FnOnce(&mut AsyncApp, &ClientContext) + Send>;
80
/// A connection to an external agent server process speaking ACP over stdio.
///
/// Owns the child process and the tasks that service it; the child is killed
/// when the connection is dropped.
pub struct AcpConnection {
    /// Identifier of the agent this connection talks to.
    id: AgentId,
    /// Name reported by the agent during initialization, falling back to the
    /// agent id when the agent supplies none.
    telemetry_id: SharedString,
    /// Handle used to send requests to the agent.
    connection: ConnectionTo<Agent>,
    /// Known sessions, keyed by session id. Shared with the foreground
    /// dispatch loop and the process-exit watcher.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods offered for this agent.
    auth_methods: Vec<acp::AuthMethod>,
    /// Command that was used to spawn the agent server.
    command: AgentServerCommand,
    /// Capabilities reported by the agent in its initialize response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on new sessions when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on new sessions when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Default values for session config options, keyed by option id.
    default_config_options: HashMap<String, String>,
    /// The spawned agent server process.
    child: Child,
    /// Present only when the agent advertises the session-list capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Drives the connection future on a background executor.
    _io_task: Task<()>,
    /// Runs work items forwarded by the protocol handlers on the foreground.
    _dispatch_task: Task<()>,
    /// Waits for the child to exit and reports the exit to every session's thread.
    _wait_task: Task<Result<()>>,
    /// Logs each line the agent writes to stderr.
    _stderr_task: Task<Result<()>>,
}
99
/// Session config options together with a watch channel created alongside them.
struct ConfigOptions {
    /// Shared, mutable list of the session's config options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sending half of the watch channel.
    // NOTE(review): presumably used to signal option changes; confirm against the users of `tx`.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiving half of the watch channel.
    rx: watch::Receiver<()>,
}
105
106impl ConfigOptions {
107 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
108 let (tx, rx) = watch::channel(());
109 Self {
110 config_options,
111 tx: Rc::new(RefCell::new(tx)),
112 rx,
113 }
114 }
115}
116
/// Per-session state tracked by an [`AcpConnection`].
pub struct AcpSession {
    /// Weak handle to the thread entity backing this session.
    thread: WeakEntity<AcpThread>,
    /// Flag that is read and then reset to `false` when a prompt request completes.
    suppress_abort_err: bool,
    /// Model state reported by the agent, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state reported by the agent, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options reported by the agent, if any.
    config_options: Option<ConfigOptions>,
}
124
/// Lists the agent's sessions over the ACP connection and fans out
/// session-list updates through an unbounded channel.
pub struct AcpSessionList {
    /// Handle used to send list requests to the agent.
    connection: ConnectionTo<Agent>,
    /// Sending half of the update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; cloned out to watchers.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
130
131impl AcpSessionList {
132 fn new(connection: ConnectionTo<Agent>) -> Self {
133 let (tx, rx) = smol::channel::unbounded();
134 Self {
135 connection,
136 updates_tx: tx,
137 updates_rx: rx,
138 }
139 }
140
141 fn notify_update(&self) {
142 self.updates_tx
143 .try_send(acp_thread::SessionListUpdate::Refresh)
144 .log_err();
145 }
146
147 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
148 self.updates_tx
149 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
150 .log_err();
151 }
152}
153
154impl AgentSessionList for AcpSessionList {
155 fn list_sessions(
156 &self,
157 request: AgentSessionListRequest,
158 cx: &mut App,
159 ) -> Task<Result<AgentSessionListResponse>> {
160 let conn = self.connection.clone();
161 cx.foreground_executor().spawn(async move {
162 let acp_request = acp::ListSessionsRequest::new()
163 .cwd(request.cwd)
164 .cursor(request.cursor);
165 let response = into_foreground_future(conn.send_request(acp_request))
166 .await
167 .map_err(map_acp_error)?;
168 Ok(AgentSessionListResponse {
169 sessions: response
170 .sessions
171 .into_iter()
172 .map(|s| AgentSessionInfo {
173 session_id: s.session_id,
174 work_dirs: Some(PathList::new(&[s.cwd])),
175 title: s.title.map(Into::into),
176 updated_at: s.updated_at.and_then(|date_str| {
177 chrono::DateTime::parse_from_rfc3339(&date_str)
178 .ok()
179 .map(|dt| dt.with_timezone(&chrono::Utc))
180 }),
181 created_at: None,
182 meta: s.meta,
183 })
184 .collect(),
185 next_cursor: response.next_cursor,
186 meta: response.meta,
187 })
188 })
189 }
190
191 fn watch(
192 &self,
193 _cx: &mut App,
194 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
195 Some(self.updates_rx.clone())
196 }
197
198 fn notify_refresh(&self) {
199 self.notify_update();
200 }
201
202 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
203 self
204 }
205}
206
207pub async fn connect(
208 agent_id: AgentId,
209 project: Entity<Project>,
210 command: AgentServerCommand,
211 default_mode: Option<acp::SessionModeId>,
212 default_model: Option<acp::ModelId>,
213 default_config_options: HashMap<String, String>,
214 cx: &mut AsyncApp,
215) -> Result<Rc<dyn AgentConnection>> {
216 let conn = AcpConnection::stdio(
217 agent_id,
218 project,
219 command.clone(),
220 default_mode,
221 default_model,
222 default_config_options,
223 cx,
224 )
225 .await?;
226 Ok(Rc::new(conn) as _)
227}
228
/// Oldest ACP protocol version this client accepts from an agent.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
230
/// Builds an async request handler that forwards each incoming request to the
/// foreground dispatch channel.
///
/// `$dispatch_tx` is cloned into the handler. For every request the handler
/// enqueues a boxed closure that calls `$handler(args, responder, cx, ctx)`
/// when it is run. A failed enqueue is logged and the handler still returns
/// `Ok(())`.
macro_rules! dispatch_request_handler {
    ($dispatch_tx:expr, $handler:expr) => {{
        let dispatch_tx = $dispatch_tx.clone();
        async move |args, responder, _connection| {
            dispatch_tx
                .unbounded_send(Box::new(move |cx, ctx| {
                    $handler(args, responder, cx, ctx);
                }))
                .log_err();
            Ok(())
        }
    }};
}
244
/// Builds an async notification handler that forwards each incoming
/// notification to the foreground dispatch channel.
///
/// `$dispatch_tx` is cloned into the handler. For every notification the
/// handler enqueues a boxed closure that calls `$handler(notification, cx, ctx)`
/// when it is run. A failed enqueue is logged and the handler still returns
/// `Ok(())`.
macro_rules! dispatch_notification_handler {
    ($dispatch_tx:expr, $handler:expr) => {{
        let dispatch_tx = $dispatch_tx.clone();
        async move |notification, _connection| {
            dispatch_tx
                .unbounded_send(Box::new(move |cx, ctx| {
                    $handler(notification, cx, ctx);
                }))
                .log_err();
            Ok(())
        }
    }};
}
258
259impl AcpConnection {
    /// Spawns the agent server described by `command` and establishes an ACP
    /// connection to it over the child's stdin/stdout.
    ///
    /// The steps, in order:
    /// 1. Build and spawn the child through the configured shell, with all
    ///    three stdio streams piped.
    /// 2. Wrap stdin/stdout in a line transport that also copies every raw
    ///    line into a tap channel for the ACP logs panel.
    /// 3. Register request/notification handlers that forward their work to
    ///    a foreground dispatch loop, and start the connection on a
    ///    background task.
    /// 4. Start helper tasks for stderr logging and for process exit.
    /// 5. Send the `initialize` request and build the connection from the
    ///    response.
    ///
    /// # Errors
    ///
    /// Fails if the child cannot be spawned or a stdio handle is missing, if
    /// the connection handle is never delivered, if the `initialize` request
    /// fails, or with [`UnsupportedVersion`] when the agent's protocol
    /// version is below [`MINIMUM_SUPPORTED_VERSION`].
    pub async fn stdio(
        agent_id: AgentId,
        project: Entity<Project>,
        command: AgentServerCommand,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child =
            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
        child.envs(command.env.iter().flatten());
        // For local projects, run the agent in the first path of the project's
        // default path list; otherwise leave the working directory untouched.
        if let Some(cwd) = project.update(cx, |project, cx| {
            if project.is_local() {
                project
                    .default_path_list(cx)
                    .ordered_paths()
                    .next()
                    .cloned()
            } else {
                None
            }
        }) {
            child.current_dir(cwd);
        }
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        log::debug!(
            "Spawning external agent server: {:?}, {:?}",
            command.path,
            command.args
        );
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        // Client identification sent to the agent in the initialize request.
        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // Filled in after initialization if the agent supports listing sessions.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        // Set up the foreground dispatch channel for bridging Send handler
        // closures to the !Send foreground thread.
        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();

        // Build a tapped transport that intercepts raw JSON-RPC lines for
        // the ACP logs panel. Raw lines are sent without parsing — deserialization
        // is deferred until a subscriber is actually listening.
        let (stream_tap_tx, stream_tap_rx) = smol::channel::unbounded::<RawStreamLine>();

        let incoming_lines = futures::io::BufReader::new(stdout).lines();
        let tapped_incoming = incoming_lines.inspect({
            let tap_tx = stream_tap_tx.clone();
            move |result| {
                // Only successfully read lines are mirrored to the tap.
                if let Ok(line) = result {
                    tap_tx
                        .try_send(RawStreamLine {
                            direction: StreamMessageDirection::Incoming,
                            line: Arc::from(line.as_str()),
                        })
                        .log_err();
                }
            }
        });

        let tapped_outgoing = futures::sink::unfold(
            (Box::pin(stdin), stream_tap_tx),
            async move |(mut writer, tap_tx), line: String| {
                use futures::AsyncWriteExt;
                tap_tx
                    .try_send(RawStreamLine {
                        direction: StreamMessageDirection::Outgoing,
                        line: Arc::from(line.as_str()),
                    })
                    .log_err();
                // Each outgoing message is written as one newline-terminated line.
                let mut bytes = line.into_bytes();
                bytes.push(b'\n');
                writer.write_all(&bytes).await?;
                Ok::<_, std::io::Error>((writer, tap_tx))
            },
        );

        let transport = Lines::new(tapped_outgoing, tapped_incoming);

        // Use a oneshot channel to extract the ConnectionTo<Agent> from the
        // connect_with closure.
        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();

        // Build the connection with handler closures.
        // All handlers forward work to the foreground dispatch channel.
        let connection_future = {
            let dispatch_tx = dispatch_tx.clone();
            Client
                .builder()
                .name("zed")
                // --- Request handlers (agent→client) ---
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_request_permission),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_write_text_file),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_read_text_file),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_create_terminal),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_kill_terminal),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_release_terminal),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_terminal_output),
                    agent_client_protocol::on_receive_request!(),
                )
                .on_receive_request(
                    dispatch_request_handler!(dispatch_tx, handle_wait_for_terminal_exit),
                    agent_client_protocol::on_receive_request!(),
                )
                // --- Notification handlers (agent→client) ---
                .on_receive_notification(
                    dispatch_notification_handler!(dispatch_tx, handle_session_notification),
                    agent_client_protocol::on_receive_notification!(),
                )
                .connect_with(
                    transport,
                    move |connection: ConnectionTo<Agent>| async move {
                        if connection_tx.send(connection.clone()).is_err() {
                            log::error!(
                                "failed to send ACP connection handle — receiver was dropped"
                            );
                        }
                        // Keep the connection alive until the transport closes.
                        futures::future::pending::<Result<(), acp::Error>>().await
                    },
                )
        };

        // Spawn the connection loop on a background thread.
        let io_task = cx.background_spawn(async move {
            if let Err(err) = connection_future.await {
                log::error!("ACP connection error: {err}");
            }
        });

        // Wait for the ConnectionTo<Agent> handle.
        let connection: ConnectionTo<Agent> = connection_rx
            .await
            .context("Failed to receive ACP connection handle")?;

        // Set up the foreground dispatch loop to process work items from handlers.
        let dispatch_context = ClientContext {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
        };
        let dispatch_task = cx.spawn({
            let mut dispatch_rx = dispatch_rx;
            async move |cx| {
                while let Some(work) = dispatch_rx.next().await {
                    work(cx, &dispatch_context);
                }
            }
        });

        // Forward the agent's stderr to the log, one line at a time, until
        // EOF or a read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // When the child exits, report the exit status to every session's thread.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        // Hand the receiving end of the raw-line tap to the connection registry.
        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(agent_id.clone(), stream_tap_rx, cx)
            });
        });

        // Protocol handshake: advertise file system, terminal and terminal-auth
        // support along with the client name and version.
        let response = into_foreground_future(
            connection.send_request(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapabilities::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            .auth(acp::AuthCapabilities::new().terminal(true))
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            ),
        )
        .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| agent_id.0.to_string().into());

        // Only create a session list when the agent advertises the capability,
        // and publish it to the dispatch context as well.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        // TODO: Remove this override once Google team releases their official auth methods
        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
            // The login command reuses the agent command without its ACP flags.
            let mut args = command.args.clone();
            args.retain(|a| a != "--experimental-acp" && a != "--acp");
            let value = serde_json::json!({
                "label": "gemini /auth",
                "command": command.path.to_string_lossy().into_owned(),
                "args": args,
                "env": command.env.clone().unwrap_or_default(),
            });
            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
            vec![acp::AuthMethod::Agent(
                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                    .description("Login with your Google or Vertex AI account")
                    .meta(meta),
            )]
        } else {
            response.auth_methods
        };
        Ok(Self {
            id: agent_id,
            auth_methods,
            command,
            connection,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _dispatch_task: dispatch_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }
566
    /// Returns the prompt capabilities from the agent's reported capabilities.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }
570
571 fn apply_default_config_options(
572 &self,
573 session_id: &acp::SessionId,
574 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
575 cx: &mut AsyncApp,
576 ) {
577 let id = self.id.clone();
578 let defaults_to_apply: Vec<_> = {
579 let config_opts_ref = config_options.borrow();
580 config_opts_ref
581 .iter()
582 .filter_map(|config_option| {
583 let default_value = self.default_config_options.get(&*config_option.id.0)?;
584
585 let is_valid = match &config_option.kind {
586 acp::SessionConfigKind::Select(select) => match &select.options {
587 acp::SessionConfigSelectOptions::Ungrouped(options) => options
588 .iter()
589 .any(|opt| &*opt.value.0 == default_value.as_str()),
590 acp::SessionConfigSelectOptions::Grouped(groups) => {
591 groups.iter().any(|g| {
592 g.options
593 .iter()
594 .any(|opt| &*opt.value.0 == default_value.as_str())
595 })
596 }
597 _ => false,
598 },
599 _ => false,
600 };
601
602 if is_valid {
603 let initial_value = match &config_option.kind {
604 acp::SessionConfigKind::Select(select) => {
605 Some(select.current_value.clone())
606 }
607 _ => None,
608 };
609 Some((
610 config_option.id.clone(),
611 default_value.clone(),
612 initial_value,
613 ))
614 } else {
615 log::warn!(
616 "`{}` is not a valid value for config option `{}` in {}",
617 default_value,
618 config_option.id.0,
619 id
620 );
621 None
622 }
623 })
624 .collect()
625 };
626
627 for (config_id, default_value, initial_value) in defaults_to_apply {
628 cx.spawn({
629 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
630 let session_id = session_id.clone();
631 let config_id_clone = config_id.clone();
632 let config_opts = config_options.clone();
633 let conn = self.connection.clone();
634 async move |_| {
635 let result = into_foreground_future(conn.send_request(
636 acp::SetSessionConfigOptionRequest::new(
637 session_id,
638 config_id_clone.clone(),
639 default_value_id,
640 ),
641 ))
642 .await
643 .log_err();
644
645 if result.is_none() {
646 if let Some(initial) = initial_value {
647 let mut opts = config_opts.borrow_mut();
648 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
649 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
650 select.current_value = initial;
651 }
652 }
653 }
654 }
655 }
656 })
657 .detach();
658
659 let mut opts = config_options.borrow_mut();
660 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
661 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
662 select.current_value = acp::SessionConfigValueId::new(default_value);
663 }
664 }
665 }
666 }
667}
668
/// Kills the agent server process when the connection goes away.
impl Drop for AcpConnection {
    fn drop(&mut self) {
        // `drop` cannot return an error, so a failed kill is only logged.
        self.child.kill().log_err();
    }
}
674
675fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
676 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
677}
678
679fn terminal_auth_task(
680 command: &AgentServerCommand,
681 agent_id: &AgentId,
682 method: &acp::AuthMethodTerminal,
683) -> SpawnInTerminal {
684 let mut args = command.args.clone();
685 args.extend(method.args.clone());
686
687 let mut env = command.env.clone().unwrap_or_default();
688 env.extend(method.env.clone());
689
690 acp_thread::build_terminal_auth_task(
691 terminal_auth_task_id(agent_id, &method.id),
692 method.name.clone(),
693 command.path.to_string_lossy().into_owned(),
694 args,
695 env,
696 )
697}
698
699/// Used to support the _meta method prior to stabilization
700fn meta_terminal_auth_task(
701 agent_id: &AgentId,
702 method_id: &acp::AuthMethodId,
703 method: &acp::AuthMethod,
704) -> Option<SpawnInTerminal> {
705 #[derive(Deserialize)]
706 struct MetaTerminalAuth {
707 label: String,
708 command: String,
709 #[serde(default)]
710 args: Vec<String>,
711 #[serde(default)]
712 env: HashMap<String, String>,
713 }
714
715 let meta = match method {
716 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
717 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
718 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
719 _ => None,
720 }?;
721 let terminal_auth =
722 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
723
724 Some(acp_thread::build_terminal_auth_task(
725 terminal_auth_task_id(agent_id, method_id),
726 terminal_auth.label.clone(),
727 terminal_auth.command,
728 terminal_auth.args,
729 terminal_auth.env,
730 ))
731}
732
733impl AgentConnection for AcpConnection {
    /// Returns a clone of this connection's agent id.
    fn agent_id(&self) -> AgentId {
        self.id.clone()
    }
737
    /// Returns a clone of the telemetry id stored on this connection.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
741
742 fn new_session(
743 self: Rc<Self>,
744 project: Entity<Project>,
745 work_dirs: PathList,
746 cx: &mut App,
747 ) -> Task<Result<Entity<AcpThread>>> {
748 // TODO: remove this once ACP supports multiple working directories
749 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
750 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
751 };
752 let name = self.id.0.clone();
753 let mcp_servers = mcp_servers_for_project(&project, cx);
754
755 cx.spawn(async move |cx| {
756 let response = into_foreground_future(
757 self.connection
758 .send_request(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers)),
759 )
760 .await
761 .map_err(map_acp_error)?;
762
763 let (modes, models, config_options) =
764 config_state(response.modes, response.models, response.config_options);
765
766 if let Some(default_mode) = self.default_mode.clone() {
767 if let Some(modes) = modes.as_ref() {
768 let mut modes_ref = modes.borrow_mut();
769 let has_mode = modes_ref
770 .available_modes
771 .iter()
772 .any(|mode| mode.id == default_mode);
773
774 if has_mode {
775 let initial_mode_id = modes_ref.current_mode_id.clone();
776
777 cx.spawn({
778 let default_mode = default_mode.clone();
779 let session_id = response.session_id.clone();
780 let modes = modes.clone();
781 let conn = self.connection.clone();
782 async move |_| {
783 let result = into_foreground_future(
784 conn.send_request(acp::SetSessionModeRequest::new(
785 session_id,
786 default_mode,
787 )),
788 )
789 .await
790 .log_err();
791
792 if result.is_none() {
793 modes.borrow_mut().current_mode_id = initial_mode_id;
794 }
795 }
796 })
797 .detach();
798
799 modes_ref.current_mode_id = default_mode;
800 } else {
801 let available_modes = modes_ref
802 .available_modes
803 .iter()
804 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
805 .collect::<Vec<_>>()
806 .join("\n");
807
808 log::warn!(
809 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
810 );
811 }
812 }
813 }
814
815 if let Some(default_model) = self.default_model.clone() {
816 if let Some(models) = models.as_ref() {
817 let mut models_ref = models.borrow_mut();
818 let has_model = models_ref
819 .available_models
820 .iter()
821 .any(|model| model.model_id == default_model);
822
823 if has_model {
824 let initial_model_id = models_ref.current_model_id.clone();
825
826 cx.spawn({
827 let default_model = default_model.clone();
828 let session_id = response.session_id.clone();
829 let models = models.clone();
830 let conn = self.connection.clone();
831 async move |_| {
832 let result = into_foreground_future(
833 conn.send_request(acp::SetSessionModelRequest::new(
834 session_id,
835 default_model,
836 )),
837 )
838 .await
839 .log_err();
840
841 if result.is_none() {
842 models.borrow_mut().current_model_id = initial_model_id;
843 }
844 }
845 })
846 .detach();
847
848 models_ref.current_model_id = default_model;
849 } else {
850 let available_models = models_ref
851 .available_models
852 .iter()
853 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
854 .collect::<Vec<_>>()
855 .join("\n");
856
857 log::warn!(
858 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
859 );
860 }
861 }
862 }
863
864 if let Some(config_opts) = config_options.as_ref() {
865 self.apply_default_config_options(&response.session_id, config_opts, cx);
866 }
867
868 let action_log = cx.new(|_| ActionLog::new(project.clone()));
869 let thread: Entity<AcpThread> = cx.new(|cx| {
870 AcpThread::new(
871 None,
872 None,
873 Some(work_dirs),
874 self.clone(),
875 project,
876 action_log,
877 response.session_id.clone(),
878 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
879 watch::Receiver::constant(
880 self.agent_capabilities.prompt_capabilities.clone(),
881 ),
882 cx,
883 )
884 });
885
886 self.sessions.borrow_mut().insert(
887 response.session_id,
888 AcpSession {
889 thread: thread.downgrade(),
890 suppress_abort_err: false,
891 session_modes: modes,
892 models,
893 config_options: config_options.map(ConfigOptions::new),
894 },
895 );
896
897 Ok(thread)
898 })
899 }
900
    /// Returns the `load_session` flag from the agent's reported capabilities.
    fn supports_load_session(&self) -> bool {
        self.agent_capabilities.load_session
    }
904
    /// Returns `true` when the agent's session capabilities include `resume`.
    fn supports_resume_session(&self) -> bool {
        self.agent_capabilities
            .session_capabilities
            .resume
            .is_some()
    }
911
912 fn load_session(
913 self: Rc<Self>,
914 session_id: acp::SessionId,
915 project: Entity<Project>,
916 work_dirs: PathList,
917 title: Option<SharedString>,
918 cx: &mut App,
919 ) -> Task<Result<Entity<AcpThread>>> {
920 if !self.agent_capabilities.load_session {
921 return Task::ready(Err(anyhow!(LoadError::Other(
922 "Loading sessions is not supported by this agent.".into()
923 ))));
924 }
925 // TODO: remove this once ACP supports multiple working directories
926 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
927 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
928 };
929
930 let mcp_servers = mcp_servers_for_project(&project, cx);
931 let action_log = cx.new(|_| ActionLog::new(project.clone()));
932 let thread: Entity<AcpThread> = cx.new(|cx| {
933 AcpThread::new(
934 None,
935 title,
936 Some(work_dirs.clone()),
937 self.clone(),
938 project,
939 action_log,
940 session_id.clone(),
941 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
942 cx,
943 )
944 });
945
946 self.sessions.borrow_mut().insert(
947 session_id.clone(),
948 AcpSession {
949 thread: thread.downgrade(),
950 suppress_abort_err: false,
951 session_modes: None,
952 models: None,
953 config_options: None,
954 },
955 );
956
957 cx.spawn(async move |cx| {
958 let response = match into_foreground_future(self.connection.send_request(
959 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
960 ))
961 .await
962 {
963 Ok(response) => response,
964 Err(err) => {
965 self.sessions.borrow_mut().remove(&session_id);
966 return Err(map_acp_error(err));
967 }
968 };
969
970 let (modes, models, config_options) =
971 config_state(response.modes, response.models, response.config_options);
972
973 if let Some(config_opts) = config_options.as_ref() {
974 self.apply_default_config_options(&session_id, config_opts, cx);
975 }
976
977 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
978 session.session_modes = modes;
979 session.models = models;
980 session.config_options = config_options.map(ConfigOptions::new);
981 }
982
983 Ok(thread)
984 })
985 }
986
987 fn resume_session(
988 self: Rc<Self>,
989 session_id: acp::SessionId,
990 project: Entity<Project>,
991 work_dirs: PathList,
992 title: Option<SharedString>,
993 cx: &mut App,
994 ) -> Task<Result<Entity<AcpThread>>> {
995 if self
996 .agent_capabilities
997 .session_capabilities
998 .resume
999 .is_none()
1000 {
1001 return Task::ready(Err(anyhow!(LoadError::Other(
1002 "Resuming sessions is not supported by this agent.".into()
1003 ))));
1004 }
1005 // TODO: remove this once ACP supports multiple working directories
1006 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
1007 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
1008 };
1009
1010 let mcp_servers = mcp_servers_for_project(&project, cx);
1011 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1012 let thread: Entity<AcpThread> = cx.new(|cx| {
1013 AcpThread::new(
1014 None,
1015 title,
1016 Some(work_dirs),
1017 self.clone(),
1018 project,
1019 action_log,
1020 session_id.clone(),
1021 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
1022 cx,
1023 )
1024 });
1025
1026 self.sessions.borrow_mut().insert(
1027 session_id.clone(),
1028 AcpSession {
1029 thread: thread.downgrade(),
1030 suppress_abort_err: false,
1031 session_modes: None,
1032 models: None,
1033 config_options: None,
1034 },
1035 );
1036
1037 cx.spawn(async move |cx| {
1038 let response = match into_foreground_future(self.connection.send_request(
1039 acp::ResumeSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
1040 ))
1041 .await
1042 {
1043 Ok(response) => response,
1044 Err(err) => {
1045 self.sessions.borrow_mut().remove(&session_id);
1046 return Err(map_acp_error(err));
1047 }
1048 };
1049
1050 let (modes, models, config_options) =
1051 config_state(response.modes, response.models, response.config_options);
1052
1053 if let Some(config_opts) = config_options.as_ref() {
1054 self.apply_default_config_options(&session_id, config_opts, cx);
1055 }
1056
1057 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
1058 session.session_modes = modes;
1059 session.models = models;
1060 session.config_options = config_options.map(ConfigOptions::new);
1061 }
1062
1063 Ok(thread)
1064 })
1065 }
1066
    /// Returns `true` when the agent's session capabilities include `close`.
    fn supports_close_session(&self) -> bool {
        self.agent_capabilities.session_capabilities.close.is_some()
    }
1070
1071 fn close_session(
1072 self: Rc<Self>,
1073 session_id: &acp::SessionId,
1074 cx: &mut App,
1075 ) -> Task<Result<()>> {
1076 if !self.supports_close_session() {
1077 return Task::ready(Err(anyhow!(LoadError::Other(
1078 "Closing sessions is not supported by this agent.".into()
1079 ))));
1080 }
1081
1082 let conn = self.connection.clone();
1083 let session_id = session_id.clone();
1084 cx.foreground_executor().spawn(async move {
1085 into_foreground_future(
1086 conn.send_request(acp::CloseSessionRequest::new(session_id.clone())),
1087 )
1088 .await?;
1089 self.sessions.borrow_mut().remove(&session_id);
1090 Ok(())
1091 })
1092 }
1093
    /// Returns the authentication methods stored on this connection.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
1097
1098 fn terminal_auth_task(
1099 &self,
1100 method_id: &acp::AuthMethodId,
1101 cx: &App,
1102 ) -> Option<SpawnInTerminal> {
1103 let method = self
1104 .auth_methods
1105 .iter()
1106 .find(|method| method.id() == method_id)?;
1107
1108 match method {
1109 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1110 Some(terminal_auth_task(&self.command, &self.id, terminal))
1111 }
1112 _ => meta_terminal_auth_task(&self.id, method_id, method),
1113 }
1114 }
1115
1116 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1117 let conn = self.connection.clone();
1118 cx.foreground_executor().spawn(async move {
1119 into_foreground_future(conn.send_request(acp::AuthenticateRequest::new(method_id)))
1120 .await?;
1121 Ok(())
1122 })
1123 }
1124
1125 fn prompt(
1126 &self,
1127 _id: Option<acp_thread::UserMessageId>,
1128 params: acp::PromptRequest,
1129 cx: &mut App,
1130 ) -> Task<Result<acp::PromptResponse>> {
1131 let conn = self.connection.clone();
1132 let sessions = self.sessions.clone();
1133 let session_id = params.session_id.clone();
1134 cx.foreground_executor().spawn(async move {
1135 let result = into_foreground_future(conn.send_request(params)).await;
1136
1137 let mut suppress_abort_err = false;
1138
1139 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1140 suppress_abort_err = session.suppress_abort_err;
1141 session.suppress_abort_err = false;
1142 }
1143
1144 match result {
1145 Ok(response) => Ok(response),
1146 Err(err) => {
1147 if err.code == acp::ErrorCode::AuthRequired {
1148 return Err(anyhow!(acp::Error::auth_required()));
1149 }
1150
1151 if err.code != ErrorCode::InternalError {
1152 anyhow::bail!(err)
1153 }
1154
1155 let Some(data) = &err.data else {
1156 anyhow::bail!(err)
1157 };
1158
1159 // Temporary workaround until the following PR is generally available:
1160 // https://github.com/google-gemini/gemini-cli/pull/6656
1161
1162 #[derive(Deserialize)]
1163 #[serde(deny_unknown_fields)]
1164 struct ErrorDetails {
1165 details: Box<str>,
1166 }
1167
1168 match serde_json::from_value(data.clone()) {
1169 Ok(ErrorDetails { details }) => {
1170 if suppress_abort_err
1171 && (details.contains("This operation was aborted")
1172 || details.contains("The user aborted a request"))
1173 {
1174 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1175 } else {
1176 Err(anyhow!(details))
1177 }
1178 }
1179 Err(_) => Err(anyhow!(err)),
1180 }
1181 }
1182 }
1183 })
1184 }
1185
1186 fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) {
1187 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1188 session.suppress_abort_err = true;
1189 }
1190 let params = acp::CancelNotification::new(session_id.clone());
1191 self.connection.send_notification(params).log_err();
1192 }
1193
1194 fn session_modes(
1195 &self,
1196 session_id: &acp::SessionId,
1197 _cx: &App,
1198 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1199 let sessions = self.sessions.clone();
1200 let sessions_ref = sessions.borrow();
1201 let Some(session) = sessions_ref.get(session_id) else {
1202 return None;
1203 };
1204
1205 if let Some(modes) = session.session_modes.as_ref() {
1206 Some(Rc::new(AcpSessionModes {
1207 connection: self.connection.clone(),
1208 session_id: session_id.clone(),
1209 state: modes.clone(),
1210 }) as _)
1211 } else {
1212 None
1213 }
1214 }
1215
1216 fn model_selector(
1217 &self,
1218 session_id: &acp::SessionId,
1219 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1220 let sessions = self.sessions.clone();
1221 let sessions_ref = sessions.borrow();
1222 let Some(session) = sessions_ref.get(session_id) else {
1223 return None;
1224 };
1225
1226 if let Some(models) = session.models.as_ref() {
1227 Some(Rc::new(AcpModelSelector::new(
1228 session_id.clone(),
1229 self.connection.clone(),
1230 models.clone(),
1231 )) as _)
1232 } else {
1233 None
1234 }
1235 }
1236
1237 fn session_config_options(
1238 &self,
1239 session_id: &acp::SessionId,
1240 _cx: &App,
1241 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1242 let sessions = self.sessions.borrow();
1243 let session = sessions.get(session_id)?;
1244
1245 let config_opts = session.config_options.as_ref()?;
1246
1247 Some(Rc::new(AcpSessionConfigOptions {
1248 session_id: session_id.clone(),
1249 connection: self.connection.clone(),
1250 state: config_opts.config_options.clone(),
1251 watch_tx: config_opts.tx.clone(),
1252 watch_rx: config_opts.rx.clone(),
1253 }) as _)
1254 }
1255
1256 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1257 self.session_list.clone().map(|s| s as _)
1258 }
1259
1260 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1261 self
1262 }
1263}
1264
1265fn map_acp_error(err: acp::Error) -> anyhow::Error {
1266 if err.code == acp::ErrorCode::AuthRequired {
1267 let mut error = AuthRequired::new();
1268
1269 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1270 error = error.with_description(err.message);
1271 }
1272
1273 anyhow!(error)
1274 } else {
1275 anyhow!(err)
1276 }
1277}
1278
#[cfg(test)]
mod tests {
    use super::*;

    /// Agent id shared by every test in this module.
    fn test_agent_id() -> AgentId {
        AgentId::new("test-agent")
    }

    /// A first-class terminal auth method reuses the agent command, appends its
    /// own args, and overlays its env on top of the command's env.
    #[test]
    fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "base".into()),
            ])),
        };
        let method = acp::AuthMethodTerminal::new("login", "Login")
            .args(vec!["/auth".into()])
            .env(std::collections::HashMap::from_iter([
                ("EXTRA".into(), "2".into()),
                ("SHARED".into(), "override".into()),
            ]));

        let task = terminal_auth_task(&command, &test_agent_id(), &method);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(task.label, "Login");
        assert_eq!(task.command_label, "Login");
    }

    /// Legacy `terminal-auth` metadata on an agent auth method is parsed into a
    /// terminal task.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["auth", "--interactive"],
                "env": {
                    "AUTH_MODE": "interactive",
                },
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(legacy_meta),
        );

        let task = meta_terminal_auth_task(&test_agent_id(), &method_id, &method)
            .expect("expected legacy terminal auth task");

        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
        assert_eq!(task.args, vec!["auth", "--interactive"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(task.label, "legacy /auth");
    }

    /// Metadata that only carries a label does not produce a task.
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let incomplete_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(incomplete_meta),
        );

        let task = meta_terminal_auth_task(&test_agent_id(), &method_id, &method);

        assert!(task.is_none());
    }

    /// A terminal auth method that also carries legacy metadata is built from
    /// its first-class fields, not from the metadata.
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let method_id = acp::AuthMethodId::new("login");
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(acp::Meta::from_iter([(
                    "terminal-auth".to_string(),
                    serde_json::json!({
                        "label": "legacy /auth",
                        "command": "legacy-agent",
                        "args": ["legacy-auth"],
                        "env": {
                            "AUTH_MODE": "legacy",
                        },
                    }),
                )])),
        );

        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into()],
            env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
        };

        let acp::AuthMethod::Terminal(terminal) = &method else {
            unreachable!()
        };
        let task = terminal_auth_task(&command, &test_agent_id(), terminal);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(task.label, "Login");
    }
}
1421
1422fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1423 let context_server_store = project.read(cx).context_server_store().read(cx);
1424 let is_local = project.read(cx).is_local();
1425 context_server_store
1426 .configured_server_ids()
1427 .iter()
1428 .filter_map(|id| {
1429 let configuration = context_server_store.configuration_for_server(id)?;
1430 match &*configuration {
1431 project::context_server_store::ContextServerConfiguration::Custom {
1432 command,
1433 remote,
1434 ..
1435 }
1436 | project::context_server_store::ContextServerConfiguration::Extension {
1437 command,
1438 remote,
1439 ..
1440 } if is_local || *remote => Some(acp::McpServer::Stdio(
1441 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1442 .args(command.args.clone())
1443 .env(if let Some(env) = command.env.as_ref() {
1444 env.iter()
1445 .map(|(name, value)| acp::EnvVariable::new(name, value))
1446 .collect()
1447 } else {
1448 vec![]
1449 }),
1450 )),
1451 project::context_server_store::ContextServerConfiguration::Http {
1452 url,
1453 headers,
1454 timeout: _,
1455 } => Some(acp::McpServer::Http(
1456 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1457 headers
1458 .iter()
1459 .map(|(name, value)| acp::HttpHeader::new(name, value))
1460 .collect(),
1461 ),
1462 )),
1463 _ => None,
1464 }
1465 })
1466 .collect()
1467}
1468
1469fn config_state(
1470 modes: Option<acp::SessionModeState>,
1471 models: Option<acp::SessionModelState>,
1472 config_options: Option<Vec<acp::SessionConfigOption>>,
1473) -> (
1474 Option<Rc<RefCell<acp::SessionModeState>>>,
1475 Option<Rc<RefCell<acp::SessionModelState>>>,
1476 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1477) {
1478 if let Some(opts) = config_options {
1479 return (None, None, Some(Rc::new(RefCell::new(opts))));
1480 }
1481
1482 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1483 let models = models.map(|models| Rc::new(RefCell::new(models)));
1484 (modes, models, None)
1485}
1486
1487struct AcpSessionModes {
1488 session_id: acp::SessionId,
1489 connection: ConnectionTo<Agent>,
1490 state: Rc<RefCell<acp::SessionModeState>>,
1491}
1492
1493impl acp_thread::AgentSessionModes for AcpSessionModes {
1494 fn current_mode(&self) -> acp::SessionModeId {
1495 self.state.borrow().current_mode_id.clone()
1496 }
1497
1498 fn all_modes(&self) -> Vec<acp::SessionMode> {
1499 self.state.borrow().available_modes.clone()
1500 }
1501
1502 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1503 let connection = self.connection.clone();
1504 let session_id = self.session_id.clone();
1505 let old_mode_id;
1506 {
1507 let mut state = self.state.borrow_mut();
1508 old_mode_id = state.current_mode_id.clone();
1509 state.current_mode_id = mode_id.clone();
1510 };
1511 let state = self.state.clone();
1512 cx.foreground_executor().spawn(async move {
1513 let result = into_foreground_future(
1514 connection.send_request(acp::SetSessionModeRequest::new(session_id, mode_id)),
1515 )
1516 .await;
1517
1518 if result.is_err() {
1519 state.borrow_mut().current_mode_id = old_mode_id;
1520 }
1521
1522 result?;
1523
1524 Ok(())
1525 })
1526 }
1527}
1528
1529struct AcpModelSelector {
1530 session_id: acp::SessionId,
1531 connection: ConnectionTo<Agent>,
1532 state: Rc<RefCell<acp::SessionModelState>>,
1533}
1534
1535impl AcpModelSelector {
1536 fn new(
1537 session_id: acp::SessionId,
1538 connection: ConnectionTo<Agent>,
1539 state: Rc<RefCell<acp::SessionModelState>>,
1540 ) -> Self {
1541 Self {
1542 session_id,
1543 connection,
1544 state,
1545 }
1546 }
1547}
1548
1549impl acp_thread::AgentModelSelector for AcpModelSelector {
1550 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1551 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1552 self.state
1553 .borrow()
1554 .available_models
1555 .clone()
1556 .into_iter()
1557 .map(acp_thread::AgentModelInfo::from)
1558 .collect(),
1559 )))
1560 }
1561
1562 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1563 let connection = self.connection.clone();
1564 let session_id = self.session_id.clone();
1565 let old_model_id;
1566 {
1567 let mut state = self.state.borrow_mut();
1568 old_model_id = state.current_model_id.clone();
1569 state.current_model_id = model_id.clone();
1570 };
1571 let state = self.state.clone();
1572 cx.foreground_executor().spawn(async move {
1573 let result = into_foreground_future(
1574 connection.send_request(acp::SetSessionModelRequest::new(session_id, model_id)),
1575 )
1576 .await;
1577
1578 if result.is_err() {
1579 state.borrow_mut().current_model_id = old_model_id;
1580 }
1581
1582 result?;
1583
1584 Ok(())
1585 })
1586 }
1587
1588 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1589 let state = self.state.borrow();
1590 Task::ready(
1591 state
1592 .available_models
1593 .iter()
1594 .find(|m| m.model_id == state.current_model_id)
1595 .cloned()
1596 .map(acp_thread::AgentModelInfo::from)
1597 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1598 )
1599 }
1600}
1601
1602struct AcpSessionConfigOptions {
1603 session_id: acp::SessionId,
1604 connection: ConnectionTo<Agent>,
1605 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1606 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1607 watch_rx: watch::Receiver<()>,
1608}
1609
1610impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1611 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1612 self.state.borrow().clone()
1613 }
1614
1615 fn set_config_option(
1616 &self,
1617 config_id: acp::SessionConfigId,
1618 value: acp::SessionConfigValueId,
1619 cx: &mut App,
1620 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1621 let connection = self.connection.clone();
1622 let session_id = self.session_id.clone();
1623 let state = self.state.clone();
1624
1625 let watch_tx = self.watch_tx.clone();
1626
1627 cx.foreground_executor().spawn(async move {
1628 let response = into_foreground_future(connection.send_request(
1629 acp::SetSessionConfigOptionRequest::new(session_id, config_id, value),
1630 ))
1631 .await?;
1632
1633 *state.borrow_mut() = response.config_options.clone();
1634 watch_tx.borrow_mut().send(()).ok();
1635 Ok(response.config_options)
1636 })
1637 }
1638
1639 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1640 Some(self.watch_rx.clone())
1641 }
1642}
1643
1644// ---------------------------------------------------------------------------
1645// Handler functions dispatched from background handler closures to the
1646// foreground thread via the ForegroundWork channel.
1647// ---------------------------------------------------------------------------
1648
1649fn session_thread(
1650 ctx: &ClientContext,
1651 session_id: &acp::SessionId,
1652) -> Result<WeakEntity<AcpThread>, acp::Error> {
1653 let sessions = ctx.sessions.borrow();
1654 sessions
1655 .get(session_id)
1656 .map(|session| session.thread.clone())
1657 .ok_or_else(|| acp::Error::internal_error().data(format!("unknown session: {session_id}")))
1658}
1659
1660fn respond_err<T: JsonRpcResponse + Send + 'static>(responder: Responder<T>, err: acp::Error) {
1661 responder.respond_with_error(err).log_err();
1662}
1663
1664fn handle_request_permission(
1665 args: acp::RequestPermissionRequest,
1666 responder: Responder<acp::RequestPermissionResponse>,
1667 cx: &mut AsyncApp,
1668 ctx: &ClientContext,
1669) {
1670 let thread = match session_thread(ctx, &args.session_id) {
1671 Ok(t) => t,
1672 Err(e) => return respond_err(responder, e),
1673 };
1674
1675 cx.spawn(async move |cx| {
1676 let result: Result<_, acp::Error> = async {
1677 let task = thread
1678 .update(cx, |thread, cx| {
1679 thread.request_tool_call_authorization(
1680 args.tool_call,
1681 acp_thread::PermissionOptions::Flat(args.options),
1682 cx,
1683 )
1684 })
1685 .map_err(acp::Error::from)?
1686 .map_err(acp::Error::from)?;
1687 Ok(task.await)
1688 }
1689 .await;
1690
1691 match result {
1692 Ok(outcome) => {
1693 responder
1694 .respond(acp::RequestPermissionResponse::new(outcome.into()))
1695 .log_err();
1696 }
1697 Err(e) => respond_err(responder, e),
1698 }
1699 })
1700 .detach();
1701}
1702
1703fn handle_write_text_file(
1704 args: acp::WriteTextFileRequest,
1705 responder: Responder<acp::WriteTextFileResponse>,
1706 cx: &mut AsyncApp,
1707 ctx: &ClientContext,
1708) {
1709 let thread = match session_thread(ctx, &args.session_id) {
1710 Ok(t) => t,
1711 Err(e) => return respond_err(responder, e),
1712 };
1713
1714 cx.spawn(async move |cx| {
1715 let result: Result<_, acp::Error> = async {
1716 thread
1717 .update(cx, |thread, cx| {
1718 thread.write_text_file(args.path, args.content, cx)
1719 })
1720 .map_err(acp::Error::from)?
1721 .await
1722 .map_err(acp::Error::from)?;
1723 Ok(())
1724 }
1725 .await;
1726
1727 match result {
1728 Ok(()) => {
1729 responder
1730 .respond(acp::WriteTextFileResponse::default())
1731 .log_err();
1732 }
1733 Err(e) => respond_err(responder, e),
1734 }
1735 })
1736 .detach();
1737}
1738
1739fn handle_read_text_file(
1740 args: acp::ReadTextFileRequest,
1741 responder: Responder<acp::ReadTextFileResponse>,
1742 cx: &mut AsyncApp,
1743 ctx: &ClientContext,
1744) {
1745 let thread = match session_thread(ctx, &args.session_id) {
1746 Ok(t) => t,
1747 Err(e) => return respond_err(responder, e),
1748 };
1749
1750 cx.spawn(async move |cx| {
1751 let result: Result<_, acp::Error> = async {
1752 thread
1753 .update(cx, |thread, cx| {
1754 thread.read_text_file(args.path, args.line, args.limit, false, cx)
1755 })
1756 .map_err(acp::Error::from)?
1757 .await
1758 }
1759 .await;
1760
1761 match result {
1762 Ok(content) => {
1763 responder
1764 .respond(acp::ReadTextFileResponse::new(content))
1765 .log_err();
1766 }
1767 Err(e) => respond_err(responder, e),
1768 }
1769 })
1770 .detach();
1771}
1772
1773fn handle_session_notification(
1774 notification: acp::SessionNotification,
1775 cx: &mut AsyncApp,
1776 ctx: &ClientContext,
1777) {
1778 // Extract everything we need from the session while briefly borrowing.
1779 let (thread, session_modes, config_opts_data) = {
1780 let sessions = ctx.sessions.borrow();
1781 let Some(session) = sessions.get(¬ification.session_id) else {
1782 log::warn!(
1783 "Received session notification for unknown session: {:?}",
1784 notification.session_id
1785 );
1786 return;
1787 };
1788 (
1789 session.thread.clone(),
1790 session.session_modes.clone(),
1791 session
1792 .config_options
1793 .as_ref()
1794 .map(|opts| (opts.config_options.clone(), opts.tx.clone())),
1795 )
1796 };
1797 // Borrow is dropped here.
1798
1799 // Apply mode/config/session_list updates without holding the borrow.
1800 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1801 current_mode_id, ..
1802 }) = ¬ification.update
1803 {
1804 if let Some(session_modes) = &session_modes {
1805 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1806 }
1807 }
1808
1809 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1810 config_options, ..
1811 }) = ¬ification.update
1812 {
1813 if let Some((config_opts_cell, tx_cell)) = &config_opts_data {
1814 *config_opts_cell.borrow_mut() = config_options.clone();
1815 tx_cell.borrow_mut().send(()).ok();
1816 }
1817 }
1818
1819 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1820 && let Some(session_list) = ctx.session_list.borrow().as_ref()
1821 {
1822 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1823 }
1824
1825 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1826 if let acp::SessionUpdate::ToolCall(tc) = ¬ification.update {
1827 if let Some(meta) = &tc.meta {
1828 if let Some(terminal_info) = meta.get("terminal_info") {
1829 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) {
1830 let terminal_id = acp::TerminalId::new(id_str);
1831 let cwd = terminal_info
1832 .get("cwd")
1833 .and_then(|v| v.as_str().map(PathBuf::from));
1834
1835 thread
1836 .update(cx, |thread, cx| {
1837 let builder = TerminalBuilder::new_display_only(
1838 CursorShape::default(),
1839 AlternateScroll::On,
1840 None,
1841 0,
1842 cx.background_executor(),
1843 thread.project().read(cx).path_style(cx),
1844 )?;
1845 let lower = cx.new(|cx| builder.subscribe(cx));
1846 thread.on_terminal_provider_event(
1847 TerminalProviderEvent::Created {
1848 terminal_id,
1849 label: tc.title.clone(),
1850 cwd,
1851 output_byte_limit: None,
1852 terminal: lower,
1853 },
1854 cx,
1855 );
1856 anyhow::Ok(())
1857 })
1858 .log_err();
1859 }
1860 }
1861 }
1862 }
1863
1864 // Forward the update to the acp_thread as usual.
1865 if let Err(err) = thread
1866 .update(cx, |thread, cx| {
1867 thread.handle_session_update(notification.update.clone(), cx)
1868 })
1869 .and_then(|inner| inner.map_err(anyhow::Error::from))
1870 {
1871 log::error!(
1872 "Failed to handle session update for {:?}: {err:?}",
1873 notification.session_id
1874 );
1875 }
1876
1877 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1878 if let acp::SessionUpdate::ToolCallUpdate(tcu) = ¬ification.update {
1879 if let Some(meta) = &tcu.meta {
1880 if let Some(term_out) = meta.get("terminal_output") {
1881 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1882 let terminal_id = acp::TerminalId::new(id_str);
1883 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1884 let data = s.as_bytes().to_vec();
1885 thread
1886 .update(cx, |thread, cx| {
1887 thread.on_terminal_provider_event(
1888 TerminalProviderEvent::Output { terminal_id, data },
1889 cx,
1890 );
1891 })
1892 .log_err();
1893 }
1894 }
1895 }
1896
1897 if let Some(term_exit) = meta.get("terminal_exit") {
1898 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1899 let terminal_id = acp::TerminalId::new(id_str);
1900 let status = acp::TerminalExitStatus::new()
1901 .exit_code(
1902 term_exit
1903 .get("exit_code")
1904 .and_then(|v| v.as_u64())
1905 .map(|i| i as u32),
1906 )
1907 .signal(
1908 term_exit
1909 .get("signal")
1910 .and_then(|v| v.as_str().map(|s| s.to_string())),
1911 );
1912
1913 thread
1914 .update(cx, |thread, cx| {
1915 thread.on_terminal_provider_event(
1916 TerminalProviderEvent::Exit {
1917 terminal_id,
1918 status,
1919 },
1920 cx,
1921 );
1922 })
1923 .log_err();
1924 }
1925 }
1926 }
1927 }
1928}
1929
1930fn handle_create_terminal(
1931 args: acp::CreateTerminalRequest,
1932 responder: Responder<acp::CreateTerminalResponse>,
1933 cx: &mut AsyncApp,
1934 ctx: &ClientContext,
1935) {
1936 let thread = match session_thread(ctx, &args.session_id) {
1937 Ok(t) => t,
1938 Err(e) => return respond_err(responder, e),
1939 };
1940 let project = match thread
1941 .read_with(cx, |thread, _cx| thread.project().clone())
1942 .map_err(acp::Error::from)
1943 {
1944 Ok(p) => p,
1945 Err(e) => return respond_err(responder, e),
1946 };
1947
1948 cx.spawn(async move |cx| {
1949 let result: Result<_, acp::Error> = async {
1950 let terminal_entity = acp_thread::create_terminal_entity(
1951 args.command.clone(),
1952 &args.args,
1953 args.env
1954 .into_iter()
1955 .map(|env| (env.name, env.value))
1956 .collect(),
1957 args.cwd.clone(),
1958 &project,
1959 cx,
1960 )
1961 .await
1962 .map_err(acp::Error::from)?;
1963
1964 let terminal_entity = thread
1965 .update(cx, |thread, cx| {
1966 thread.register_terminal_created(
1967 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1968 format!("{} {}", args.command, args.args.join(" ")),
1969 args.cwd.clone(),
1970 args.output_byte_limit,
1971 terminal_entity,
1972 cx,
1973 )
1974 })
1975 .map_err(acp::Error::from)?;
1976 let terminal_id = terminal_entity.read_with(cx, |terminal, _| terminal.id().clone());
1977 Ok(terminal_id)
1978 }
1979 .await;
1980
1981 match result {
1982 Ok(terminal_id) => {
1983 responder
1984 .respond(acp::CreateTerminalResponse::new(terminal_id))
1985 .log_err();
1986 }
1987 Err(e) => respond_err(responder, e),
1988 }
1989 })
1990 .detach();
1991}
1992
1993fn handle_kill_terminal(
1994 args: acp::KillTerminalRequest,
1995 responder: Responder<acp::KillTerminalResponse>,
1996 cx: &mut AsyncApp,
1997 ctx: &ClientContext,
1998) {
1999 let thread = match session_thread(ctx, &args.session_id) {
2000 Ok(t) => t,
2001 Err(e) => return respond_err(responder, e),
2002 };
2003
2004 match thread
2005 .update(cx, |thread, cx| thread.kill_terminal(args.terminal_id, cx))
2006 .map_err(acp::Error::from)
2007 .and_then(|r| r.map_err(acp::Error::from))
2008 {
2009 Ok(()) => {
2010 responder
2011 .respond(acp::KillTerminalResponse::default())
2012 .log_err();
2013 }
2014 Err(e) => respond_err(responder, e),
2015 }
2016}
2017
2018fn handle_release_terminal(
2019 args: acp::ReleaseTerminalRequest,
2020 responder: Responder<acp::ReleaseTerminalResponse>,
2021 cx: &mut AsyncApp,
2022 ctx: &ClientContext,
2023) {
2024 let thread = match session_thread(ctx, &args.session_id) {
2025 Ok(t) => t,
2026 Err(e) => return respond_err(responder, e),
2027 };
2028
2029 match thread
2030 .update(cx, |thread, cx| {
2031 thread.release_terminal(args.terminal_id, cx)
2032 })
2033 .map_err(acp::Error::from)
2034 .and_then(|r| r.map_err(acp::Error::from))
2035 {
2036 Ok(()) => {
2037 responder
2038 .respond(acp::ReleaseTerminalResponse::default())
2039 .log_err();
2040 }
2041 Err(e) => respond_err(responder, e),
2042 }
2043}
2044
2045fn handle_terminal_output(
2046 args: acp::TerminalOutputRequest,
2047 responder: Responder<acp::TerminalOutputResponse>,
2048 cx: &mut AsyncApp,
2049 ctx: &ClientContext,
2050) {
2051 let thread = match session_thread(ctx, &args.session_id) {
2052 Ok(t) => t,
2053 Err(e) => return respond_err(responder, e),
2054 };
2055
2056 match thread
2057 .read_with(cx, |thread, cx| -> Result<_, anyhow::Error> {
2058 let out = thread
2059 .terminal(args.terminal_id)?
2060 .read(cx)
2061 .current_output(cx);
2062 Ok(out)
2063 })
2064 .map_err(acp::Error::from)
2065 .and_then(|r| r.map_err(acp::Error::from))
2066 {
2067 Ok(output) => {
2068 responder.respond(output).log_err();
2069 }
2070 Err(e) => respond_err(responder, e),
2071 }
2072}
2073
2074fn handle_wait_for_terminal_exit(
2075 args: acp::WaitForTerminalExitRequest,
2076 responder: Responder<acp::WaitForTerminalExitResponse>,
2077 cx: &mut AsyncApp,
2078 ctx: &ClientContext,
2079) {
2080 let thread = match session_thread(ctx, &args.session_id) {
2081 Ok(t) => t,
2082 Err(e) => return respond_err(responder, e),
2083 };
2084
2085 cx.spawn(async move |cx| {
2086 let result: Result<_, acp::Error> = async {
2087 let exit_status = thread
2088 .update(cx, |thread, cx| {
2089 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
2090 })
2091 .map_err(acp::Error::from)?
2092 .map_err(acp::Error::from)?
2093 .await;
2094 Ok(exit_status)
2095 }
2096 .await;
2097
2098 match result {
2099 Ok(exit_status) => {
2100 responder
2101 .respond(acp::WaitForTerminalExitResponse::new(exit_status))
2102 .log_err();
2103 }
2104 Err(e) => respond_err(responder, e),
2105 }
2106 })
2107 .detach();
2108}