1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::FutureExt as _;
13use futures::future::Shared;
14use futures::io::BufReader;
15use project::agent_server_store::{AgentServerCommand, AgentServerStore};
16use project::{AgentId, Project};
17use remote::remote_client::Interactive;
18use serde::Deserialize;
19use std::path::PathBuf;
20use std::process::Stdio;
21use std::rc::Rc;
22use std::{any::Any, cell::RefCell};
23use task::{Shell, ShellBuilder, SpawnInTerminal};
24use thiserror::Error;
25use util::ResultExt as _;
26use util::path_list::PathList;
27use util::process::Child;
28
29use std::sync::Arc;
30
31use anyhow::{Context as _, Result};
32use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
33
34use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
35use terminal::TerminalBuilder;
36use terminal::terminal_settings::{AlternateScroll, CursorShape};
37
38use crate::GEMINI_ID;
39
40pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
41
42#[derive(Debug, Error)]
43#[error("Unsupported version")]
44pub struct UnsupportedVersion;
45
46pub struct AcpConnection {
47 id: AgentId,
48 telemetry_id: SharedString,
49 connection: Rc<acp::ClientSideConnection>,
50 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
51 pending_sessions: Rc<RefCell<HashMap<acp::SessionId, PendingAcpSession>>>,
52 auth_methods: Vec<acp::AuthMethod>,
53 agent_server_store: WeakEntity<AgentServerStore>,
54 agent_capabilities: acp::AgentCapabilities,
55 default_mode: Option<acp::SessionModeId>,
56 default_model: Option<acp::ModelId>,
57 default_config_options: HashMap<String, String>,
58 child: Option<Child>,
59 session_list: Option<Rc<AcpSessionList>>,
60 _io_task: Task<Result<(), acp::Error>>,
61 _wait_task: Task<Result<()>>,
62 _stderr_task: Task<Result<()>>,
63}
64
65struct PendingAcpSession {
66 task: Shared<Task<Result<Entity<AcpThread>, Arc<anyhow::Error>>>>,
67 ref_count: usize,
68}
69
70struct SessionConfigResponse {
71 modes: Option<acp::SessionModeState>,
72 models: Option<acp::SessionModelState>,
73 config_options: Option<Vec<acp::SessionConfigOption>>,
74}
75
76#[derive(Clone)]
77struct ConfigOptions {
78 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
79 tx: Rc<RefCell<watch::Sender<()>>>,
80 rx: watch::Receiver<()>,
81}
82
83impl ConfigOptions {
84 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
85 let (tx, rx) = watch::channel(());
86 Self {
87 config_options,
88 tx: Rc::new(RefCell::new(tx)),
89 rx,
90 }
91 }
92}
93
94pub struct AcpSession {
95 thread: WeakEntity<AcpThread>,
96 suppress_abort_err: bool,
97 models: Option<Rc<RefCell<acp::SessionModelState>>>,
98 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
99 config_options: Option<ConfigOptions>,
100 ref_count: usize,
101}
102
103pub struct AcpSessionList {
104 connection: Rc<acp::ClientSideConnection>,
105 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
106 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
107}
108
109impl AcpSessionList {
110 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
111 let (tx, rx) = smol::channel::unbounded();
112 Self {
113 connection,
114 updates_tx: tx,
115 updates_rx: rx,
116 }
117 }
118
119 fn notify_update(&self) {
120 self.updates_tx
121 .try_send(acp_thread::SessionListUpdate::Refresh)
122 .log_err();
123 }
124
125 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
126 self.updates_tx
127 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
128 .log_err();
129 }
130}
131
132impl AgentSessionList for AcpSessionList {
133 fn list_sessions(
134 &self,
135 request: AgentSessionListRequest,
136 cx: &mut App,
137 ) -> Task<Result<AgentSessionListResponse>> {
138 let conn = self.connection.clone();
139 cx.foreground_executor().spawn(async move {
140 let acp_request = acp::ListSessionsRequest::new()
141 .cwd(request.cwd)
142 .cursor(request.cursor);
143 let response = conn.list_sessions(acp_request).await?;
144 Ok(AgentSessionListResponse {
145 sessions: response
146 .sessions
147 .into_iter()
148 .map(|s| AgentSessionInfo {
149 session_id: s.session_id,
150 work_dirs: Some(PathList::new(&[s.cwd])),
151 title: s.title.map(Into::into),
152 updated_at: s.updated_at.and_then(|date_str| {
153 chrono::DateTime::parse_from_rfc3339(&date_str)
154 .ok()
155 .map(|dt| dt.with_timezone(&chrono::Utc))
156 }),
157 created_at: None,
158 meta: s.meta,
159 })
160 .collect(),
161 next_cursor: response.next_cursor,
162 meta: response.meta,
163 })
164 })
165 }
166
167 fn watch(
168 &self,
169 _cx: &mut App,
170 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
171 Some(self.updates_rx.clone())
172 }
173
174 fn notify_refresh(&self) {
175 self.notify_update();
176 }
177
178 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
179 self
180 }
181}
182
183pub async fn connect(
184 agent_id: AgentId,
185 project: Entity<Project>,
186 command: AgentServerCommand,
187 agent_server_store: WeakEntity<AgentServerStore>,
188 default_mode: Option<acp::SessionModeId>,
189 default_model: Option<acp::ModelId>,
190 default_config_options: HashMap<String, String>,
191 cx: &mut AsyncApp,
192) -> Result<Rc<dyn AgentConnection>> {
193 let conn = AcpConnection::stdio(
194 agent_id,
195 project,
196 command.clone(),
197 agent_server_store,
198 default_mode,
199 default_model,
200 default_config_options,
201 cx,
202 )
203 .await?;
204 Ok(Rc::new(conn) as _)
205}
206
207const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
208
209impl AcpConnection {
210 pub async fn stdio(
211 agent_id: AgentId,
212 project: Entity<Project>,
213 command: AgentServerCommand,
214 agent_server_store: WeakEntity<AgentServerStore>,
215 default_mode: Option<acp::SessionModeId>,
216 default_model: Option<acp::ModelId>,
217 default_config_options: HashMap<String, String>,
218 cx: &mut AsyncApp,
219 ) -> Result<Self> {
220 let root_dir = project.read_with(cx, |project, cx| {
221 project
222 .default_path_list(cx)
223 .ordered_paths()
224 .next()
225 .cloned()
226 });
227 let original_command = command.clone();
228 let (path, args, env) = project
229 .read_with(cx, |project, cx| {
230 project.remote_client().and_then(|client| {
231 let template = client
232 .read(cx)
233 .build_command_with_options(
234 Some(command.path.display().to_string()),
235 &command.args,
236 &command.env.clone().into_iter().flatten().collect(),
237 root_dir.as_ref().map(|path| path.display().to_string()),
238 None,
239 Interactive::No,
240 )
241 .log_err()?;
242 Some((template.program, template.args, template.env))
243 })
244 })
245 .unwrap_or_else(|| {
246 (
247 command.path.display().to_string(),
248 command.args,
249 command.env.unwrap_or_default(),
250 )
251 });
252
253 let builder = ShellBuilder::new(&Shell::System, cfg!(windows)).non_interactive();
254 let mut child = builder.build_std_command(Some(path.clone()), &args);
255 child.envs(env.clone());
256 if let Some(cwd) = project.read_with(cx, |project, _cx| {
257 if project.is_local() {
258 root_dir.as_ref()
259 } else {
260 None
261 }
262 }) {
263 child.current_dir(cwd);
264 }
265 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
266
267 let stdout = child.stdout.take().context("Failed to take stdout")?;
268 let stdin = child.stdin.take().context("Failed to take stdin")?;
269 let stderr = child.stderr.take().context("Failed to take stderr")?;
270 log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
271 log::trace!("Spawned (pid: {})", child.id());
272
273 let sessions = Rc::new(RefCell::new(HashMap::default()));
274
275 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
276 (
277 release_channel::ReleaseChannel::try_global(cx)
278 .map(|release_channel| release_channel.display_name()),
279 release_channel::AppVersion::global(cx).to_string(),
280 )
281 });
282
283 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
284 Rc::new(RefCell::new(None));
285
286 let client = ClientDelegate {
287 sessions: sessions.clone(),
288 session_list: client_session_list.clone(),
289 cx: cx.clone(),
290 };
291 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
292 let foreground_executor = cx.foreground_executor().clone();
293 move |fut| {
294 foreground_executor.spawn(fut).detach();
295 }
296 });
297
298 let io_task = cx.background_spawn(io_task);
299
300 let stderr_task = cx.background_spawn(async move {
301 let mut stderr = BufReader::new(stderr);
302 let mut line = String::new();
303 while let Ok(n) = stderr.read_line(&mut line).await
304 && n > 0
305 {
306 log::warn!("agent stderr: {}", line.trim());
307 line.clear();
308 }
309 Ok(())
310 });
311
312 let wait_task = cx.spawn({
313 let sessions = sessions.clone();
314 let status_fut = child.status();
315 async move |cx| {
316 let status = status_fut.await?;
317 emit_load_error_to_all_sessions(&sessions, LoadError::Exited { status }, cx);
318 anyhow::Ok(())
319 }
320 });
321
322 let connection = Rc::new(connection);
323
324 cx.update(|cx| {
325 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
326 registry.set_active_connection(agent_id.clone(), &connection, cx)
327 });
328 });
329
330 let response = connection
331 .initialize(
332 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
333 .client_capabilities(
334 acp::ClientCapabilities::new()
335 .fs(acp::FileSystemCapabilities::new()
336 .read_text_file(true)
337 .write_text_file(true))
338 .terminal(true)
339 .auth(acp::AuthCapabilities::new().terminal(true))
340 // Experimental: Allow for rendering terminal output from the agents
341 .meta(acp::Meta::from_iter([
342 ("terminal_output".into(), true.into()),
343 ("terminal-auth".into(), true.into()),
344 ])),
345 )
346 .client_info(
347 acp::Implementation::new("zed", version)
348 .title(release_channel.map(ToOwned::to_owned)),
349 ),
350 )
351 .await?;
352
353 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
354 return Err(UnsupportedVersion.into());
355 }
356
357 let telemetry_id = response
358 .agent_info
359 // Use the one the agent provides if we have one
360 .map(|info| info.name.into())
361 // Otherwise, just use the name
362 .unwrap_or_else(|| agent_id.0.clone());
363
364 let session_list = if response
365 .agent_capabilities
366 .session_capabilities
367 .list
368 .is_some()
369 {
370 let list = Rc::new(AcpSessionList::new(connection.clone()));
371 *client_session_list.borrow_mut() = Some(list.clone());
372 Some(list)
373 } else {
374 None
375 };
376
377 // TODO: Remove this override once Google team releases their official auth methods
378 let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
379 let mut gemini_args = original_command.args.clone();
380 gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
381 let value = serde_json::json!({
382 "label": "gemini /auth",
383 "command": original_command.path.to_string_lossy(),
384 "args": gemini_args,
385 "env": original_command.env.unwrap_or_default(),
386 });
387 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
388 vec![acp::AuthMethod::Agent(
389 acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
390 .description("Login with your Google or Vertex AI account")
391 .meta(meta),
392 )]
393 } else {
394 response.auth_methods
395 };
396 Ok(Self {
397 id: agent_id,
398 auth_methods,
399 agent_server_store,
400 connection,
401 telemetry_id,
402 sessions,
403 pending_sessions: Rc::new(RefCell::new(HashMap::default())),
404 agent_capabilities: response.agent_capabilities,
405 default_mode,
406 default_model,
407 default_config_options,
408 session_list,
409 _io_task: io_task,
410 _wait_task: wait_task,
411 _stderr_task: stderr_task,
412 child: Some(child),
413 })
414 }
415
416 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
417 &self.agent_capabilities.prompt_capabilities
418 }
419
420 #[cfg(any(test, feature = "test-support"))]
421 fn new_for_test(
422 connection: Rc<acp::ClientSideConnection>,
423 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
424 agent_capabilities: acp::AgentCapabilities,
425 agent_server_store: WeakEntity<AgentServerStore>,
426 io_task: Task<Result<(), acp::Error>>,
427 _cx: &mut App,
428 ) -> Self {
429 Self {
430 id: AgentId::new("test"),
431 telemetry_id: "test".into(),
432 connection,
433 sessions,
434 pending_sessions: Rc::new(RefCell::new(HashMap::default())),
435 auth_methods: vec![],
436 agent_server_store,
437 agent_capabilities,
438 default_mode: None,
439 default_model: None,
440 default_config_options: HashMap::default(),
441 child: None,
442 session_list: None,
443 _io_task: io_task,
444 _wait_task: Task::ready(Ok(())),
445 _stderr_task: Task::ready(Ok(())),
446 }
447 }
448
449 fn open_or_create_session(
450 self: Rc<Self>,
451 session_id: acp::SessionId,
452 project: Entity<Project>,
453 work_dirs: PathList,
454 title: Option<SharedString>,
455 rpc_call: impl FnOnce(
456 Rc<acp::ClientSideConnection>,
457 acp::SessionId,
458 PathBuf,
459 )
460 -> futures::future::LocalBoxFuture<'static, Result<SessionConfigResponse>>
461 + 'static,
462 cx: &mut App,
463 ) -> Task<Result<Entity<AcpThread>>> {
464 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
465 session.ref_count += 1;
466 if let Some(thread) = session.thread.upgrade() {
467 return Task::ready(Ok(thread));
468 }
469 }
470
471 if let Some(pending) = self.pending_sessions.borrow_mut().get_mut(&session_id) {
472 pending.ref_count += 1;
473 let task = pending.task.clone();
474 return cx
475 .foreground_executor()
476 .spawn(async move { task.await.map_err(|err| anyhow!(err)) });
477 }
478
479 // TODO: remove this once ACP supports multiple working directories
480 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
481 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
482 };
483
484 let shared_task = cx
485 .spawn({
486 let session_id = session_id.clone();
487 let this = self.clone();
488 async move |cx| {
489 let action_log = cx.new(|_| ActionLog::new(project.clone()));
490 let thread: Entity<AcpThread> = cx.new(|cx| {
491 AcpThread::new(
492 None,
493 title,
494 Some(work_dirs),
495 this.clone(),
496 project,
497 action_log,
498 session_id.clone(),
499 watch::Receiver::constant(
500 this.agent_capabilities.prompt_capabilities.clone(),
501 ),
502 cx,
503 )
504 });
505
506 let response =
507 match rpc_call(this.connection.clone(), session_id.clone(), cwd).await {
508 Ok(response) => response,
509 Err(err) => {
510 this.pending_sessions.borrow_mut().remove(&session_id);
511 return Err(Arc::new(err));
512 }
513 };
514
515 let (modes, models, config_options) =
516 config_state(response.modes, response.models, response.config_options);
517
518 if let Some(config_opts) = config_options.as_ref() {
519 this.apply_default_config_options(&session_id, config_opts, cx);
520 }
521
522 let ref_count = this
523 .pending_sessions
524 .borrow_mut()
525 .remove(&session_id)
526 .map_or(1, |pending| pending.ref_count);
527
528 this.sessions.borrow_mut().insert(
529 session_id,
530 AcpSession {
531 thread: thread.downgrade(),
532 suppress_abort_err: false,
533 session_modes: modes,
534 models,
535 config_options: config_options.map(ConfigOptions::new),
536 ref_count,
537 },
538 );
539
540 Ok(thread)
541 }
542 })
543 .shared();
544
545 self.pending_sessions.borrow_mut().insert(
546 session_id,
547 PendingAcpSession {
548 task: shared_task.clone(),
549 ref_count: 1,
550 },
551 );
552
553 cx.foreground_executor()
554 .spawn(async move { shared_task.await.map_err(|err| anyhow!(err)) })
555 }
556
557 fn apply_default_config_options(
558 &self,
559 session_id: &acp::SessionId,
560 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
561 cx: &mut AsyncApp,
562 ) {
563 let id = self.id.clone();
564 let defaults_to_apply: Vec<_> = {
565 let config_opts_ref = config_options.borrow();
566 config_opts_ref
567 .iter()
568 .filter_map(|config_option| {
569 let default_value = self.default_config_options.get(&*config_option.id.0)?;
570
571 let is_valid = match &config_option.kind {
572 acp::SessionConfigKind::Select(select) => match &select.options {
573 acp::SessionConfigSelectOptions::Ungrouped(options) => options
574 .iter()
575 .any(|opt| &*opt.value.0 == default_value.as_str()),
576 acp::SessionConfigSelectOptions::Grouped(groups) => {
577 groups.iter().any(|g| {
578 g.options
579 .iter()
580 .any(|opt| &*opt.value.0 == default_value.as_str())
581 })
582 }
583 _ => false,
584 },
585 _ => false,
586 };
587
588 if is_valid {
589 let initial_value = match &config_option.kind {
590 acp::SessionConfigKind::Select(select) => {
591 Some(select.current_value.clone())
592 }
593 _ => None,
594 };
595 Some((
596 config_option.id.clone(),
597 default_value.clone(),
598 initial_value,
599 ))
600 } else {
601 log::warn!(
602 "`{}` is not a valid value for config option `{}` in {}",
603 default_value,
604 config_option.id.0,
605 id
606 );
607 None
608 }
609 })
610 .collect()
611 };
612
613 for (config_id, default_value, initial_value) in defaults_to_apply {
614 cx.spawn({
615 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
616 let session_id = session_id.clone();
617 let config_id_clone = config_id.clone();
618 let config_opts = config_options.clone();
619 let conn = self.connection.clone();
620 async move |_| {
621 let result = conn
622 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
623 session_id,
624 config_id_clone.clone(),
625 default_value_id,
626 ))
627 .await
628 .log_err();
629
630 if result.is_none() {
631 if let Some(initial) = initial_value {
632 let mut opts = config_opts.borrow_mut();
633 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
634 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
635 select.current_value = initial;
636 }
637 }
638 }
639 }
640 }
641 })
642 .detach();
643
644 let mut opts = config_options.borrow_mut();
645 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
646 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
647 select.current_value = acp::SessionConfigValueId::new(default_value);
648 }
649 }
650 }
651 }
652}
653
654fn emit_load_error_to_all_sessions(
655 sessions: &Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
656 error: LoadError,
657 cx: &mut AsyncApp,
658) {
659 let threads: Vec<_> = sessions
660 .borrow()
661 .values()
662 .map(|session| session.thread.clone())
663 .collect();
664
665 for thread in threads {
666 thread
667 .update(cx, |thread, cx| thread.emit_load_error(error.clone(), cx))
668 .ok();
669 }
670}
671
672impl Drop for AcpConnection {
673 fn drop(&mut self) {
674 if let Some(ref mut child) = self.child {
675 child.kill().log_err();
676 }
677 }
678}
679
680fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
681 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
682}
683
684fn terminal_auth_task(
685 command: &AgentServerCommand,
686 agent_id: &AgentId,
687 method: &acp::AuthMethodTerminal,
688) -> SpawnInTerminal {
689 acp_thread::build_terminal_auth_task(
690 terminal_auth_task_id(agent_id, &method.id),
691 method.name.clone(),
692 command.path.to_string_lossy().into_owned(),
693 command.args.clone(),
694 command.env.clone().unwrap_or_default(),
695 )
696}
697
698/// Used to support the _meta method prior to stabilization
699fn meta_terminal_auth_task(
700 agent_id: &AgentId,
701 method_id: &acp::AuthMethodId,
702 method: &acp::AuthMethod,
703) -> Option<SpawnInTerminal> {
704 #[derive(Deserialize)]
705 struct MetaTerminalAuth {
706 label: String,
707 command: String,
708 #[serde(default)]
709 args: Vec<String>,
710 #[serde(default)]
711 env: HashMap<String, String>,
712 }
713
714 let meta = match method {
715 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
716 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
717 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
718 _ => None,
719 }?;
720 let terminal_auth =
721 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
722
723 Some(acp_thread::build_terminal_auth_task(
724 terminal_auth_task_id(agent_id, method_id),
725 terminal_auth.label.clone(),
726 terminal_auth.command,
727 terminal_auth.args,
728 terminal_auth.env,
729 ))
730}
731
732impl AgentConnection for AcpConnection {
733 fn agent_id(&self) -> AgentId {
734 self.id.clone()
735 }
736
737 fn telemetry_id(&self) -> SharedString {
738 self.telemetry_id.clone()
739 }
740
741 fn new_session(
742 self: Rc<Self>,
743 project: Entity<Project>,
744 work_dirs: PathList,
745 cx: &mut App,
746 ) -> Task<Result<Entity<AcpThread>>> {
747 // TODO: remove this once ACP supports multiple working directories
748 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
749 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
750 };
751 let name = self.id.0.clone();
752 let mcp_servers = mcp_servers_for_project(&project, cx);
753
754 cx.spawn(async move |cx| {
755 let response = self.connection
756 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
757 .await
758 .map_err(map_acp_error)?;
759
760 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
761
762 if let Some(default_mode) = self.default_mode.clone() {
763 if let Some(modes) = modes.as_ref() {
764 let mut modes_ref = modes.borrow_mut();
765 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
766
767 if has_mode {
768 let initial_mode_id = modes_ref.current_mode_id.clone();
769
770 cx.spawn({
771 let default_mode = default_mode.clone();
772 let session_id = response.session_id.clone();
773 let modes = modes.clone();
774 let conn = self.connection.clone();
775 async move |_| {
776 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
777 .await.log_err();
778
779 if result.is_none() {
780 modes.borrow_mut().current_mode_id = initial_mode_id;
781 }
782 }
783 }).detach();
784
785 modes_ref.current_mode_id = default_mode;
786 } else {
787 let available_modes = modes_ref
788 .available_modes
789 .iter()
790 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
791 .collect::<Vec<_>>()
792 .join("\n");
793
794 log::warn!(
795 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
796 );
797 }
798 }
799 }
800
801 if let Some(default_model) = self.default_model.clone() {
802 if let Some(models) = models.as_ref() {
803 let mut models_ref = models.borrow_mut();
804 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
805
806 if has_model {
807 let initial_model_id = models_ref.current_model_id.clone();
808
809 cx.spawn({
810 let default_model = default_model.clone();
811 let session_id = response.session_id.clone();
812 let models = models.clone();
813 let conn = self.connection.clone();
814 async move |_| {
815 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
816 .await.log_err();
817
818 if result.is_none() {
819 models.borrow_mut().current_model_id = initial_model_id;
820 }
821 }
822 }).detach();
823
824 models_ref.current_model_id = default_model;
825 } else {
826 let available_models = models_ref
827 .available_models
828 .iter()
829 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
830 .collect::<Vec<_>>()
831 .join("\n");
832
833 log::warn!(
834 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
835 );
836 }
837 }
838 }
839
840 if let Some(config_opts) = config_options.as_ref() {
841 self.apply_default_config_options(&response.session_id, config_opts, cx);
842 }
843
844 let action_log = cx.new(|_| ActionLog::new(project.clone()));
845 let thread: Entity<AcpThread> = cx.new(|cx| {
846 AcpThread::new(
847 None,
848 None,
849 Some(work_dirs),
850 self.clone(),
851 project,
852 action_log,
853 response.session_id.clone(),
854 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
855 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
856 cx,
857 )
858 });
859
860 self.sessions.borrow_mut().insert(
861 response.session_id,
862 AcpSession {
863 thread: thread.downgrade(),
864 suppress_abort_err: false,
865 session_modes: modes,
866 models,
867 config_options: config_options.map(ConfigOptions::new),
868 ref_count: 1,
869 },
870 );
871
872 Ok(thread)
873 })
874 }
875
876 fn supports_load_session(&self) -> bool {
877 self.agent_capabilities.load_session
878 }
879
880 fn supports_resume_session(&self) -> bool {
881 self.agent_capabilities
882 .session_capabilities
883 .resume
884 .is_some()
885 }
886
887 fn load_session(
888 self: Rc<Self>,
889 session_id: acp::SessionId,
890 project: Entity<Project>,
891 work_dirs: PathList,
892 title: Option<SharedString>,
893 cx: &mut App,
894 ) -> Task<Result<Entity<AcpThread>>> {
895 if !self.agent_capabilities.load_session {
896 return Task::ready(Err(anyhow!(LoadError::Other(
897 "Loading sessions is not supported by this agent.".into()
898 ))));
899 }
900
901 let mcp_servers = mcp_servers_for_project(&project, cx);
902 self.open_or_create_session(
903 session_id,
904 project,
905 work_dirs,
906 title,
907 move |connection, session_id, cwd| {
908 Box::pin(async move {
909 let response = connection
910 .load_session(
911 acp::LoadSessionRequest::new(session_id, cwd).mcp_servers(mcp_servers),
912 )
913 .await
914 .map_err(map_acp_error)?;
915 Ok(SessionConfigResponse {
916 modes: response.modes,
917 models: response.models,
918 config_options: response.config_options,
919 })
920 })
921 },
922 cx,
923 )
924 }
925
926 fn resume_session(
927 self: Rc<Self>,
928 session_id: acp::SessionId,
929 project: Entity<Project>,
930 work_dirs: PathList,
931 title: Option<SharedString>,
932 cx: &mut App,
933 ) -> Task<Result<Entity<AcpThread>>> {
934 if self
935 .agent_capabilities
936 .session_capabilities
937 .resume
938 .is_none()
939 {
940 return Task::ready(Err(anyhow!(LoadError::Other(
941 "Resuming sessions is not supported by this agent.".into()
942 ))));
943 }
944
945 let mcp_servers = mcp_servers_for_project(&project, cx);
946 self.open_or_create_session(
947 session_id,
948 project,
949 work_dirs,
950 title,
951 move |connection, session_id, cwd| {
952 Box::pin(async move {
953 let response = connection
954 .resume_session(
955 acp::ResumeSessionRequest::new(session_id, cwd)
956 .mcp_servers(mcp_servers),
957 )
958 .await
959 .map_err(map_acp_error)?;
960 Ok(SessionConfigResponse {
961 modes: response.modes,
962 models: response.models,
963 config_options: response.config_options,
964 })
965 })
966 },
967 cx,
968 )
969 }
970
971 fn supports_close_session(&self) -> bool {
972 self.agent_capabilities.session_capabilities.close.is_some()
973 }
974
975 fn close_session(
976 self: Rc<Self>,
977 session_id: &acp::SessionId,
978 cx: &mut App,
979 ) -> Task<Result<()>> {
980 if !self.supports_close_session() {
981 return Task::ready(Err(anyhow!(LoadError::Other(
982 "Closing sessions is not supported by this agent.".into()
983 ))));
984 }
985
986 let mut sessions = self.sessions.borrow_mut();
987 let Some(session) = sessions.get_mut(session_id) else {
988 return Task::ready(Ok(()));
989 };
990
991 session.ref_count -= 1;
992 if session.ref_count > 0 {
993 return Task::ready(Ok(()));
994 }
995
996 sessions.remove(session_id);
997 drop(sessions);
998
999 let conn = self.connection.clone();
1000 let session_id = session_id.clone();
1001 cx.foreground_executor().spawn(async move {
1002 conn.close_session(acp::CloseSessionRequest::new(session_id))
1003 .await?;
1004 Ok(())
1005 })
1006 }
1007
1008 fn auth_methods(&self) -> &[acp::AuthMethod] {
1009 &self.auth_methods
1010 }
1011
1012 fn terminal_auth_task(
1013 &self,
1014 method_id: &acp::AuthMethodId,
1015 cx: &App,
1016 ) -> Option<Task<Result<SpawnInTerminal>>> {
1017 let method = self
1018 .auth_methods
1019 .iter()
1020 .find(|method| method.id() == method_id)?;
1021
1022 match method {
1023 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1024 let agent_id = self.id.clone();
1025 let terminal = terminal.clone();
1026 let store = self.agent_server_store.clone();
1027 Some(cx.spawn(async move |cx| {
1028 let command = store
1029 .update(cx, |store, cx| {
1030 let agent = store
1031 .get_external_agent(&agent_id)
1032 .context("Agent server not found")?;
1033 anyhow::Ok(agent.get_command(
1034 terminal.args.clone(),
1035 HashMap::from_iter(terminal.env.clone()),
1036 &mut cx.to_async(),
1037 ))
1038 })?
1039 .context("Failed to get agent command")?
1040 .await?;
1041 Ok(terminal_auth_task(&command, &agent_id, &terminal))
1042 }))
1043 }
1044 _ => meta_terminal_auth_task(&self.id, method_id, method)
1045 .map(|task| Task::ready(Ok(task))),
1046 }
1047 }
1048
1049 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1050 let conn = self.connection.clone();
1051 cx.foreground_executor().spawn(async move {
1052 conn.authenticate(acp::AuthenticateRequest::new(method_id))
1053 .await?;
1054 Ok(())
1055 })
1056 }
1057
1058 fn prompt(
1059 &self,
1060 _id: acp_thread::UserMessageId,
1061 params: acp::PromptRequest,
1062 cx: &mut App,
1063 ) -> Task<Result<acp::PromptResponse>> {
1064 let conn = self.connection.clone();
1065 let sessions = self.sessions.clone();
1066 let session_id = params.session_id.clone();
1067 cx.foreground_executor().spawn(async move {
1068 let result = conn.prompt(params).await;
1069
1070 let mut suppress_abort_err = false;
1071
1072 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
1073 suppress_abort_err = session.suppress_abort_err;
1074 session.suppress_abort_err = false;
1075 }
1076
1077 match result {
1078 Ok(response) => Ok(response),
1079 Err(err) => {
1080 if err.code == acp::ErrorCode::AuthRequired {
1081 return Err(anyhow!(acp::Error::auth_required()));
1082 }
1083
1084 if err.code != ErrorCode::InternalError {
1085 anyhow::bail!(err)
1086 }
1087
1088 let Some(data) = &err.data else {
1089 anyhow::bail!(err)
1090 };
1091
1092 // Temporary workaround until the following PR is generally available:
1093 // https://github.com/google-gemini/gemini-cli/pull/6656
1094
1095 #[derive(Deserialize)]
1096 #[serde(deny_unknown_fields)]
1097 struct ErrorDetails {
1098 details: Box<str>,
1099 }
1100
1101 match serde_json::from_value(data.clone()) {
1102 Ok(ErrorDetails { details }) => {
1103 if suppress_abort_err
1104 && (details.contains("This operation was aborted")
1105 || details.contains("The user aborted a request"))
1106 {
1107 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1108 } else {
1109 Err(anyhow!(details))
1110 }
1111 }
1112 Err(_) => Err(anyhow!(err)),
1113 }
1114 }
1115 }
1116 })
1117 }
1118
1119 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1120 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1121 session.suppress_abort_err = true;
1122 }
1123 let conn = self.connection.clone();
1124 let params = acp::CancelNotification::new(session_id.clone());
1125 cx.foreground_executor()
1126 .spawn(async move { conn.cancel(params).await })
1127 .detach();
1128 }
1129
1130 fn session_modes(
1131 &self,
1132 session_id: &acp::SessionId,
1133 _cx: &App,
1134 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1135 let sessions = self.sessions.clone();
1136 let sessions_ref = sessions.borrow();
1137 let Some(session) = sessions_ref.get(session_id) else {
1138 return None;
1139 };
1140
1141 if let Some(modes) = session.session_modes.as_ref() {
1142 Some(Rc::new(AcpSessionModes {
1143 connection: self.connection.clone(),
1144 session_id: session_id.clone(),
1145 state: modes.clone(),
1146 }) as _)
1147 } else {
1148 None
1149 }
1150 }
1151
1152 fn model_selector(
1153 &self,
1154 session_id: &acp::SessionId,
1155 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1156 let sessions = self.sessions.clone();
1157 let sessions_ref = sessions.borrow();
1158 let Some(session) = sessions_ref.get(session_id) else {
1159 return None;
1160 };
1161
1162 if let Some(models) = session.models.as_ref() {
1163 Some(Rc::new(AcpModelSelector::new(
1164 session_id.clone(),
1165 self.connection.clone(),
1166 models.clone(),
1167 )) as _)
1168 } else {
1169 None
1170 }
1171 }
1172
1173 fn session_config_options(
1174 &self,
1175 session_id: &acp::SessionId,
1176 _cx: &App,
1177 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1178 let sessions = self.sessions.borrow();
1179 let session = sessions.get(session_id)?;
1180
1181 let config_opts = session.config_options.as_ref()?;
1182
1183 Some(Rc::new(AcpSessionConfigOptions {
1184 session_id: session_id.clone(),
1185 connection: self.connection.clone(),
1186 state: config_opts.config_options.clone(),
1187 watch_tx: config_opts.tx.clone(),
1188 watch_rx: config_opts.rx.clone(),
1189 }) as _)
1190 }
1191
1192 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1193 self.session_list.clone().map(|s| s as _)
1194 }
1195
1196 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1197 self
1198 }
1199}
1200
1201fn map_acp_error(err: acp::Error) -> anyhow::Error {
1202 if err.code == acp::ErrorCode::AuthRequired {
1203 let mut error = AuthRequired::new();
1204
1205 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1206 error = error.with_description(err.message);
1207 }
1208
1209 anyhow!(error)
1210 } else {
1211 anyhow!(err)
1212 }
1213}
1214
1215#[cfg(any(test, feature = "test-support"))]
1216pub mod test_support {
1217 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1218
1219 use acp_thread::{
1220 AgentModelSelector, AgentSessionConfigOptions, AgentSessionModes, AgentSessionRetry,
1221 AgentSessionSetTitle, AgentSessionTruncate, AgentTelemetry, UserMessageId,
1222 };
1223
1224 use super::*;
1225
1226 #[derive(Clone, Default)]
1227 pub struct FakeAcpAgentServer {
1228 load_session_count: Arc<AtomicUsize>,
1229 close_session_count: Arc<AtomicUsize>,
1230 fail_next_prompt: Arc<AtomicBool>,
1231 exit_status_sender:
1232 Arc<std::sync::Mutex<Option<smol::channel::Sender<std::process::ExitStatus>>>>,
1233 }
1234
1235 impl FakeAcpAgentServer {
1236 pub fn new() -> Self {
1237 Self::default()
1238 }
1239
1240 pub fn load_session_count(&self) -> Arc<AtomicUsize> {
1241 self.load_session_count.clone()
1242 }
1243
1244 pub fn close_session_count(&self) -> Arc<AtomicUsize> {
1245 self.close_session_count.clone()
1246 }
1247
1248 pub fn simulate_server_exit(&self) {
1249 let sender = self
1250 .exit_status_sender
1251 .lock()
1252 .expect("exit status sender lock should not be poisoned")
1253 .clone()
1254 .expect("fake ACP server must be connected before simulating exit");
1255 sender
1256 .try_send(std::process::ExitStatus::default())
1257 .expect("fake ACP server exit receiver should still be alive");
1258 }
1259
1260 pub fn fail_next_prompt(&self) {
1261 self.fail_next_prompt.store(true, Ordering::SeqCst);
1262 }
1263 }
1264
1265 impl crate::AgentServer for FakeAcpAgentServer {
1266 fn logo(&self) -> ui::IconName {
1267 ui::IconName::ZedAgent
1268 }
1269
1270 fn agent_id(&self) -> AgentId {
1271 AgentId::new("Test")
1272 }
1273
1274 fn connect(
1275 &self,
1276 _delegate: crate::AgentServerDelegate,
1277 project: Entity<Project>,
1278 cx: &mut App,
1279 ) -> Task<anyhow::Result<Rc<dyn AgentConnection>>> {
1280 let load_session_count = self.load_session_count.clone();
1281 let close_session_count = self.close_session_count.clone();
1282 let fail_next_prompt = self.fail_next_prompt.clone();
1283 let exit_status_sender = self.exit_status_sender.clone();
1284 cx.spawn(async move |cx| {
1285 let harness = build_fake_acp_connection(
1286 project,
1287 load_session_count,
1288 close_session_count,
1289 fail_next_prompt,
1290 cx,
1291 )
1292 .await?;
1293 let (exit_tx, exit_rx) = smol::channel::bounded(1);
1294 *exit_status_sender
1295 .lock()
1296 .expect("exit status sender lock should not be poisoned") = Some(exit_tx);
1297 let connection = harness.connection.clone();
1298 let simulate_exit_task = cx.spawn(async move |cx| {
1299 while let Ok(status) = exit_rx.recv().await {
1300 emit_load_error_to_all_sessions(
1301 &connection.sessions,
1302 LoadError::Exited { status },
1303 cx,
1304 );
1305 }
1306 Ok(())
1307 });
1308 Ok(Rc::new(FakeAcpAgentConnection {
1309 inner: harness.connection,
1310 _keep_agent_alive: harness.keep_agent_alive,
1311 _simulate_exit_task: simulate_exit_task,
1312 }) as Rc<dyn AgentConnection>)
1313 })
1314 }
1315
1316 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1317 self
1318 }
1319 }
1320
1321 pub struct FakeAcpConnectionHarness {
1322 pub connection: Rc<AcpConnection>,
1323 pub load_session_count: Arc<AtomicUsize>,
1324 pub close_session_count: Arc<AtomicUsize>,
1325 pub keep_agent_alive: Task<anyhow::Result<()>>,
1326 }
1327
1328 struct FakeAcpAgentConnection {
1329 inner: Rc<AcpConnection>,
1330 _keep_agent_alive: Task<anyhow::Result<()>>,
1331 _simulate_exit_task: Task<anyhow::Result<()>>,
1332 }
1333
1334 impl AgentConnection for FakeAcpAgentConnection {
1335 fn agent_id(&self) -> AgentId {
1336 self.inner.agent_id()
1337 }
1338
1339 fn telemetry_id(&self) -> SharedString {
1340 self.inner.telemetry_id()
1341 }
1342
1343 fn new_session(
1344 self: Rc<Self>,
1345 project: Entity<Project>,
1346 work_dirs: PathList,
1347 cx: &mut App,
1348 ) -> Task<Result<Entity<AcpThread>>> {
1349 self.inner.clone().new_session(project, work_dirs, cx)
1350 }
1351
1352 fn supports_load_session(&self) -> bool {
1353 self.inner.supports_load_session()
1354 }
1355
1356 fn load_session(
1357 self: Rc<Self>,
1358 session_id: acp::SessionId,
1359 project: Entity<Project>,
1360 work_dirs: PathList,
1361 title: Option<SharedString>,
1362 cx: &mut App,
1363 ) -> Task<Result<Entity<AcpThread>>> {
1364 self.inner
1365 .clone()
1366 .load_session(session_id, project, work_dirs, title, cx)
1367 }
1368
1369 fn supports_close_session(&self) -> bool {
1370 self.inner.supports_close_session()
1371 }
1372
1373 fn close_session(
1374 self: Rc<Self>,
1375 session_id: &acp::SessionId,
1376 cx: &mut App,
1377 ) -> Task<Result<()>> {
1378 self.inner.clone().close_session(session_id, cx)
1379 }
1380
1381 fn supports_resume_session(&self) -> bool {
1382 self.inner.supports_resume_session()
1383 }
1384
1385 fn resume_session(
1386 self: Rc<Self>,
1387 session_id: acp::SessionId,
1388 project: Entity<Project>,
1389 work_dirs: PathList,
1390 title: Option<SharedString>,
1391 cx: &mut App,
1392 ) -> Task<Result<Entity<AcpThread>>> {
1393 self.inner
1394 .clone()
1395 .resume_session(session_id, project, work_dirs, title, cx)
1396 }
1397
1398 fn auth_methods(&self) -> &[acp::AuthMethod] {
1399 self.inner.auth_methods()
1400 }
1401
1402 fn terminal_auth_task(
1403 &self,
1404 method: &acp::AuthMethodId,
1405 cx: &App,
1406 ) -> Option<Task<Result<SpawnInTerminal>>> {
1407 self.inner.terminal_auth_task(method, cx)
1408 }
1409
1410 fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
1411 self.inner.authenticate(method, cx)
1412 }
1413
1414 fn prompt(
1415 &self,
1416 user_message_id: UserMessageId,
1417 params: acp::PromptRequest,
1418 cx: &mut App,
1419 ) -> Task<Result<acp::PromptResponse>> {
1420 self.inner.prompt(user_message_id, params, cx)
1421 }
1422
1423 fn retry(
1424 &self,
1425 session_id: &acp::SessionId,
1426 cx: &App,
1427 ) -> Option<Rc<dyn AgentSessionRetry>> {
1428 self.inner.retry(session_id, cx)
1429 }
1430
1431 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1432 self.inner.cancel(session_id, cx)
1433 }
1434
1435 fn truncate(
1436 &self,
1437 session_id: &acp::SessionId,
1438 cx: &App,
1439 ) -> Option<Rc<dyn AgentSessionTruncate>> {
1440 self.inner.truncate(session_id, cx)
1441 }
1442
1443 fn set_title(
1444 &self,
1445 session_id: &acp::SessionId,
1446 cx: &App,
1447 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
1448 self.inner.set_title(session_id, cx)
1449 }
1450
1451 fn model_selector(
1452 &self,
1453 session_id: &acp::SessionId,
1454 ) -> Option<Rc<dyn AgentModelSelector>> {
1455 self.inner.model_selector(session_id)
1456 }
1457
1458 fn telemetry(&self) -> Option<Rc<dyn AgentTelemetry>> {
1459 self.inner.telemetry()
1460 }
1461
1462 fn session_modes(
1463 &self,
1464 session_id: &acp::SessionId,
1465 cx: &App,
1466 ) -> Option<Rc<dyn AgentSessionModes>> {
1467 self.inner.session_modes(session_id, cx)
1468 }
1469
1470 fn session_config_options(
1471 &self,
1472 session_id: &acp::SessionId,
1473 cx: &App,
1474 ) -> Option<Rc<dyn AgentSessionConfigOptions>> {
1475 self.inner.session_config_options(session_id, cx)
1476 }
1477
1478 fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1479 self.inner.session_list(cx)
1480 }
1481
1482 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1483 self
1484 }
1485 }
1486
1487 struct FakeAcpAgent {
1488 load_session_count: Arc<AtomicUsize>,
1489 close_session_count: Arc<AtomicUsize>,
1490 fail_next_prompt: Arc<AtomicBool>,
1491 }
1492
1493 #[async_trait::async_trait(?Send)]
1494 impl acp::Agent for FakeAcpAgent {
1495 async fn initialize(
1496 &self,
1497 args: acp::InitializeRequest,
1498 ) -> acp::Result<acp::InitializeResponse> {
1499 Ok(
1500 acp::InitializeResponse::new(args.protocol_version).agent_capabilities(
1501 acp::AgentCapabilities::default()
1502 .load_session(true)
1503 .session_capabilities(
1504 acp::SessionCapabilities::default()
1505 .close(acp::SessionCloseCapabilities::new()),
1506 ),
1507 ),
1508 )
1509 }
1510
1511 async fn authenticate(
1512 &self,
1513 _: acp::AuthenticateRequest,
1514 ) -> acp::Result<acp::AuthenticateResponse> {
1515 Ok(Default::default())
1516 }
1517
1518 async fn new_session(
1519 &self,
1520 _: acp::NewSessionRequest,
1521 ) -> acp::Result<acp::NewSessionResponse> {
1522 Ok(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
1523 }
1524
1525 async fn prompt(&self, _: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
1526 if self.fail_next_prompt.swap(false, Ordering::SeqCst) {
1527 Err(acp::ErrorCode::InternalError.into())
1528 } else {
1529 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
1530 }
1531 }
1532
1533 async fn cancel(&self, _: acp::CancelNotification) -> acp::Result<()> {
1534 Ok(())
1535 }
1536
1537 async fn load_session(
1538 &self,
1539 _: acp::LoadSessionRequest,
1540 ) -> acp::Result<acp::LoadSessionResponse> {
1541 self.load_session_count.fetch_add(1, Ordering::SeqCst);
1542 Ok(acp::LoadSessionResponse::new())
1543 }
1544
1545 async fn close_session(
1546 &self,
1547 _: acp::CloseSessionRequest,
1548 ) -> acp::Result<acp::CloseSessionResponse> {
1549 self.close_session_count.fetch_add(1, Ordering::SeqCst);
1550 Ok(acp::CloseSessionResponse::new())
1551 }
1552 }
1553
1554 async fn build_fake_acp_connection(
1555 project: Entity<Project>,
1556 load_session_count: Arc<AtomicUsize>,
1557 close_session_count: Arc<AtomicUsize>,
1558 fail_next_prompt: Arc<AtomicBool>,
1559 cx: &mut AsyncApp,
1560 ) -> Result<FakeAcpConnectionHarness> {
1561 let (c2a_writer, c2a_reader) = async_pipe::pipe();
1562 let (a2c_writer, a2c_reader) = async_pipe::pipe();
1563
1564 let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
1565 Rc::new(RefCell::new(HashMap::default()));
1566 let session_list_container: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
1567 Rc::new(RefCell::new(None));
1568
1569 let foreground = cx.foreground_executor().clone();
1570
1571 let client_delegate = ClientDelegate {
1572 sessions: sessions.clone(),
1573 session_list: session_list_container,
1574 cx: cx.clone(),
1575 };
1576
1577 let (client_conn, client_io_task) =
1578 acp::ClientSideConnection::new(client_delegate, c2a_writer, a2c_reader, {
1579 let foreground = foreground.clone();
1580 move |fut| {
1581 foreground.spawn(fut).detach();
1582 }
1583 });
1584
1585 let fake_agent = FakeAcpAgent {
1586 load_session_count: load_session_count.clone(),
1587 close_session_count: close_session_count.clone(),
1588 fail_next_prompt,
1589 };
1590
1591 let (_, agent_io_task) =
1592 acp::AgentSideConnection::new(fake_agent, a2c_writer, c2a_reader, {
1593 let foreground = foreground.clone();
1594 move |fut| {
1595 foreground.spawn(fut).detach();
1596 }
1597 });
1598
1599 let client_io_task = cx.background_spawn(client_io_task);
1600 let agent_io_task = cx.background_spawn(agent_io_task);
1601
1602 let response = client_conn
1603 .initialize(acp::InitializeRequest::new(acp::ProtocolVersion::V1))
1604 .await?;
1605
1606 let agent_capabilities = response.agent_capabilities;
1607
1608 let agent_server_store =
1609 project.read_with(cx, |project, _| project.agent_server_store().downgrade());
1610
1611 let connection = cx.update(|cx| {
1612 AcpConnection::new_for_test(
1613 Rc::new(client_conn),
1614 sessions,
1615 agent_capabilities,
1616 agent_server_store,
1617 client_io_task,
1618 cx,
1619 )
1620 });
1621
1622 let keep_agent_alive = cx.background_spawn(async move {
1623 agent_io_task.await.ok();
1624 anyhow::Ok(())
1625 });
1626
1627 Ok(FakeAcpConnectionHarness {
1628 connection: Rc::new(connection),
1629 load_session_count,
1630 close_session_count,
1631 keep_agent_alive,
1632 })
1633 }
1634
1635 pub async fn connect_fake_acp_connection(
1636 project: Entity<Project>,
1637 cx: &mut gpui::TestAppContext,
1638 ) -> FakeAcpConnectionHarness {
1639 cx.update(|cx| {
1640 let store = settings::SettingsStore::test(cx);
1641 cx.set_global(store);
1642 });
1643
1644 build_fake_acp_connection(
1645 project,
1646 Arc::new(AtomicUsize::new(0)),
1647 Arc::new(AtomicUsize::new(0)),
1648 Arc::new(AtomicBool::new(false)),
1649 &mut cx.to_async(),
1650 )
1651 .await
1652 .expect("failed to initialize ACP connection")
1653 }
1654}
1655
1656#[cfg(test)]
1657mod tests {
1658 use std::sync::atomic::{AtomicUsize, Ordering};
1659
1660 use super::*;
1661
1662 #[test]
1663 fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
1664 let command = AgentServerCommand {
1665 path: "/path/to/agent".into(),
1666 args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
1667 env: Some(HashMap::from_iter([
1668 ("BASE".into(), "1".into()),
1669 ("SHARED".into(), "override".into()),
1670 ("EXTRA".into(), "2".into()),
1671 ])),
1672 };
1673 let method = acp::AuthMethodTerminal::new("login", "Login");
1674
1675 let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
1676
1677 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1678 assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
1679 assert_eq!(
1680 task.env,
1681 HashMap::from_iter([
1682 ("BASE".into(), "1".into()),
1683 ("SHARED".into(), "override".into()),
1684 ("EXTRA".into(), "2".into()),
1685 ])
1686 );
1687 assert_eq!(task.label, "Login");
1688 assert_eq!(task.command_label, "Login");
1689 }
1690
1691 #[test]
1692 fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
1693 let method_id = acp::AuthMethodId::new("legacy-login");
1694 let method = acp::AuthMethod::Agent(
1695 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1696 "terminal-auth".to_string(),
1697 serde_json::json!({
1698 "label": "legacy /auth",
1699 "command": "legacy-agent",
1700 "args": ["auth", "--interactive"],
1701 "env": {
1702 "AUTH_MODE": "interactive",
1703 },
1704 }),
1705 )])),
1706 );
1707
1708 let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
1709 .expect("expected legacy terminal auth task");
1710
1711 assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
1712 assert_eq!(task.command.as_deref(), Some("legacy-agent"));
1713 assert_eq!(task.args, vec!["auth", "--interactive"]);
1714 assert_eq!(
1715 task.env,
1716 HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
1717 );
1718 assert_eq!(task.label, "legacy /auth");
1719 }
1720
1721 #[test]
1722 fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
1723 let method_id = acp::AuthMethodId::new("legacy-login");
1724 let method = acp::AuthMethod::Agent(
1725 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1726 "terminal-auth".to_string(),
1727 serde_json::json!({
1728 "label": "legacy /auth",
1729 }),
1730 )])),
1731 );
1732
1733 assert!(
1734 meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
1735 );
1736 }
1737
1738 #[test]
1739 fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
1740 let method_id = acp::AuthMethodId::new("login");
1741 let method = acp::AuthMethod::Terminal(
1742 acp::AuthMethodTerminal::new(method_id, "Login")
1743 .args(vec!["/auth".into()])
1744 .env(std::collections::HashMap::from_iter([(
1745 "AUTH_MODE".into(),
1746 "first-class".into(),
1747 )]))
1748 .meta(acp::Meta::from_iter([(
1749 "terminal-auth".to_string(),
1750 serde_json::json!({
1751 "label": "legacy /auth",
1752 "command": "legacy-agent",
1753 "args": ["legacy-auth"],
1754 "env": {
1755 "AUTH_MODE": "legacy",
1756 },
1757 }),
1758 )])),
1759 );
1760
1761 let command = AgentServerCommand {
1762 path: "/path/to/agent".into(),
1763 args: vec!["--acp".into(), "/auth".into()],
1764 env: Some(HashMap::from_iter([
1765 ("BASE".into(), "1".into()),
1766 ("AUTH_MODE".into(), "first-class".into()),
1767 ])),
1768 };
1769
1770 let task = match &method {
1771 acp::AuthMethod::Terminal(terminal) => {
1772 terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
1773 }
1774 _ => unreachable!(),
1775 };
1776
1777 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1778 assert_eq!(task.args, vec!["--acp", "/auth"]);
1779 assert_eq!(
1780 task.env,
1781 HashMap::from_iter([
1782 ("BASE".into(), "1".into()),
1783 ("AUTH_MODE".into(), "first-class".into()),
1784 ])
1785 );
1786 assert_eq!(task.label, "Login");
1787 }
1788
1789 struct FakeAcpAgent {
1790 load_session_count: Arc<AtomicUsize>,
1791 close_session_count: Arc<AtomicUsize>,
1792 }
1793
1794 #[async_trait::async_trait(?Send)]
1795 impl acp::Agent for FakeAcpAgent {
1796 async fn initialize(
1797 &self,
1798 args: acp::InitializeRequest,
1799 ) -> acp::Result<acp::InitializeResponse> {
1800 Ok(
1801 acp::InitializeResponse::new(args.protocol_version).agent_capabilities(
1802 acp::AgentCapabilities::default()
1803 .load_session(true)
1804 .session_capabilities(
1805 acp::SessionCapabilities::default()
1806 .close(acp::SessionCloseCapabilities::new()),
1807 ),
1808 ),
1809 )
1810 }
1811
1812 async fn authenticate(
1813 &self,
1814 _: acp::AuthenticateRequest,
1815 ) -> acp::Result<acp::AuthenticateResponse> {
1816 Ok(Default::default())
1817 }
1818
1819 async fn new_session(
1820 &self,
1821 _: acp::NewSessionRequest,
1822 ) -> acp::Result<acp::NewSessionResponse> {
1823 Ok(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
1824 }
1825
1826 async fn prompt(&self, _: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
1827 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
1828 }
1829
1830 async fn cancel(&self, _: acp::CancelNotification) -> acp::Result<()> {
1831 Ok(())
1832 }
1833
1834 async fn load_session(
1835 &self,
1836 _: acp::LoadSessionRequest,
1837 ) -> acp::Result<acp::LoadSessionResponse> {
1838 self.load_session_count.fetch_add(1, Ordering::SeqCst);
1839 Ok(acp::LoadSessionResponse::new())
1840 }
1841
1842 async fn close_session(
1843 &self,
1844 _: acp::CloseSessionRequest,
1845 ) -> acp::Result<acp::CloseSessionResponse> {
1846 self.close_session_count.fetch_add(1, Ordering::SeqCst);
1847 Ok(acp::CloseSessionResponse::new())
1848 }
1849 }
1850
1851 async fn connect_fake_agent(
1852 cx: &mut gpui::TestAppContext,
1853 ) -> (
1854 Rc<AcpConnection>,
1855 Entity<project::Project>,
1856 Arc<AtomicUsize>,
1857 Arc<AtomicUsize>,
1858 Task<anyhow::Result<()>>,
1859 ) {
1860 cx.update(|cx| {
1861 let store = settings::SettingsStore::test(cx);
1862 cx.set_global(store);
1863 });
1864
1865 let fs = fs::FakeFs::new(cx.executor());
1866 fs.insert_tree("/", serde_json::json!({ "a": {} })).await;
1867 let project = project::Project::test(fs, [std::path::Path::new("/a")], cx).await;
1868
1869 let load_count = Arc::new(AtomicUsize::new(0));
1870 let close_count = Arc::new(AtomicUsize::new(0));
1871
1872 let (c2a_writer, c2a_reader) = async_pipe::pipe();
1873 let (a2c_writer, a2c_reader) = async_pipe::pipe();
1874
1875 let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
1876 Rc::new(RefCell::new(HashMap::default()));
1877 let session_list_container: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
1878 Rc::new(RefCell::new(None));
1879
1880 let foreground = cx.foreground_executor().clone();
1881
1882 let client_delegate = ClientDelegate {
1883 sessions: sessions.clone(),
1884 session_list: session_list_container,
1885 cx: cx.to_async(),
1886 };
1887
1888 let (client_conn, client_io_task) =
1889 acp::ClientSideConnection::new(client_delegate, c2a_writer, a2c_reader, {
1890 let foreground = foreground.clone();
1891 move |fut| {
1892 foreground.spawn(fut).detach();
1893 }
1894 });
1895
1896 let fake_agent = FakeAcpAgent {
1897 load_session_count: load_count.clone(),
1898 close_session_count: close_count.clone(),
1899 };
1900
1901 let (_, agent_io_task) =
1902 acp::AgentSideConnection::new(fake_agent, a2c_writer, c2a_reader, {
1903 let foreground = foreground.clone();
1904 move |fut| {
1905 foreground.spawn(fut).detach();
1906 }
1907 });
1908
1909 let client_io_task = cx.background_spawn(client_io_task);
1910 let agent_io_task = cx.background_spawn(agent_io_task);
1911
1912 let response = client_conn
1913 .initialize(acp::InitializeRequest::new(acp::ProtocolVersion::V1))
1914 .await
1915 .expect("failed to initialize ACP connection");
1916
1917 let agent_capabilities = response.agent_capabilities;
1918
1919 let agent_server_store =
1920 project.read_with(cx, |project, _| project.agent_server_store().downgrade());
1921
1922 let connection = cx.update(|cx| {
1923 AcpConnection::new_for_test(
1924 Rc::new(client_conn),
1925 sessions,
1926 agent_capabilities,
1927 agent_server_store,
1928 client_io_task,
1929 cx,
1930 )
1931 });
1932
1933 let keep_agent_alive = cx.background_spawn(async move {
1934 agent_io_task.await.ok();
1935 anyhow::Ok(())
1936 });
1937
1938 (
1939 Rc::new(connection),
1940 project,
1941 load_count,
1942 close_count,
1943 keep_agent_alive,
1944 )
1945 }
1946
1947 #[gpui::test]
1948 async fn test_loaded_sessions_keep_state_until_last_close(cx: &mut gpui::TestAppContext) {
1949 let (connection, project, load_count, close_count, _keep_agent_alive) =
1950 connect_fake_agent(cx).await;
1951
1952 let session_id = acp::SessionId::new("session-1");
1953 let work_dirs = util::path_list::PathList::new(&[std::path::Path::new("/a")]);
1954
1955 // Load the same session twice concurrently — the second call should join
1956 // the pending task rather than issuing a second ACP load_session RPC.
1957 let first_load = cx.update(|cx| {
1958 connection.clone().load_session(
1959 session_id.clone(),
1960 project.clone(),
1961 work_dirs.clone(),
1962 None,
1963 cx,
1964 )
1965 });
1966 let second_load = cx.update(|cx| {
1967 connection.clone().load_session(
1968 session_id.clone(),
1969 project.clone(),
1970 work_dirs.clone(),
1971 None,
1972 cx,
1973 )
1974 });
1975
1976 let first_thread = first_load.await.expect("first load failed");
1977 let second_thread = second_load.await.expect("second load failed");
1978 cx.run_until_parked();
1979
1980 assert_eq!(
1981 first_thread.entity_id(),
1982 second_thread.entity_id(),
1983 "concurrent loads for the same session should share one AcpThread"
1984 );
1985 assert_eq!(
1986 load_count.load(Ordering::SeqCst),
1987 1,
1988 "underlying ACP load_session should be called exactly once for concurrent loads"
1989 );
1990
1991 // The session has ref_count 2. The first close should not send the ACP
1992 // close_session RPC — the session is still referenced.
1993 cx.update(|cx| connection.clone().close_session(&session_id, cx))
1994 .await
1995 .expect("first close failed");
1996
1997 assert_eq!(
1998 close_count.load(Ordering::SeqCst),
1999 0,
2000 "ACP close_session should not be sent while ref_count > 0"
2001 );
2002 assert!(
2003 connection.sessions.borrow().contains_key(&session_id),
2004 "session should still be tracked after first close"
2005 );
2006
2007 // The second close drops ref_count to 0 — now the ACP RPC must be sent.
2008 cx.update(|cx| connection.clone().close_session(&session_id, cx))
2009 .await
2010 .expect("second close failed");
2011 cx.run_until_parked();
2012
2013 assert_eq!(
2014 close_count.load(Ordering::SeqCst),
2015 1,
2016 "ACP close_session should be sent exactly once when ref_count reaches 0"
2017 );
2018 assert!(
2019 !connection.sessions.borrow().contains_key(&session_id),
2020 "session should be removed after final close"
2021 );
2022 }
2023}
2024
2025fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
2026 let context_server_store = project.read(cx).context_server_store().read(cx);
2027 let is_local = project.read(cx).is_local();
2028 context_server_store
2029 .configured_server_ids()
2030 .iter()
2031 .filter_map(|id| {
2032 let configuration = context_server_store.configuration_for_server(id)?;
2033 match &*configuration {
2034 project::context_server_store::ContextServerConfiguration::Custom {
2035 command,
2036 remote,
2037 ..
2038 }
2039 | project::context_server_store::ContextServerConfiguration::Extension {
2040 command,
2041 remote,
2042 ..
2043 } if is_local || *remote => Some(acp::McpServer::Stdio(
2044 acp::McpServerStdio::new(id.0.to_string(), &command.path)
2045 .args(command.args.clone())
2046 .env(if let Some(env) = command.env.as_ref() {
2047 env.iter()
2048 .map(|(name, value)| acp::EnvVariable::new(name, value))
2049 .collect()
2050 } else {
2051 vec![]
2052 }),
2053 )),
2054 project::context_server_store::ContextServerConfiguration::Http {
2055 url,
2056 headers,
2057 timeout: _,
2058 } => Some(acp::McpServer::Http(
2059 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
2060 headers
2061 .iter()
2062 .map(|(name, value)| acp::HttpHeader::new(name, value))
2063 .collect(),
2064 ),
2065 )),
2066 _ => None,
2067 }
2068 })
2069 .collect()
2070}
2071
2072fn config_state(
2073 modes: Option<acp::SessionModeState>,
2074 models: Option<acp::SessionModelState>,
2075 config_options: Option<Vec<acp::SessionConfigOption>>,
2076) -> (
2077 Option<Rc<RefCell<acp::SessionModeState>>>,
2078 Option<Rc<RefCell<acp::SessionModelState>>>,
2079 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
2080) {
2081 if let Some(opts) = config_options {
2082 return (None, None, Some(Rc::new(RefCell::new(opts))));
2083 }
2084
2085 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
2086 let models = models.map(|models| Rc::new(RefCell::new(models)));
2087 (modes, models, None)
2088}
2089
2090struct AcpSessionModes {
2091 session_id: acp::SessionId,
2092 connection: Rc<acp::ClientSideConnection>,
2093 state: Rc<RefCell<acp::SessionModeState>>,
2094}
2095
2096impl acp_thread::AgentSessionModes for AcpSessionModes {
2097 fn current_mode(&self) -> acp::SessionModeId {
2098 self.state.borrow().current_mode_id.clone()
2099 }
2100
2101 fn all_modes(&self) -> Vec<acp::SessionMode> {
2102 self.state.borrow().available_modes.clone()
2103 }
2104
2105 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
2106 let connection = self.connection.clone();
2107 let session_id = self.session_id.clone();
2108 let old_mode_id;
2109 {
2110 let mut state = self.state.borrow_mut();
2111 old_mode_id = state.current_mode_id.clone();
2112 state.current_mode_id = mode_id.clone();
2113 };
2114 let state = self.state.clone();
2115 cx.foreground_executor().spawn(async move {
2116 let result = connection
2117 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
2118 .await;
2119
2120 if result.is_err() {
2121 state.borrow_mut().current_mode_id = old_mode_id;
2122 }
2123
2124 result?;
2125
2126 Ok(())
2127 })
2128 }
2129}
2130
2131struct AcpModelSelector {
2132 session_id: acp::SessionId,
2133 connection: Rc<acp::ClientSideConnection>,
2134 state: Rc<RefCell<acp::SessionModelState>>,
2135}
2136
2137impl AcpModelSelector {
2138 fn new(
2139 session_id: acp::SessionId,
2140 connection: Rc<acp::ClientSideConnection>,
2141 state: Rc<RefCell<acp::SessionModelState>>,
2142 ) -> Self {
2143 Self {
2144 session_id,
2145 connection,
2146 state,
2147 }
2148 }
2149}
2150
2151impl acp_thread::AgentModelSelector for AcpModelSelector {
2152 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
2153 Task::ready(Ok(acp_thread::AgentModelList::Flat(
2154 self.state
2155 .borrow()
2156 .available_models
2157 .clone()
2158 .into_iter()
2159 .map(acp_thread::AgentModelInfo::from)
2160 .collect(),
2161 )))
2162 }
2163
2164 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
2165 let connection = self.connection.clone();
2166 let session_id = self.session_id.clone();
2167 let old_model_id;
2168 {
2169 let mut state = self.state.borrow_mut();
2170 old_model_id = state.current_model_id.clone();
2171 state.current_model_id = model_id.clone();
2172 };
2173 let state = self.state.clone();
2174 cx.foreground_executor().spawn(async move {
2175 let result = connection
2176 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
2177 .await;
2178
2179 if result.is_err() {
2180 state.borrow_mut().current_model_id = old_model_id;
2181 }
2182
2183 result?;
2184
2185 Ok(())
2186 })
2187 }
2188
2189 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
2190 let state = self.state.borrow();
2191 Task::ready(
2192 state
2193 .available_models
2194 .iter()
2195 .find(|m| m.model_id == state.current_model_id)
2196 .cloned()
2197 .map(acp_thread::AgentModelInfo::from)
2198 .ok_or_else(|| anyhow::anyhow!("Model not found")),
2199 )
2200 }
2201}
2202
2203struct AcpSessionConfigOptions {
2204 session_id: acp::SessionId,
2205 connection: Rc<acp::ClientSideConnection>,
2206 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
2207 watch_tx: Rc<RefCell<watch::Sender<()>>>,
2208 watch_rx: watch::Receiver<()>,
2209}
2210
2211impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
2212 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
2213 self.state.borrow().clone()
2214 }
2215
2216 fn set_config_option(
2217 &self,
2218 config_id: acp::SessionConfigId,
2219 value: acp::SessionConfigValueId,
2220 cx: &mut App,
2221 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
2222 let connection = self.connection.clone();
2223 let session_id = self.session_id.clone();
2224 let state = self.state.clone();
2225
2226 let watch_tx = self.watch_tx.clone();
2227
2228 cx.foreground_executor().spawn(async move {
2229 let response = connection
2230 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
2231 session_id, config_id, value,
2232 ))
2233 .await?;
2234
2235 *state.borrow_mut() = response.config_options.clone();
2236 watch_tx.borrow_mut().send(()).ok();
2237 Ok(response.config_options)
2238 })
2239 }
2240
2241 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
2242 Some(self.watch_rx.clone())
2243 }
2244}
2245
2246struct ClientDelegate {
2247 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
2248 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
2249 cx: AsyncApp,
2250}
2251
2252#[async_trait::async_trait(?Send)]
2253impl acp::Client for ClientDelegate {
2254 async fn request_permission(
2255 &self,
2256 arguments: acp::RequestPermissionRequest,
2257 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
2258 let thread;
2259 {
2260 let sessions_ref = self.sessions.borrow();
2261 let session = sessions_ref
2262 .get(&arguments.session_id)
2263 .context("Failed to get session")?;
2264 thread = session.thread.clone();
2265 }
2266
2267 let cx = &mut self.cx.clone();
2268
2269 let task = thread.update(cx, |thread, cx| {
2270 thread.request_tool_call_authorization(
2271 arguments.tool_call,
2272 acp_thread::PermissionOptions::Flat(arguments.options),
2273 cx,
2274 )
2275 })??;
2276
2277 let outcome = task.await;
2278
2279 Ok(acp::RequestPermissionResponse::new(outcome.into()))
2280 }
2281
2282 async fn write_text_file(
2283 &self,
2284 arguments: acp::WriteTextFileRequest,
2285 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
2286 let cx = &mut self.cx.clone();
2287 let task = self
2288 .session_thread(&arguments.session_id)?
2289 .update(cx, |thread, cx| {
2290 thread.write_text_file(arguments.path, arguments.content, cx)
2291 })?;
2292
2293 task.await?;
2294
2295 Ok(Default::default())
2296 }
2297
2298 async fn read_text_file(
2299 &self,
2300 arguments: acp::ReadTextFileRequest,
2301 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
2302 let task = self.session_thread(&arguments.session_id)?.update(
2303 &mut self.cx.clone(),
2304 |thread, cx| {
2305 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
2306 },
2307 )?;
2308
2309 let content = task.await?;
2310
2311 Ok(acp::ReadTextFileResponse::new(content))
2312 }
2313
2314 async fn session_notification(
2315 &self,
2316 notification: acp::SessionNotification,
2317 ) -> Result<(), acp::Error> {
2318 let (thread, session_modes, session_config_options) = {
2319 let sessions = self.sessions.borrow();
2320 let session = sessions
2321 .get(¬ification.session_id)
2322 .context("Failed to get session")?;
2323 (
2324 session.thread.clone(),
2325 session.session_modes.clone(),
2326 session.config_options.clone(),
2327 )
2328 };
2329
2330 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
2331 current_mode_id,
2332 ..
2333 }) = ¬ification.update
2334 {
2335 if let Some(session_modes) = &session_modes {
2336 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
2337 }
2338 }
2339
2340 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
2341 config_options,
2342 ..
2343 }) = ¬ification.update
2344 {
2345 if let Some(opts) = &session_config_options {
2346 *opts.config_options.borrow_mut() = config_options.clone();
2347 opts.tx.borrow_mut().send(()).ok();
2348 }
2349 }
2350
2351 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
2352 && let Some(session_list) = self.session_list.borrow().as_ref()
2353 {
2354 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
2355 }
2356
2357 // Clone so we can inspect meta both before and after handing off to the thread
2358 let update_clone = notification.update.clone();
2359
2360 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
2361 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
2362 if let Some(meta) = &tc.meta {
2363 if let Some(terminal_info) = meta.get("terminal_info") {
2364 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
2365 {
2366 let terminal_id = acp::TerminalId::new(id_str);
2367 let cwd = terminal_info
2368 .get("cwd")
2369 .and_then(|v| v.as_str().map(PathBuf::from));
2370
2371 // Create a minimal display-only lower-level terminal and register it.
2372 let _ = thread.update(&mut self.cx.clone(), |thread, cx| {
2373 let builder = TerminalBuilder::new_display_only(
2374 CursorShape::default(),
2375 AlternateScroll::On,
2376 None,
2377 0,
2378 cx.background_executor(),
2379 thread.project().read(cx).path_style(cx),
2380 )?;
2381 let lower = cx.new(|cx| builder.subscribe(cx));
2382 thread.on_terminal_provider_event(
2383 TerminalProviderEvent::Created {
2384 terminal_id,
2385 label: tc.title.clone(),
2386 cwd,
2387 output_byte_limit: None,
2388 terminal: lower,
2389 },
2390 cx,
2391 );
2392 anyhow::Ok(())
2393 });
2394 }
2395 }
2396 }
2397 }
2398
2399 // Forward the update to the acp_thread as usual.
2400 thread.update(&mut self.cx.clone(), |thread, cx| {
2401 thread.handle_session_update(notification.update.clone(), cx)
2402 })??;
2403
2404 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
2405 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
2406 if let Some(meta) = &tcu.meta {
2407 if let Some(term_out) = meta.get("terminal_output") {
2408 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
2409 let terminal_id = acp::TerminalId::new(id_str);
2410 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
2411 let data = s.as_bytes().to_vec();
2412 let _ = thread.update(&mut self.cx.clone(), |thread, cx| {
2413 thread.on_terminal_provider_event(
2414 TerminalProviderEvent::Output { terminal_id, data },
2415 cx,
2416 );
2417 });
2418 }
2419 }
2420 }
2421
2422 // terminal_exit
2423 if let Some(term_exit) = meta.get("terminal_exit") {
2424 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
2425 let terminal_id = acp::TerminalId::new(id_str);
2426 let status = acp::TerminalExitStatus::new()
2427 .exit_code(
2428 term_exit
2429 .get("exit_code")
2430 .and_then(|v| v.as_u64())
2431 .map(|i| i as u32),
2432 )
2433 .signal(
2434 term_exit
2435 .get("signal")
2436 .and_then(|v| v.as_str().map(|s| s.to_string())),
2437 );
2438
2439 let _ = thread.update(&mut self.cx.clone(), |thread, cx| {
2440 thread.on_terminal_provider_event(
2441 TerminalProviderEvent::Exit {
2442 terminal_id,
2443 status,
2444 },
2445 cx,
2446 );
2447 });
2448 }
2449 }
2450 }
2451 }
2452
2453 Ok(())
2454 }
2455
2456 async fn create_terminal(
2457 &self,
2458 args: acp::CreateTerminalRequest,
2459 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
2460 let thread = self.session_thread(&args.session_id)?;
2461 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
2462
2463 let terminal_entity = acp_thread::create_terminal_entity(
2464 args.command.clone(),
2465 &args.args,
2466 args.env
2467 .into_iter()
2468 .map(|env| (env.name, env.value))
2469 .collect(),
2470 args.cwd.clone(),
2471 &project,
2472 &mut self.cx.clone(),
2473 )
2474 .await?;
2475
2476 // Register with renderer
2477 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
2478 thread.register_terminal_created(
2479 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
2480 format!("{} {}", args.command, args.args.join(" ")),
2481 args.cwd.clone(),
2482 args.output_byte_limit,
2483 terminal_entity,
2484 cx,
2485 )
2486 })?;
2487 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
2488 Ok(acp::CreateTerminalResponse::new(terminal_id))
2489 }
2490
2491 async fn kill_terminal(
2492 &self,
2493 args: acp::KillTerminalRequest,
2494 ) -> Result<acp::KillTerminalResponse, acp::Error> {
2495 self.session_thread(&args.session_id)?
2496 .update(&mut self.cx.clone(), |thread, cx| {
2497 thread.kill_terminal(args.terminal_id, cx)
2498 })??;
2499
2500 Ok(Default::default())
2501 }
2502
2503 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
2504 Err(acp::Error::method_not_found())
2505 }
2506
2507 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
2508 Err(acp::Error::method_not_found())
2509 }
2510
2511 async fn release_terminal(
2512 &self,
2513 args: acp::ReleaseTerminalRequest,
2514 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
2515 self.session_thread(&args.session_id)?
2516 .update(&mut self.cx.clone(), |thread, cx| {
2517 thread.release_terminal(args.terminal_id, cx)
2518 })??;
2519
2520 Ok(Default::default())
2521 }
2522
2523 async fn terminal_output(
2524 &self,
2525 args: acp::TerminalOutputRequest,
2526 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
2527 self.session_thread(&args.session_id)?
2528 .read_with(&mut self.cx.clone(), |thread, cx| {
2529 let out = thread
2530 .terminal(args.terminal_id)?
2531 .read(cx)
2532 .current_output(cx);
2533
2534 Ok(out)
2535 })?
2536 }
2537
2538 async fn wait_for_terminal_exit(
2539 &self,
2540 args: acp::WaitForTerminalExitRequest,
2541 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
2542 let exit_status = self
2543 .session_thread(&args.session_id)?
2544 .update(&mut self.cx.clone(), |thread, cx| {
2545 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
2546 })??
2547 .await;
2548
2549 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
2550 }
2551}
2552
2553impl ClientDelegate {
2554 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
2555 let sessions = self.sessions.borrow();
2556 sessions
2557 .get(session_id)
2558 .context("Failed to get session")
2559 .map(|session| session.thread.clone())
2560 }
2561}