1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::FutureExt as _;
13use futures::future::Shared;
14use futures::io::BufReader;
15use project::agent_server_store::{AgentServerCommand, AgentServerStore};
16use project::{AgentId, Project};
17use remote::remote_client::Interactive;
18use serde::Deserialize;
19use settings::Settings as _;
20use std::path::PathBuf;
21use std::process::Stdio;
22use std::rc::Rc;
23use std::{any::Any, cell::RefCell};
24use task::{ShellBuilder, SpawnInTerminal};
25use thiserror::Error;
26use util::ResultExt as _;
27use util::path_list::PathList;
28use util::process::Child;
29
30use std::sync::Arc;
31
32use anyhow::{Context as _, Result};
33use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
34
35use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
36use terminal::TerminalBuilder;
37use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
38
39use crate::GEMINI_ID;
40
41pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
42
43#[derive(Debug, Error)]
44#[error("Unsupported version")]
45pub struct UnsupportedVersion;
46
47pub struct AcpConnection {
48 id: AgentId,
49 telemetry_id: SharedString,
50 connection: Rc<acp::ClientSideConnection>,
51 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
52 pending_sessions: Rc<RefCell<HashMap<acp::SessionId, PendingAcpSession>>>,
53 auth_methods: Vec<acp::AuthMethod>,
54 agent_server_store: WeakEntity<AgentServerStore>,
55 agent_capabilities: acp::AgentCapabilities,
56 default_mode: Option<acp::SessionModeId>,
57 default_model: Option<acp::ModelId>,
58 default_config_options: HashMap<String, String>,
59 child: Option<Child>,
60 session_list: Option<Rc<AcpSessionList>>,
61 _io_task: Task<Result<(), acp::Error>>,
62 _wait_task: Task<Result<()>>,
63 _stderr_task: Task<Result<()>>,
64}
65
66struct PendingAcpSession {
67 task: Shared<Task<Result<Entity<AcpThread>, Arc<anyhow::Error>>>>,
68 ref_count: usize,
69}
70
71struct SessionConfigResponse {
72 modes: Option<acp::SessionModeState>,
73 models: Option<acp::SessionModelState>,
74 config_options: Option<Vec<acp::SessionConfigOption>>,
75}
76
77struct ConfigOptions {
78 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
79 tx: Rc<RefCell<watch::Sender<()>>>,
80 rx: watch::Receiver<()>,
81}
82
83impl ConfigOptions {
84 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
85 let (tx, rx) = watch::channel(());
86 Self {
87 config_options,
88 tx: Rc::new(RefCell::new(tx)),
89 rx,
90 }
91 }
92}
93
94pub struct AcpSession {
95 thread: WeakEntity<AcpThread>,
96 suppress_abort_err: bool,
97 models: Option<Rc<RefCell<acp::SessionModelState>>>,
98 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
99 config_options: Option<ConfigOptions>,
100 ref_count: usize,
101}
102
103pub struct AcpSessionList {
104 connection: Rc<acp::ClientSideConnection>,
105 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
106 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
107}
108
109impl AcpSessionList {
110 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
111 let (tx, rx) = smol::channel::unbounded();
112 Self {
113 connection,
114 updates_tx: tx,
115 updates_rx: rx,
116 }
117 }
118
119 fn notify_update(&self) {
120 self.updates_tx
121 .try_send(acp_thread::SessionListUpdate::Refresh)
122 .log_err();
123 }
124
125 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
126 self.updates_tx
127 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
128 .log_err();
129 }
130}
131
132impl AgentSessionList for AcpSessionList {
133 fn list_sessions(
134 &self,
135 request: AgentSessionListRequest,
136 cx: &mut App,
137 ) -> Task<Result<AgentSessionListResponse>> {
138 let conn = self.connection.clone();
139 cx.foreground_executor().spawn(async move {
140 let acp_request = acp::ListSessionsRequest::new()
141 .cwd(request.cwd)
142 .cursor(request.cursor);
143 let response = conn.list_sessions(acp_request).await?;
144 Ok(AgentSessionListResponse {
145 sessions: response
146 .sessions
147 .into_iter()
148 .map(|s| AgentSessionInfo {
149 session_id: s.session_id,
150 work_dirs: Some(PathList::new(&[s.cwd])),
151 title: s.title.map(Into::into),
152 updated_at: s.updated_at.and_then(|date_str| {
153 chrono::DateTime::parse_from_rfc3339(&date_str)
154 .ok()
155 .map(|dt| dt.with_timezone(&chrono::Utc))
156 }),
157 created_at: None,
158 meta: s.meta,
159 })
160 .collect(),
161 next_cursor: response.next_cursor,
162 meta: response.meta,
163 })
164 })
165 }
166
167 fn watch(
168 &self,
169 _cx: &mut App,
170 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
171 Some(self.updates_rx.clone())
172 }
173
174 fn notify_refresh(&self) {
175 self.notify_update();
176 }
177
178 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
179 self
180 }
181}
182
183pub async fn connect(
184 agent_id: AgentId,
185 project: Entity<Project>,
186 command: AgentServerCommand,
187 agent_server_store: WeakEntity<AgentServerStore>,
188 default_mode: Option<acp::SessionModeId>,
189 default_model: Option<acp::ModelId>,
190 default_config_options: HashMap<String, String>,
191 cx: &mut AsyncApp,
192) -> Result<Rc<dyn AgentConnection>> {
193 let conn = AcpConnection::stdio(
194 agent_id,
195 project,
196 command.clone(),
197 agent_server_store,
198 default_mode,
199 default_model,
200 default_config_options,
201 cx,
202 )
203 .await?;
204 Ok(Rc::new(conn) as _)
205}
206
207const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
208
209impl AcpConnection {
210 pub async fn stdio(
211 agent_id: AgentId,
212 project: Entity<Project>,
213 command: AgentServerCommand,
214 agent_server_store: WeakEntity<AgentServerStore>,
215 default_mode: Option<acp::SessionModeId>,
216 default_model: Option<acp::ModelId>,
217 default_config_options: HashMap<String, String>,
218 cx: &mut AsyncApp,
219 ) -> Result<Self> {
220 let root_dir = project.read_with(cx, |project, cx| {
221 project
222 .default_path_list(cx)
223 .ordered_paths()
224 .next()
225 .cloned()
226 });
227 let original_command = command.clone();
228 let (path, args, env) = project
229 .read_with(cx, |project, cx| {
230 project.remote_client().and_then(|client| {
231 let template = client
232 .read(cx)
233 .build_command_with_options(
234 Some(command.path.display().to_string()),
235 &command.args,
236 &command.env.clone().into_iter().flatten().collect(),
237 root_dir.as_ref().map(|path| path.display().to_string()),
238 None,
239 Interactive::No,
240 )
241 .log_err()?;
242 Some((template.program, template.args, template.env))
243 })
244 })
245 .unwrap_or_else(|| {
246 (
247 command.path.display().to_string(),
248 command.args,
249 command.env.unwrap_or_default(),
250 )
251 });
252
253 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
254 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
255 let mut child = builder.build_std_command(Some(path.clone()), &args);
256 child.envs(env.clone());
257 if let Some(cwd) = project.read_with(cx, |project, _cx| {
258 if project.is_local() {
259 root_dir.as_ref()
260 } else {
261 None
262 }
263 }) {
264 child.current_dir(cwd);
265 }
266 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
267
268 let stdout = child.stdout.take().context("Failed to take stdout")?;
269 let stdin = child.stdin.take().context("Failed to take stdin")?;
270 let stderr = child.stderr.take().context("Failed to take stderr")?;
271 log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
272 log::trace!("Spawned (pid: {})", child.id());
273
274 let sessions = Rc::new(RefCell::new(HashMap::default()));
275
276 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
277 (
278 release_channel::ReleaseChannel::try_global(cx)
279 .map(|release_channel| release_channel.display_name()),
280 release_channel::AppVersion::global(cx).to_string(),
281 )
282 });
283
284 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
285 Rc::new(RefCell::new(None));
286
287 let client = ClientDelegate {
288 sessions: sessions.clone(),
289 session_list: client_session_list.clone(),
290 cx: cx.clone(),
291 };
292 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
293 let foreground_executor = cx.foreground_executor().clone();
294 move |fut| {
295 foreground_executor.spawn(fut).detach();
296 }
297 });
298
299 let io_task = cx.background_spawn(io_task);
300
301 let stderr_task = cx.background_spawn(async move {
302 let mut stderr = BufReader::new(stderr);
303 let mut line = String::new();
304 while let Ok(n) = stderr.read_line(&mut line).await
305 && n > 0
306 {
307 log::warn!("agent stderr: {}", line.trim());
308 line.clear();
309 }
310 Ok(())
311 });
312
313 let wait_task = cx.spawn({
314 let sessions = sessions.clone();
315 let status_fut = child.status();
316 async move |cx| {
317 let status = status_fut.await?;
318
319 for session in sessions.borrow().values() {
320 session
321 .thread
322 .update(cx, |thread, cx| {
323 thread.emit_load_error(LoadError::Exited { status }, cx)
324 })
325 .ok();
326 }
327
328 anyhow::Ok(())
329 }
330 });
331
332 let connection = Rc::new(connection);
333
334 cx.update(|cx| {
335 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
336 registry.set_active_connection(agent_id.clone(), &connection, cx)
337 });
338 });
339
340 let response = connection
341 .initialize(
342 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
343 .client_capabilities(
344 acp::ClientCapabilities::new()
345 .fs(acp::FileSystemCapabilities::new()
346 .read_text_file(true)
347 .write_text_file(true))
348 .terminal(true)
349 .auth(acp::AuthCapabilities::new().terminal(true))
350 // Experimental: Allow for rendering terminal output from the agents
351 .meta(acp::Meta::from_iter([
352 ("terminal_output".into(), true.into()),
353 ("terminal-auth".into(), true.into()),
354 ])),
355 )
356 .client_info(
357 acp::Implementation::new("zed", version)
358 .title(release_channel.map(ToOwned::to_owned)),
359 ),
360 )
361 .await?;
362
363 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
364 return Err(UnsupportedVersion.into());
365 }
366
367 let telemetry_id = response
368 .agent_info
369 // Use the one the agent provides if we have one
370 .map(|info| info.name.into())
371 // Otherwise, just use the name
372 .unwrap_or_else(|| agent_id.0.clone());
373
374 let session_list = if response
375 .agent_capabilities
376 .session_capabilities
377 .list
378 .is_some()
379 {
380 let list = Rc::new(AcpSessionList::new(connection.clone()));
381 *client_session_list.borrow_mut() = Some(list.clone());
382 Some(list)
383 } else {
384 None
385 };
386
387 // TODO: Remove this override once Google team releases their official auth methods
388 let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
389 let mut gemini_args = original_command.args.clone();
390 gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
391 let value = serde_json::json!({
392 "label": "gemini /auth",
393 "command": original_command.path.to_string_lossy(),
394 "args": gemini_args,
395 "env": original_command.env.unwrap_or_default(),
396 });
397 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
398 vec![acp::AuthMethod::Agent(
399 acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
400 .description("Login with your Google or Vertex AI account")
401 .meta(meta),
402 )]
403 } else {
404 response.auth_methods
405 };
406 Ok(Self {
407 id: agent_id,
408 auth_methods,
409 agent_server_store,
410 connection,
411 telemetry_id,
412 sessions,
413 pending_sessions: Rc::new(RefCell::new(HashMap::default())),
414 agent_capabilities: response.agent_capabilities,
415 default_mode,
416 default_model,
417 default_config_options,
418 session_list,
419 _io_task: io_task,
420 _wait_task: wait_task,
421 _stderr_task: stderr_task,
422 child: Some(child),
423 })
424 }
425
426 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
427 &self.agent_capabilities.prompt_capabilities
428 }
429
430 #[cfg(test)]
431 fn new_for_test(
432 connection: Rc<acp::ClientSideConnection>,
433 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
434 agent_capabilities: acp::AgentCapabilities,
435 agent_server_store: WeakEntity<AgentServerStore>,
436 io_task: Task<Result<(), acp::Error>>,
437 _cx: &mut App,
438 ) -> Self {
439 Self {
440 id: AgentId::new("test"),
441 telemetry_id: "test".into(),
442 connection,
443 sessions,
444 pending_sessions: Rc::new(RefCell::new(HashMap::default())),
445 auth_methods: vec![],
446 agent_server_store,
447 agent_capabilities,
448 default_mode: None,
449 default_model: None,
450 default_config_options: HashMap::default(),
451 child: None,
452 session_list: None,
453 _io_task: io_task,
454 _wait_task: Task::ready(Ok(())),
455 _stderr_task: Task::ready(Ok(())),
456 }
457 }
458
459 fn open_or_create_session(
460 self: Rc<Self>,
461 session_id: acp::SessionId,
462 project: Entity<Project>,
463 work_dirs: PathList,
464 title: Option<SharedString>,
465 rpc_call: impl FnOnce(
466 Rc<acp::ClientSideConnection>,
467 acp::SessionId,
468 PathBuf,
469 )
470 -> futures::future::LocalBoxFuture<'static, Result<SessionConfigResponse>>
471 + 'static,
472 cx: &mut App,
473 ) -> Task<Result<Entity<AcpThread>>> {
474 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
475 session.ref_count += 1;
476 if let Some(thread) = session.thread.upgrade() {
477 return Task::ready(Ok(thread));
478 }
479 }
480
481 if let Some(pending) = self.pending_sessions.borrow_mut().get_mut(&session_id) {
482 pending.ref_count += 1;
483 let task = pending.task.clone();
484 return cx
485 .foreground_executor()
486 .spawn(async move { task.await.map_err(|err| anyhow!(err)) });
487 }
488
489 // TODO: remove this once ACP supports multiple working directories
490 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
491 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
492 };
493
494 let shared_task = cx
495 .spawn({
496 let session_id = session_id.clone();
497 let this = self.clone();
498 async move |cx| {
499 let action_log = cx.new(|_| ActionLog::new(project.clone()));
500 let thread: Entity<AcpThread> = cx.new(|cx| {
501 AcpThread::new(
502 None,
503 title,
504 Some(work_dirs),
505 this.clone(),
506 project,
507 action_log,
508 session_id.clone(),
509 watch::Receiver::constant(
510 this.agent_capabilities.prompt_capabilities.clone(),
511 ),
512 cx,
513 )
514 });
515
516 let response =
517 match rpc_call(this.connection.clone(), session_id.clone(), cwd).await {
518 Ok(response) => response,
519 Err(err) => {
520 this.pending_sessions.borrow_mut().remove(&session_id);
521 return Err(Arc::new(err));
522 }
523 };
524
525 let (modes, models, config_options) =
526 config_state(response.modes, response.models, response.config_options);
527
528 if let Some(config_opts) = config_options.as_ref() {
529 this.apply_default_config_options(&session_id, config_opts, cx);
530 }
531
532 let ref_count = this
533 .pending_sessions
534 .borrow_mut()
535 .remove(&session_id)
536 .map_or(1, |pending| pending.ref_count);
537
538 this.sessions.borrow_mut().insert(
539 session_id,
540 AcpSession {
541 thread: thread.downgrade(),
542 suppress_abort_err: false,
543 session_modes: modes,
544 models,
545 config_options: config_options.map(ConfigOptions::new),
546 ref_count,
547 },
548 );
549
550 Ok(thread)
551 }
552 })
553 .shared();
554
555 self.pending_sessions.borrow_mut().insert(
556 session_id,
557 PendingAcpSession {
558 task: shared_task.clone(),
559 ref_count: 1,
560 },
561 );
562
563 cx.foreground_executor()
564 .spawn(async move { shared_task.await.map_err(|err| anyhow!(err)) })
565 }
566
567 fn apply_default_config_options(
568 &self,
569 session_id: &acp::SessionId,
570 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
571 cx: &mut AsyncApp,
572 ) {
573 let id = self.id.clone();
574 let defaults_to_apply: Vec<_> = {
575 let config_opts_ref = config_options.borrow();
576 config_opts_ref
577 .iter()
578 .filter_map(|config_option| {
579 let default_value = self.default_config_options.get(&*config_option.id.0)?;
580
581 let is_valid = match &config_option.kind {
582 acp::SessionConfigKind::Select(select) => match &select.options {
583 acp::SessionConfigSelectOptions::Ungrouped(options) => options
584 .iter()
585 .any(|opt| &*opt.value.0 == default_value.as_str()),
586 acp::SessionConfigSelectOptions::Grouped(groups) => {
587 groups.iter().any(|g| {
588 g.options
589 .iter()
590 .any(|opt| &*opt.value.0 == default_value.as_str())
591 })
592 }
593 _ => false,
594 },
595 _ => false,
596 };
597
598 if is_valid {
599 let initial_value = match &config_option.kind {
600 acp::SessionConfigKind::Select(select) => {
601 Some(select.current_value.clone())
602 }
603 _ => None,
604 };
605 Some((
606 config_option.id.clone(),
607 default_value.clone(),
608 initial_value,
609 ))
610 } else {
611 log::warn!(
612 "`{}` is not a valid value for config option `{}` in {}",
613 default_value,
614 config_option.id.0,
615 id
616 );
617 None
618 }
619 })
620 .collect()
621 };
622
623 for (config_id, default_value, initial_value) in defaults_to_apply {
624 cx.spawn({
625 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
626 let session_id = session_id.clone();
627 let config_id_clone = config_id.clone();
628 let config_opts = config_options.clone();
629 let conn = self.connection.clone();
630 async move |_| {
631 let result = conn
632 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
633 session_id,
634 config_id_clone.clone(),
635 default_value_id,
636 ))
637 .await
638 .log_err();
639
640 if result.is_none() {
641 if let Some(initial) = initial_value {
642 let mut opts = config_opts.borrow_mut();
643 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
644 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
645 select.current_value = initial;
646 }
647 }
648 }
649 }
650 }
651 })
652 .detach();
653
654 let mut opts = config_options.borrow_mut();
655 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
656 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
657 select.current_value = acp::SessionConfigValueId::new(default_value);
658 }
659 }
660 }
661 }
662}
663
664impl Drop for AcpConnection {
665 fn drop(&mut self) {
666 if let Some(ref mut child) = self.child {
667 child.kill().log_err();
668 }
669 }
670}
671
672fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
673 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
674}
675
676fn terminal_auth_task(
677 command: &AgentServerCommand,
678 agent_id: &AgentId,
679 method: &acp::AuthMethodTerminal,
680) -> SpawnInTerminal {
681 acp_thread::build_terminal_auth_task(
682 terminal_auth_task_id(agent_id, &method.id),
683 method.name.clone(),
684 command.path.to_string_lossy().into_owned(),
685 command.args.clone(),
686 command.env.clone().unwrap_or_default(),
687 )
688}
689
690/// Used to support the _meta method prior to stabilization
691fn meta_terminal_auth_task(
692 agent_id: &AgentId,
693 method_id: &acp::AuthMethodId,
694 method: &acp::AuthMethod,
695) -> Option<SpawnInTerminal> {
696 #[derive(Deserialize)]
697 struct MetaTerminalAuth {
698 label: String,
699 command: String,
700 #[serde(default)]
701 args: Vec<String>,
702 #[serde(default)]
703 env: HashMap<String, String>,
704 }
705
706 let meta = match method {
707 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
708 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
709 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
710 _ => None,
711 }?;
712 let terminal_auth =
713 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
714
715 Some(acp_thread::build_terminal_auth_task(
716 terminal_auth_task_id(agent_id, method_id),
717 terminal_auth.label.clone(),
718 terminal_auth.command,
719 terminal_auth.args,
720 terminal_auth.env,
721 ))
722}
723
724impl AgentConnection for AcpConnection {
725 fn agent_id(&self) -> AgentId {
726 self.id.clone()
727 }
728
729 fn telemetry_id(&self) -> SharedString {
730 self.telemetry_id.clone()
731 }
732
733 fn new_session(
734 self: Rc<Self>,
735 project: Entity<Project>,
736 work_dirs: PathList,
737 cx: &mut App,
738 ) -> Task<Result<Entity<AcpThread>>> {
739 // TODO: remove this once ACP supports multiple working directories
740 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
741 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
742 };
743 let name = self.id.0.clone();
744 let mcp_servers = mcp_servers_for_project(&project, cx);
745
746 cx.spawn(async move |cx| {
747 let response = self.connection
748 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
749 .await
750 .map_err(map_acp_error)?;
751
752 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
753
754 if let Some(default_mode) = self.default_mode.clone() {
755 if let Some(modes) = modes.as_ref() {
756 let mut modes_ref = modes.borrow_mut();
757 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
758
759 if has_mode {
760 let initial_mode_id = modes_ref.current_mode_id.clone();
761
762 cx.spawn({
763 let default_mode = default_mode.clone();
764 let session_id = response.session_id.clone();
765 let modes = modes.clone();
766 let conn = self.connection.clone();
767 async move |_| {
768 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
769 .await.log_err();
770
771 if result.is_none() {
772 modes.borrow_mut().current_mode_id = initial_mode_id;
773 }
774 }
775 }).detach();
776
777 modes_ref.current_mode_id = default_mode;
778 } else {
779 let available_modes = modes_ref
780 .available_modes
781 .iter()
782 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
783 .collect::<Vec<_>>()
784 .join("\n");
785
786 log::warn!(
787 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
788 );
789 }
790 }
791 }
792
793 if let Some(default_model) = self.default_model.clone() {
794 if let Some(models) = models.as_ref() {
795 let mut models_ref = models.borrow_mut();
796 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
797
798 if has_model {
799 let initial_model_id = models_ref.current_model_id.clone();
800
801 cx.spawn({
802 let default_model = default_model.clone();
803 let session_id = response.session_id.clone();
804 let models = models.clone();
805 let conn = self.connection.clone();
806 async move |_| {
807 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
808 .await.log_err();
809
810 if result.is_none() {
811 models.borrow_mut().current_model_id = initial_model_id;
812 }
813 }
814 }).detach();
815
816 models_ref.current_model_id = default_model;
817 } else {
818 let available_models = models_ref
819 .available_models
820 .iter()
821 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
822 .collect::<Vec<_>>()
823 .join("\n");
824
825 log::warn!(
826 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
827 );
828 }
829 }
830 }
831
832 if let Some(config_opts) = config_options.as_ref() {
833 self.apply_default_config_options(&response.session_id, config_opts, cx);
834 }
835
836 let action_log = cx.new(|_| ActionLog::new(project.clone()));
837 let thread: Entity<AcpThread> = cx.new(|cx| {
838 AcpThread::new(
839 None,
840 None,
841 Some(work_dirs),
842 self.clone(),
843 project,
844 action_log,
845 response.session_id.clone(),
846 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
847 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
848 cx,
849 )
850 });
851
852 self.sessions.borrow_mut().insert(
853 response.session_id,
854 AcpSession {
855 thread: thread.downgrade(),
856 suppress_abort_err: false,
857 session_modes: modes,
858 models,
859 config_options: config_options.map(ConfigOptions::new),
860 ref_count: 1,
861 },
862 );
863
864 Ok(thread)
865 })
866 }
867
868 fn supports_load_session(&self) -> bool {
869 self.agent_capabilities.load_session
870 }
871
872 fn supports_resume_session(&self) -> bool {
873 self.agent_capabilities
874 .session_capabilities
875 .resume
876 .is_some()
877 }
878
879 fn load_session(
880 self: Rc<Self>,
881 session_id: acp::SessionId,
882 project: Entity<Project>,
883 work_dirs: PathList,
884 title: Option<SharedString>,
885 cx: &mut App,
886 ) -> Task<Result<Entity<AcpThread>>> {
887 if !self.agent_capabilities.load_session {
888 return Task::ready(Err(anyhow!(LoadError::Other(
889 "Loading sessions is not supported by this agent.".into()
890 ))));
891 }
892
893 let mcp_servers = mcp_servers_for_project(&project, cx);
894 self.open_or_create_session(
895 session_id,
896 project,
897 work_dirs,
898 title,
899 move |connection, session_id, cwd| {
900 Box::pin(async move {
901 let response = connection
902 .load_session(
903 acp::LoadSessionRequest::new(session_id, cwd).mcp_servers(mcp_servers),
904 )
905 .await
906 .map_err(map_acp_error)?;
907 Ok(SessionConfigResponse {
908 modes: response.modes,
909 models: response.models,
910 config_options: response.config_options,
911 })
912 })
913 },
914 cx,
915 )
916 }
917
918 fn resume_session(
919 self: Rc<Self>,
920 session_id: acp::SessionId,
921 project: Entity<Project>,
922 work_dirs: PathList,
923 title: Option<SharedString>,
924 cx: &mut App,
925 ) -> Task<Result<Entity<AcpThread>>> {
926 if self
927 .agent_capabilities
928 .session_capabilities
929 .resume
930 .is_none()
931 {
932 return Task::ready(Err(anyhow!(LoadError::Other(
933 "Resuming sessions is not supported by this agent.".into()
934 ))));
935 }
936
937 let mcp_servers = mcp_servers_for_project(&project, cx);
938 self.open_or_create_session(
939 session_id,
940 project,
941 work_dirs,
942 title,
943 move |connection, session_id, cwd| {
944 Box::pin(async move {
945 let response = connection
946 .resume_session(
947 acp::ResumeSessionRequest::new(session_id, cwd)
948 .mcp_servers(mcp_servers),
949 )
950 .await
951 .map_err(map_acp_error)?;
952 Ok(SessionConfigResponse {
953 modes: response.modes,
954 models: response.models,
955 config_options: response.config_options,
956 })
957 })
958 },
959 cx,
960 )
961 }
962
963 fn supports_close_session(&self) -> bool {
964 self.agent_capabilities.session_capabilities.close.is_some()
965 }
966
967 fn close_session(
968 self: Rc<Self>,
969 session_id: &acp::SessionId,
970 cx: &mut App,
971 ) -> Task<Result<()>> {
972 if !self.supports_close_session() {
973 return Task::ready(Err(anyhow!(LoadError::Other(
974 "Closing sessions is not supported by this agent.".into()
975 ))));
976 }
977
978 let mut sessions = self.sessions.borrow_mut();
979 let Some(session) = sessions.get_mut(session_id) else {
980 return Task::ready(Ok(()));
981 };
982
983 session.ref_count -= 1;
984 if session.ref_count > 0 {
985 return Task::ready(Ok(()));
986 }
987
988 sessions.remove(session_id);
989 drop(sessions);
990
991 let conn = self.connection.clone();
992 let session_id = session_id.clone();
993 cx.foreground_executor().spawn(async move {
994 conn.close_session(acp::CloseSessionRequest::new(session_id))
995 .await?;
996 Ok(())
997 })
998 }
999
1000 fn auth_methods(&self) -> &[acp::AuthMethod] {
1001 &self.auth_methods
1002 }
1003
1004 fn terminal_auth_task(
1005 &self,
1006 method_id: &acp::AuthMethodId,
1007 cx: &App,
1008 ) -> Option<Task<Result<SpawnInTerminal>>> {
1009 let method = self
1010 .auth_methods
1011 .iter()
1012 .find(|method| method.id() == method_id)?;
1013
1014 match method {
1015 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1016 let agent_id = self.id.clone();
1017 let terminal = terminal.clone();
1018 let store = self.agent_server_store.clone();
1019 Some(cx.spawn(async move |cx| {
1020 let command = store
1021 .update(cx, |store, cx| {
1022 let agent = store
1023 .get_external_agent(&agent_id)
1024 .context("Agent server not found")?;
1025 anyhow::Ok(agent.get_command(
1026 terminal.args.clone(),
1027 HashMap::from_iter(terminal.env.clone()),
1028 &mut cx.to_async(),
1029 ))
1030 })?
1031 .context("Failed to get agent command")?
1032 .await?;
1033 Ok(terminal_auth_task(&command, &agent_id, &terminal))
1034 }))
1035 }
1036 _ => meta_terminal_auth_task(&self.id, method_id, method)
1037 .map(|task| Task::ready(Ok(task))),
1038 }
1039 }
1040
1041 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1042 let conn = self.connection.clone();
1043 cx.foreground_executor().spawn(async move {
1044 conn.authenticate(acp::AuthenticateRequest::new(method_id))
1045 .await?;
1046 Ok(())
1047 })
1048 }
1049
1050 fn prompt(
1051 &self,
1052 _id: acp_thread::UserMessageId,
1053 params: acp::PromptRequest,
1054 cx: &mut App,
1055 ) -> Task<Result<acp::PromptResponse>> {
1056 let conn = self.connection.clone();
1057 let sessions = self.sessions.clone();
1058 let session_id = params.session_id.clone();
1059 cx.foreground_executor().spawn(async move {
1060 let result = conn.prompt(params).await;
1061
1062 let mut suppress_abort_err = false;
1063
1064 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1065 suppress_abort_err = session.suppress_abort_err;
1066 session.suppress_abort_err = false;
1067 }
1068
1069 match result {
1070 Ok(response) => Ok(response),
1071 Err(err) => {
1072 if err.code == acp::ErrorCode::AuthRequired {
1073 return Err(anyhow!(acp::Error::auth_required()));
1074 }
1075
1076 if err.code != ErrorCode::InternalError {
1077 anyhow::bail!(err)
1078 }
1079
1080 let Some(data) = &err.data else {
1081 anyhow::bail!(err)
1082 };
1083
1084 // Temporary workaround until the following PR is generally available:
1085 // https://github.com/google-gemini/gemini-cli/pull/6656
1086
1087 #[derive(Deserialize)]
1088 #[serde(deny_unknown_fields)]
1089 struct ErrorDetails {
1090 details: Box<str>,
1091 }
1092
1093 match serde_json::from_value(data.clone()) {
1094 Ok(ErrorDetails { details }) => {
1095 if suppress_abort_err
1096 && (details.contains("This operation was aborted")
1097 || details.contains("The user aborted a request"))
1098 {
1099 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1100 } else {
1101 Err(anyhow!(details))
1102 }
1103 }
1104 Err(_) => Err(anyhow!(err)),
1105 }
1106 }
1107 }
1108 })
1109 }
1110
1111 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1112 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1113 session.suppress_abort_err = true;
1114 }
1115 let conn = self.connection.clone();
1116 let params = acp::CancelNotification::new(session_id.clone());
1117 cx.foreground_executor()
1118 .spawn(async move { conn.cancel(params).await })
1119 .detach();
1120 }
1121
1122 fn session_modes(
1123 &self,
1124 session_id: &acp::SessionId,
1125 _cx: &App,
1126 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1127 let sessions = self.sessions.clone();
1128 let sessions_ref = sessions.borrow();
1129 let Some(session) = sessions_ref.get(session_id) else {
1130 return None;
1131 };
1132
1133 if let Some(modes) = session.session_modes.as_ref() {
1134 Some(Rc::new(AcpSessionModes {
1135 connection: self.connection.clone(),
1136 session_id: session_id.clone(),
1137 state: modes.clone(),
1138 }) as _)
1139 } else {
1140 None
1141 }
1142 }
1143
1144 fn model_selector(
1145 &self,
1146 session_id: &acp::SessionId,
1147 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1148 let sessions = self.sessions.clone();
1149 let sessions_ref = sessions.borrow();
1150 let Some(session) = sessions_ref.get(session_id) else {
1151 return None;
1152 };
1153
1154 if let Some(models) = session.models.as_ref() {
1155 Some(Rc::new(AcpModelSelector::new(
1156 session_id.clone(),
1157 self.connection.clone(),
1158 models.clone(),
1159 )) as _)
1160 } else {
1161 None
1162 }
1163 }
1164
1165 fn session_config_options(
1166 &self,
1167 session_id: &acp::SessionId,
1168 _cx: &App,
1169 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1170 let sessions = self.sessions.borrow();
1171 let session = sessions.get(session_id)?;
1172
1173 let config_opts = session.config_options.as_ref()?;
1174
1175 Some(Rc::new(AcpSessionConfigOptions {
1176 session_id: session_id.clone(),
1177 connection: self.connection.clone(),
1178 state: config_opts.config_options.clone(),
1179 watch_tx: config_opts.tx.clone(),
1180 watch_rx: config_opts.rx.clone(),
1181 }) as _)
1182 }
1183
1184 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1185 self.session_list.clone().map(|s| s as _)
1186 }
1187
1188 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1189 self
1190 }
1191}
1192
1193fn map_acp_error(err: acp::Error) -> anyhow::Error {
1194 if err.code == acp::ErrorCode::AuthRequired {
1195 let mut error = AuthRequired::new();
1196
1197 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1198 error = error.with_description(err.message);
1199 }
1200
1201 anyhow!(error)
1202 } else {
1203 anyhow!(err)
1204 }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use std::sync::atomic::{AtomicUsize, Ordering};
1210
1211 use super::*;
1212
1213 #[test]
1214 fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
1215 let command = AgentServerCommand {
1216 path: "/path/to/agent".into(),
1217 args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
1218 env: Some(HashMap::from_iter([
1219 ("BASE".into(), "1".into()),
1220 ("SHARED".into(), "override".into()),
1221 ("EXTRA".into(), "2".into()),
1222 ])),
1223 };
1224 let method = acp::AuthMethodTerminal::new("login", "Login");
1225
1226 let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
1227
1228 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1229 assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
1230 assert_eq!(
1231 task.env,
1232 HashMap::from_iter([
1233 ("BASE".into(), "1".into()),
1234 ("SHARED".into(), "override".into()),
1235 ("EXTRA".into(), "2".into()),
1236 ])
1237 );
1238 assert_eq!(task.label, "Login");
1239 assert_eq!(task.command_label, "Login");
1240 }
1241
1242 #[test]
1243 fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
1244 let method_id = acp::AuthMethodId::new("legacy-login");
1245 let method = acp::AuthMethod::Agent(
1246 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1247 "terminal-auth".to_string(),
1248 serde_json::json!({
1249 "label": "legacy /auth",
1250 "command": "legacy-agent",
1251 "args": ["auth", "--interactive"],
1252 "env": {
1253 "AUTH_MODE": "interactive",
1254 },
1255 }),
1256 )])),
1257 );
1258
1259 let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
1260 .expect("expected legacy terminal auth task");
1261
1262 assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
1263 assert_eq!(task.command.as_deref(), Some("legacy-agent"));
1264 assert_eq!(task.args, vec!["auth", "--interactive"]);
1265 assert_eq!(
1266 task.env,
1267 HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
1268 );
1269 assert_eq!(task.label, "legacy /auth");
1270 }
1271
1272 #[test]
1273 fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
1274 let method_id = acp::AuthMethodId::new("legacy-login");
1275 let method = acp::AuthMethod::Agent(
1276 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1277 "terminal-auth".to_string(),
1278 serde_json::json!({
1279 "label": "legacy /auth",
1280 }),
1281 )])),
1282 );
1283
1284 assert!(
1285 meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
1286 );
1287 }
1288
1289 #[test]
1290 fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
1291 let method_id = acp::AuthMethodId::new("login");
1292 let method = acp::AuthMethod::Terminal(
1293 acp::AuthMethodTerminal::new(method_id, "Login")
1294 .args(vec!["/auth".into()])
1295 .env(std::collections::HashMap::from_iter([(
1296 "AUTH_MODE".into(),
1297 "first-class".into(),
1298 )]))
1299 .meta(acp::Meta::from_iter([(
1300 "terminal-auth".to_string(),
1301 serde_json::json!({
1302 "label": "legacy /auth",
1303 "command": "legacy-agent",
1304 "args": ["legacy-auth"],
1305 "env": {
1306 "AUTH_MODE": "legacy",
1307 },
1308 }),
1309 )])),
1310 );
1311
1312 let command = AgentServerCommand {
1313 path: "/path/to/agent".into(),
1314 args: vec!["--acp".into(), "/auth".into()],
1315 env: Some(HashMap::from_iter([
1316 ("BASE".into(), "1".into()),
1317 ("AUTH_MODE".into(), "first-class".into()),
1318 ])),
1319 };
1320
1321 let task = match &method {
1322 acp::AuthMethod::Terminal(terminal) => {
1323 terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
1324 }
1325 _ => unreachable!(),
1326 };
1327
1328 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1329 assert_eq!(task.args, vec!["--acp", "/auth"]);
1330 assert_eq!(
1331 task.env,
1332 HashMap::from_iter([
1333 ("BASE".into(), "1".into()),
1334 ("AUTH_MODE".into(), "first-class".into()),
1335 ])
1336 );
1337 assert_eq!(task.label, "Login");
1338 }
1339
1340 struct FakeAcpAgent {
1341 load_session_count: Arc<AtomicUsize>,
1342 close_session_count: Arc<AtomicUsize>,
1343 }
1344
1345 #[async_trait::async_trait(?Send)]
1346 impl acp::Agent for FakeAcpAgent {
1347 async fn initialize(
1348 &self,
1349 args: acp::InitializeRequest,
1350 ) -> acp::Result<acp::InitializeResponse> {
1351 Ok(
1352 acp::InitializeResponse::new(args.protocol_version).agent_capabilities(
1353 acp::AgentCapabilities::default()
1354 .load_session(true)
1355 .session_capabilities(
1356 acp::SessionCapabilities::default()
1357 .close(acp::SessionCloseCapabilities::new()),
1358 ),
1359 ),
1360 )
1361 }
1362
1363 async fn authenticate(
1364 &self,
1365 _: acp::AuthenticateRequest,
1366 ) -> acp::Result<acp::AuthenticateResponse> {
1367 Ok(Default::default())
1368 }
1369
1370 async fn new_session(
1371 &self,
1372 _: acp::NewSessionRequest,
1373 ) -> acp::Result<acp::NewSessionResponse> {
1374 Ok(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
1375 }
1376
1377 async fn prompt(&self, _: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
1378 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
1379 }
1380
1381 async fn cancel(&self, _: acp::CancelNotification) -> acp::Result<()> {
1382 Ok(())
1383 }
1384
1385 async fn load_session(
1386 &self,
1387 _: acp::LoadSessionRequest,
1388 ) -> acp::Result<acp::LoadSessionResponse> {
1389 self.load_session_count.fetch_add(1, Ordering::SeqCst);
1390 Ok(acp::LoadSessionResponse::new())
1391 }
1392
1393 async fn close_session(
1394 &self,
1395 _: acp::CloseSessionRequest,
1396 ) -> acp::Result<acp::CloseSessionResponse> {
1397 self.close_session_count.fetch_add(1, Ordering::SeqCst);
1398 Ok(acp::CloseSessionResponse::new())
1399 }
1400 }
1401
1402 async fn connect_fake_agent(
1403 cx: &mut gpui::TestAppContext,
1404 ) -> (
1405 Rc<AcpConnection>,
1406 Entity<project::Project>,
1407 Arc<AtomicUsize>,
1408 Arc<AtomicUsize>,
1409 Task<anyhow::Result<()>>,
1410 ) {
1411 cx.update(|cx| {
1412 let store = settings::SettingsStore::test(cx);
1413 cx.set_global(store);
1414 });
1415
1416 let fs = fs::FakeFs::new(cx.executor());
1417 fs.insert_tree("/", serde_json::json!({ "a": {} })).await;
1418 let project = project::Project::test(fs, [std::path::Path::new("/a")], cx).await;
1419
1420 let load_count = Arc::new(AtomicUsize::new(0));
1421 let close_count = Arc::new(AtomicUsize::new(0));
1422
1423 let (c2a_reader, c2a_writer) = piper::pipe(4096);
1424 let (a2c_reader, a2c_writer) = piper::pipe(4096);
1425
1426 let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
1427 Rc::new(RefCell::new(HashMap::default()));
1428 let session_list_container: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
1429 Rc::new(RefCell::new(None));
1430
1431 let foreground = cx.foreground_executor().clone();
1432
1433 let client_delegate = ClientDelegate {
1434 sessions: sessions.clone(),
1435 session_list: session_list_container,
1436 cx: cx.to_async(),
1437 };
1438
1439 let (client_conn, client_io_task) =
1440 acp::ClientSideConnection::new(client_delegate, c2a_writer, a2c_reader, {
1441 let foreground = foreground.clone();
1442 move |fut| {
1443 foreground.spawn(fut).detach();
1444 }
1445 });
1446
1447 let fake_agent = FakeAcpAgent {
1448 load_session_count: load_count.clone(),
1449 close_session_count: close_count.clone(),
1450 };
1451
1452 let (_, agent_io_task) =
1453 acp::AgentSideConnection::new(fake_agent, a2c_writer, c2a_reader, {
1454 let foreground = foreground.clone();
1455 move |fut| {
1456 foreground.spawn(fut).detach();
1457 }
1458 });
1459
1460 let client_io_task = cx.background_spawn(client_io_task);
1461 let agent_io_task = cx.background_spawn(agent_io_task);
1462
1463 let response = client_conn
1464 .initialize(acp::InitializeRequest::new(acp::ProtocolVersion::V1))
1465 .await
1466 .expect("failed to initialize ACP connection");
1467
1468 let agent_capabilities = response.agent_capabilities;
1469
1470 let agent_server_store =
1471 project.read_with(cx, |project, _| project.agent_server_store().downgrade());
1472
1473 let connection = cx.update(|cx| {
1474 AcpConnection::new_for_test(
1475 Rc::new(client_conn),
1476 sessions,
1477 agent_capabilities,
1478 agent_server_store,
1479 client_io_task,
1480 cx,
1481 )
1482 });
1483
1484 let keep_agent_alive = cx.background_spawn(async move {
1485 agent_io_task.await.ok();
1486 anyhow::Ok(())
1487 });
1488
1489 (
1490 Rc::new(connection),
1491 project,
1492 load_count,
1493 close_count,
1494 keep_agent_alive,
1495 )
1496 }
1497
1498 #[gpui::test]
1499 async fn test_loaded_sessions_keep_state_until_last_close(cx: &mut gpui::TestAppContext) {
1500 let (connection, project, load_count, close_count, _keep_agent_alive) =
1501 connect_fake_agent(cx).await;
1502
1503 let session_id = acp::SessionId::new("session-1");
1504 let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);
1505
1506 // Load the same session twice concurrently — the second call should join
1507 // the pending task rather than issuing a second ACP load_session RPC.
1508 let first_load = cx.update(|cx| {
1509 connection.clone().load_session(
1510 session_id.clone(),
1511 project.clone(),
1512 work_dirs.clone(),
1513 None,
1514 cx,
1515 )
1516 });
1517 let second_load = cx.update(|cx| {
1518 connection.clone().load_session(
1519 session_id.clone(),
1520 project.clone(),
1521 work_dirs.clone(),
1522 None,
1523 cx,
1524 )
1525 });
1526
1527 let first_thread = first_load.await.expect("first load failed");
1528 let second_thread = second_load.await.expect("second load failed");
1529 cx.run_until_parked();
1530
1531 assert_eq!(
1532 first_thread.entity_id(),
1533 second_thread.entity_id(),
1534 "concurrent loads for the same session should share one AcpThread"
1535 );
1536 assert_eq!(
1537 load_count.load(Ordering::SeqCst),
1538 1,
1539 "underlying ACP load_session should be called exactly once for concurrent loads"
1540 );
1541
1542 // The session has ref_count 2. The first close should not send the ACP
1543 // close_session RPC — the session is still referenced.
1544 cx.update(|cx| connection.clone().close_session(&session_id, cx))
1545 .await
1546 .expect("first close failed");
1547
1548 assert_eq!(
1549 close_count.load(Ordering::SeqCst),
1550 0,
1551 "ACP close_session should not be sent while ref_count > 0"
1552 );
1553 assert!(
1554 connection.sessions.borrow().contains_key(&session_id),
1555 "session should still be tracked after first close"
1556 );
1557
1558 // The second close drops ref_count to 0 — now the ACP RPC must be sent.
1559 cx.update(|cx| connection.clone().close_session(&session_id, cx))
1560 .await
1561 .expect("second close failed");
1562 cx.run_until_parked();
1563
1564 assert_eq!(
1565 close_count.load(Ordering::SeqCst),
1566 1,
1567 "ACP close_session should be sent exactly once when ref_count reaches 0"
1568 );
1569 assert!(
1570 !connection.sessions.borrow().contains_key(&session_id),
1571 "session should be removed after final close"
1572 );
1573 }
1574}
1575
1576fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1577 let context_server_store = project.read(cx).context_server_store().read(cx);
1578 let is_local = project.read(cx).is_local();
1579 context_server_store
1580 .configured_server_ids()
1581 .iter()
1582 .filter_map(|id| {
1583 let configuration = context_server_store.configuration_for_server(id)?;
1584 match &*configuration {
1585 project::context_server_store::ContextServerConfiguration::Custom {
1586 command,
1587 remote,
1588 ..
1589 }
1590 | project::context_server_store::ContextServerConfiguration::Extension {
1591 command,
1592 remote,
1593 ..
1594 } if is_local || *remote => Some(acp::McpServer::Stdio(
1595 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1596 .args(command.args.clone())
1597 .env(if let Some(env) = command.env.as_ref() {
1598 env.iter()
1599 .map(|(name, value)| acp::EnvVariable::new(name, value))
1600 .collect()
1601 } else {
1602 vec![]
1603 }),
1604 )),
1605 project::context_server_store::ContextServerConfiguration::Http {
1606 url,
1607 headers,
1608 timeout: _,
1609 } => Some(acp::McpServer::Http(
1610 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1611 headers
1612 .iter()
1613 .map(|(name, value)| acp::HttpHeader::new(name, value))
1614 .collect(),
1615 ),
1616 )),
1617 _ => None,
1618 }
1619 })
1620 .collect()
1621}
1622
1623fn config_state(
1624 modes: Option<acp::SessionModeState>,
1625 models: Option<acp::SessionModelState>,
1626 config_options: Option<Vec<acp::SessionConfigOption>>,
1627) -> (
1628 Option<Rc<RefCell<acp::SessionModeState>>>,
1629 Option<Rc<RefCell<acp::SessionModelState>>>,
1630 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1631) {
1632 if let Some(opts) = config_options {
1633 return (None, None, Some(Rc::new(RefCell::new(opts))));
1634 }
1635
1636 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1637 let models = models.map(|models| Rc::new(RefCell::new(models)));
1638 (modes, models, None)
1639}
1640
1641struct AcpSessionModes {
1642 session_id: acp::SessionId,
1643 connection: Rc<acp::ClientSideConnection>,
1644 state: Rc<RefCell<acp::SessionModeState>>,
1645}
1646
1647impl acp_thread::AgentSessionModes for AcpSessionModes {
1648 fn current_mode(&self) -> acp::SessionModeId {
1649 self.state.borrow().current_mode_id.clone()
1650 }
1651
1652 fn all_modes(&self) -> Vec<acp::SessionMode> {
1653 self.state.borrow().available_modes.clone()
1654 }
1655
1656 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1657 let connection = self.connection.clone();
1658 let session_id = self.session_id.clone();
1659 let old_mode_id;
1660 {
1661 let mut state = self.state.borrow_mut();
1662 old_mode_id = state.current_mode_id.clone();
1663 state.current_mode_id = mode_id.clone();
1664 };
1665 let state = self.state.clone();
1666 cx.foreground_executor().spawn(async move {
1667 let result = connection
1668 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1669 .await;
1670
1671 if result.is_err() {
1672 state.borrow_mut().current_mode_id = old_mode_id;
1673 }
1674
1675 result?;
1676
1677 Ok(())
1678 })
1679 }
1680}
1681
1682struct AcpModelSelector {
1683 session_id: acp::SessionId,
1684 connection: Rc<acp::ClientSideConnection>,
1685 state: Rc<RefCell<acp::SessionModelState>>,
1686}
1687
1688impl AcpModelSelector {
1689 fn new(
1690 session_id: acp::SessionId,
1691 connection: Rc<acp::ClientSideConnection>,
1692 state: Rc<RefCell<acp::SessionModelState>>,
1693 ) -> Self {
1694 Self {
1695 session_id,
1696 connection,
1697 state,
1698 }
1699 }
1700}
1701
1702impl acp_thread::AgentModelSelector for AcpModelSelector {
1703 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1704 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1705 self.state
1706 .borrow()
1707 .available_models
1708 .clone()
1709 .into_iter()
1710 .map(acp_thread::AgentModelInfo::from)
1711 .collect(),
1712 )))
1713 }
1714
1715 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1716 let connection = self.connection.clone();
1717 let session_id = self.session_id.clone();
1718 let old_model_id;
1719 {
1720 let mut state = self.state.borrow_mut();
1721 old_model_id = state.current_model_id.clone();
1722 state.current_model_id = model_id.clone();
1723 };
1724 let state = self.state.clone();
1725 cx.foreground_executor().spawn(async move {
1726 let result = connection
1727 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1728 .await;
1729
1730 if result.is_err() {
1731 state.borrow_mut().current_model_id = old_model_id;
1732 }
1733
1734 result?;
1735
1736 Ok(())
1737 })
1738 }
1739
1740 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1741 let state = self.state.borrow();
1742 Task::ready(
1743 state
1744 .available_models
1745 .iter()
1746 .find(|m| m.model_id == state.current_model_id)
1747 .cloned()
1748 .map(acp_thread::AgentModelInfo::from)
1749 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1750 )
1751 }
1752}
1753
1754struct AcpSessionConfigOptions {
1755 session_id: acp::SessionId,
1756 connection: Rc<acp::ClientSideConnection>,
1757 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1758 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1759 watch_rx: watch::Receiver<()>,
1760}
1761
1762impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1763 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1764 self.state.borrow().clone()
1765 }
1766
1767 fn set_config_option(
1768 &self,
1769 config_id: acp::SessionConfigId,
1770 value: acp::SessionConfigValueId,
1771 cx: &mut App,
1772 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1773 let connection = self.connection.clone();
1774 let session_id = self.session_id.clone();
1775 let state = self.state.clone();
1776
1777 let watch_tx = self.watch_tx.clone();
1778
1779 cx.foreground_executor().spawn(async move {
1780 let response = connection
1781 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1782 session_id, config_id, value,
1783 ))
1784 .await?;
1785
1786 *state.borrow_mut() = response.config_options.clone();
1787 watch_tx.borrow_mut().send(()).ok();
1788 Ok(response.config_options)
1789 })
1790 }
1791
1792 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1793 Some(self.watch_rx.clone())
1794 }
1795}
1796
1797struct ClientDelegate {
1798 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1799 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1800 cx: AsyncApp,
1801}
1802
1803#[async_trait::async_trait(?Send)]
1804impl acp::Client for ClientDelegate {
1805 async fn request_permission(
1806 &self,
1807 arguments: acp::RequestPermissionRequest,
1808 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1809 let thread;
1810 {
1811 let sessions_ref = self.sessions.borrow();
1812 let session = sessions_ref
1813 .get(&arguments.session_id)
1814 .context("Failed to get session")?;
1815 thread = session.thread.clone();
1816 }
1817
1818 let cx = &mut self.cx.clone();
1819
1820 let task = thread.update(cx, |thread, cx| {
1821 thread.request_tool_call_authorization(
1822 arguments.tool_call,
1823 acp_thread::PermissionOptions::Flat(arguments.options),
1824 cx,
1825 )
1826 })??;
1827
1828 let outcome = task.await;
1829
1830 Ok(acp::RequestPermissionResponse::new(outcome.into()))
1831 }
1832
1833 async fn write_text_file(
1834 &self,
1835 arguments: acp::WriteTextFileRequest,
1836 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1837 let cx = &mut self.cx.clone();
1838 let task = self
1839 .session_thread(&arguments.session_id)?
1840 .update(cx, |thread, cx| {
1841 thread.write_text_file(arguments.path, arguments.content, cx)
1842 })?;
1843
1844 task.await?;
1845
1846 Ok(Default::default())
1847 }
1848
1849 async fn read_text_file(
1850 &self,
1851 arguments: acp::ReadTextFileRequest,
1852 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1853 let task = self.session_thread(&arguments.session_id)?.update(
1854 &mut self.cx.clone(),
1855 |thread, cx| {
1856 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1857 },
1858 )?;
1859
1860 let content = task.await?;
1861
1862 Ok(acp::ReadTextFileResponse::new(content))
1863 }
1864
1865 async fn session_notification(
1866 &self,
1867 notification: acp::SessionNotification,
1868 ) -> Result<(), acp::Error> {
1869 let sessions = self.sessions.borrow();
1870 let session = sessions
1871 .get(¬ification.session_id)
1872 .context("Failed to get session")?;
1873
1874 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1875 current_mode_id,
1876 ..
1877 }) = ¬ification.update
1878 {
1879 if let Some(session_modes) = &session.session_modes {
1880 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1881 }
1882 }
1883
1884 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1885 config_options,
1886 ..
1887 }) = ¬ification.update
1888 {
1889 if let Some(opts) = &session.config_options {
1890 *opts.config_options.borrow_mut() = config_options.clone();
1891 opts.tx.borrow_mut().send(()).ok();
1892 }
1893 }
1894
1895 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1896 && let Some(session_list) = self.session_list.borrow().as_ref()
1897 {
1898 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1899 }
1900
1901 // Clone so we can inspect meta both before and after handing off to the thread
1902 let update_clone = notification.update.clone();
1903
1904 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1905 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1906 if let Some(meta) = &tc.meta {
1907 if let Some(terminal_info) = meta.get("terminal_info") {
1908 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1909 {
1910 let terminal_id = acp::TerminalId::new(id_str);
1911 let cwd = terminal_info
1912 .get("cwd")
1913 .and_then(|v| v.as_str().map(PathBuf::from));
1914
1915 // Create a minimal display-only lower-level terminal and register it.
1916 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1917 let builder = TerminalBuilder::new_display_only(
1918 CursorShape::default(),
1919 AlternateScroll::On,
1920 None,
1921 0,
1922 cx.background_executor(),
1923 thread.project().read(cx).path_style(cx),
1924 )?;
1925 let lower = cx.new(|cx| builder.subscribe(cx));
1926 thread.on_terminal_provider_event(
1927 TerminalProviderEvent::Created {
1928 terminal_id,
1929 label: tc.title.clone(),
1930 cwd,
1931 output_byte_limit: None,
1932 terminal: lower,
1933 },
1934 cx,
1935 );
1936 anyhow::Ok(())
1937 });
1938 }
1939 }
1940 }
1941 }
1942
1943 // Forward the update to the acp_thread as usual.
1944 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1945 thread.handle_session_update(notification.update.clone(), cx)
1946 })??;
1947
1948 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1949 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1950 if let Some(meta) = &tcu.meta {
1951 if let Some(term_out) = meta.get("terminal_output") {
1952 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1953 let terminal_id = acp::TerminalId::new(id_str);
1954 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1955 let data = s.as_bytes().to_vec();
1956 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1957 thread.on_terminal_provider_event(
1958 TerminalProviderEvent::Output { terminal_id, data },
1959 cx,
1960 );
1961 });
1962 }
1963 }
1964 }
1965
1966 // terminal_exit
1967 if let Some(term_exit) = meta.get("terminal_exit") {
1968 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1969 let terminal_id = acp::TerminalId::new(id_str);
1970 let status = acp::TerminalExitStatus::new()
1971 .exit_code(
1972 term_exit
1973 .get("exit_code")
1974 .and_then(|v| v.as_u64())
1975 .map(|i| i as u32),
1976 )
1977 .signal(
1978 term_exit
1979 .get("signal")
1980 .and_then(|v| v.as_str().map(|s| s.to_string())),
1981 );
1982
1983 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1984 thread.on_terminal_provider_event(
1985 TerminalProviderEvent::Exit {
1986 terminal_id,
1987 status,
1988 },
1989 cx,
1990 );
1991 });
1992 }
1993 }
1994 }
1995 }
1996
1997 Ok(())
1998 }
1999
2000 async fn create_terminal(
2001 &self,
2002 args: acp::CreateTerminalRequest,
2003 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
2004 let thread = self.session_thread(&args.session_id)?;
2005 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
2006
2007 let terminal_entity = acp_thread::create_terminal_entity(
2008 args.command.clone(),
2009 &args.args,
2010 args.env
2011 .into_iter()
2012 .map(|env| (env.name, env.value))
2013 .collect(),
2014 args.cwd.clone(),
2015 &project,
2016 &mut self.cx.clone(),
2017 )
2018 .await?;
2019
2020 // Register with renderer
2021 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
2022 thread.register_terminal_created(
2023 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
2024 format!("{} {}", args.command, args.args.join(" ")),
2025 args.cwd.clone(),
2026 args.output_byte_limit,
2027 terminal_entity,
2028 cx,
2029 )
2030 })?;
2031 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
2032 Ok(acp::CreateTerminalResponse::new(terminal_id))
2033 }
2034
2035 async fn kill_terminal(
2036 &self,
2037 args: acp::KillTerminalRequest,
2038 ) -> Result<acp::KillTerminalResponse, acp::Error> {
2039 self.session_thread(&args.session_id)?
2040 .update(&mut self.cx.clone(), |thread, cx| {
2041 thread.kill_terminal(args.terminal_id, cx)
2042 })??;
2043
2044 Ok(Default::default())
2045 }
2046
2047 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
2048 Err(acp::Error::method_not_found())
2049 }
2050
2051 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
2052 Err(acp::Error::method_not_found())
2053 }
2054
2055 async fn release_terminal(
2056 &self,
2057 args: acp::ReleaseTerminalRequest,
2058 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
2059 self.session_thread(&args.session_id)?
2060 .update(&mut self.cx.clone(), |thread, cx| {
2061 thread.release_terminal(args.terminal_id, cx)
2062 })??;
2063
2064 Ok(Default::default())
2065 }
2066
2067 async fn terminal_output(
2068 &self,
2069 args: acp::TerminalOutputRequest,
2070 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
2071 self.session_thread(&args.session_id)?
2072 .read_with(&mut self.cx.clone(), |thread, cx| {
2073 let out = thread
2074 .terminal(args.terminal_id)?
2075 .read(cx)
2076 .current_output(cx);
2077
2078 Ok(out)
2079 })?
2080 }
2081
2082 async fn wait_for_terminal_exit(
2083 &self,
2084 args: acp::WaitForTerminalExitRequest,
2085 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
2086 let exit_status = self
2087 .session_thread(&args.session_id)?
2088 .update(&mut self.cx.clone(), |thread, cx| {
2089 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
2090 })??
2091 .await;
2092
2093 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
2094 }
2095}
2096
2097impl ClientDelegate {
2098 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
2099 let sessions = self.sessions.borrow();
2100 sessions
2101 .get(session_id)
2102 .context("Failed to get session")
2103 .map(|session| session.thread.clone())
2104 }
2105}