1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::agent_server_store::{AgentServerCommand, AgentServerStore};
14use project::{AgentId, Project};
15use remote::remote_client::Interactive;
16use serde::Deserialize;
17use settings::Settings as _;
18use std::path::PathBuf;
19use std::process::Stdio;
20use std::rc::Rc;
21use std::{any::Any, cell::RefCell};
22use task::{ShellBuilder, SpawnInTerminal};
23use thiserror::Error;
24use util::ResultExt as _;
25use util::path_list::PathList;
26use util::process::Child;
27
28use anyhow::{Context as _, Result};
29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
30
31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
32use terminal::TerminalBuilder;
33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
34
35use crate::GEMINI_ID;
36
/// Identifier of the terminal-based login method that `AcpConnection::stdio`
/// synthesizes for the Gemini agent in place of the methods the agent reports.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
38
/// Error returned by `AcpConnection::stdio` when the agent answers the
/// `initialize` request with a protocol version below
/// `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
42
/// A connection to an external agent process that speaks ACP over its
/// stdin/stdout pipes.
///
/// Created by [`AcpConnection::stdio`]; the child process is killed when the
/// connection is dropped.
pub struct AcpConnection {
    /// Identifier of the agent, as passed to `stdio`.
    id: AgentId,
    /// Name reported by the agent in its `initialize` response, falling back
    /// to the agent id when the agent does not provide one.
    telemetry_id: SharedString,
    /// Client side of the ACP connection used to issue every request.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions known to this connection, keyed by session id. The map is
    /// shared with the client delegate and with the process-exit watcher.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Auth methods from the `initialize` response (or the synthesized Gemini
    /// terminal login method).
    auth_methods: Vec<acp::AuthMethod>,
    /// Store used to resolve the agent's command for terminal authentication.
    agent_server_store: WeakEntity<AgentServerStore>,
    /// Capabilities advertised by the agent in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on newly created sessions, when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on newly created sessions, when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option id -> value to apply to sessions, when the value is one
    /// of the option's advertised choices.
    default_config_options: HashMap<String, String>,
    /// The spawned agent process; killed in `Drop`.
    child: Child,
    /// Present only when the agent advertises the session `list` capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Task driving the connection's I/O (spawned on the background executor).
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that awaits the child's exit status and reports
    /// `LoadError::Exited` to every session's thread.
    _wait_task: Task<Result<()>>,
    /// Task that forwards the agent's stderr, line by line, to the log.
    _stderr_task: Task<Result<()>>,
}
60
/// Per-session config option state together with the watch channel that is
/// handed to `AcpSessionConfigOptions` in `session_config_options`.
struct ConfigOptions {
    /// Shared, mutable list of the session's config options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sending half of the unit watch channel created in `ConfigOptions::new`.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiving half of the same watch channel.
    rx: watch::Receiver<()>,
}
66
67impl ConfigOptions {
68 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
69 let (tx, rx) = watch::channel(());
70 Self {
71 config_options,
72 tx: Rc::new(RefCell::new(tx)),
73 rx,
74 }
75 }
76}
77
/// State tracked by an [`AcpConnection`] for a single agent session.
pub struct AcpSession {
    /// The thread backing this session; held weakly.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel` and consumed (reset to `false`) by the next `prompt`
    /// completion; when set, an "aborted" internal error from the agent is
    /// reported as a cancelled prompt instead of an error.
    suppress_abort_err: bool,
    /// Model state reported by the agent for this session, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state reported by the agent for this session, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options reported by the agent for this session, if any.
    config_options: Option<ConfigOptions>,
}
85
/// Session listing backed by the agent's `list_sessions` request, plus an
/// unbounded channel over which list updates are broadcast to watchers.
pub struct AcpSessionList {
    /// Connection used to issue `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sending half of the update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; cloned out by `watch`.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
91
92impl AcpSessionList {
93 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
94 let (tx, rx) = smol::channel::unbounded();
95 Self {
96 connection,
97 updates_tx: tx,
98 updates_rx: rx,
99 }
100 }
101
102 fn notify_update(&self) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::Refresh)
105 .log_err();
106 }
107
108 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
109 self.updates_tx
110 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
111 .log_err();
112 }
113}
114
115impl AgentSessionList for AcpSessionList {
116 fn list_sessions(
117 &self,
118 request: AgentSessionListRequest,
119 cx: &mut App,
120 ) -> Task<Result<AgentSessionListResponse>> {
121 let conn = self.connection.clone();
122 cx.foreground_executor().spawn(async move {
123 let acp_request = acp::ListSessionsRequest::new()
124 .cwd(request.cwd)
125 .cursor(request.cursor);
126 let response = conn.list_sessions(acp_request).await?;
127 Ok(AgentSessionListResponse {
128 sessions: response
129 .sessions
130 .into_iter()
131 .map(|s| AgentSessionInfo {
132 session_id: s.session_id,
133 work_dirs: Some(PathList::new(&[s.cwd])),
134 title: s.title.map(Into::into),
135 updated_at: s.updated_at.and_then(|date_str| {
136 chrono::DateTime::parse_from_rfc3339(&date_str)
137 .ok()
138 .map(|dt| dt.with_timezone(&chrono::Utc))
139 }),
140 created_at: None,
141 meta: s.meta,
142 })
143 .collect(),
144 next_cursor: response.next_cursor,
145 meta: response.meta,
146 })
147 })
148 }
149
150 fn watch(
151 &self,
152 _cx: &mut App,
153 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
154 Some(self.updates_rx.clone())
155 }
156
157 fn notify_refresh(&self) {
158 self.notify_update();
159 }
160
161 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
162 self
163 }
164}
165
166pub async fn connect(
167 agent_id: AgentId,
168 project: Entity<Project>,
169 command: AgentServerCommand,
170 agent_server_store: WeakEntity<AgentServerStore>,
171 default_mode: Option<acp::SessionModeId>,
172 default_model: Option<acp::ModelId>,
173 default_config_options: HashMap<String, String>,
174 cx: &mut AsyncApp,
175) -> Result<Rc<dyn AgentConnection>> {
176 let conn = AcpConnection::stdio(
177 agent_id,
178 project,
179 command.clone(),
180 agent_server_store,
181 default_mode,
182 default_model,
183 default_config_options,
184 cx,
185 )
186 .await?;
187 Ok(Rc::new(conn) as _)
188}
189
/// Oldest protocol version accepted from an agent; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the agent reports a lower one.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
191
192impl AcpConnection {
/// Spawns the agent process and establishes an ACP connection over its
/// stdin/stdout pipes.
///
/// The command is run through the shell from the terminal settings. When
/// the project has a remote client, the command is first rewritten with
/// `build_command_with_options`; if that fails (the error is logged) or
/// there is no remote client, `command` is used as given.
///
/// After spawning, this sends the `initialize` request, checks the agent's
/// protocol version, and records the agent's capabilities and auth methods.
///
/// # Errors
///
/// Fails when the process cannot be spawned, when any of its stdio handles
/// is missing, when the `initialize` request fails, or with
/// `UnsupportedVersion` when the agent's protocol version is below
/// `MINIMUM_SUPPORTED_VERSION`.
pub async fn stdio(
    agent_id: AgentId,
    project: Entity<Project>,
    command: AgentServerCommand,
    agent_server_store: WeakEntity<AgentServerStore>,
    default_mode: Option<acp::SessionModeId>,
    default_model: Option<acp::ModelId>,
    default_config_options: HashMap<String, String>,
    cx: &mut AsyncApp,
) -> Result<Self> {
    // The first path of the project's default path list serves as the working directory.
    let root_dir = project.read_with(cx, |project, cx| {
        project
            .default_path_list(cx)
            .ordered_paths()
            .next()
            .cloned()
    });
    // Kept untouched for the Gemini auth override below, which needs the
    // command as configured rather than the (possibly remote-wrapped) one.
    let original_command = command.clone();
    let (path, args, env) = project
        .read_with(cx, |project, cx| {
            project.remote_client().and_then(|client| {
                let template = client
                    .read(cx)
                    .build_command_with_options(
                        Some(command.path.display().to_string()),
                        &command.args,
                        &command.env.clone().into_iter().flatten().collect(),
                        root_dir.as_ref().map(|path| path.display().to_string()),
                        None,
                        Interactive::No,
                    )
                    .log_err()?;
                Some((template.program, template.args, template.env))
            })
        })
        // No remote client (or building the remote command failed): run the command as given.
        .unwrap_or_else(|| {
            (
                command.path.display().to_string(),
                command.args,
                command.env.unwrap_or_default(),
            )
        });

    let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
    let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
    let mut child = builder.build_std_command(Some(path.clone()), &args);
    child.envs(env.clone());
    // The working directory is only set for local projects.
    if let Some(cwd) = project.read_with(cx, |project, _cx| {
        if project.is_local() {
            root_dir.as_ref()
        } else {
            None
        }
    }) {
        child.current_dir(cwd);
    }
    let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

    let stdout = child.stdout.take().context("Failed to take stdout")?;
    let stdin = child.stdin.take().context("Failed to take stdin")?;
    let stderr = child.stderr.take().context("Failed to take stderr")?;
    log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
    log::trace!("Spawned (pid: {})", child.id());

    let sessions = Rc::new(RefCell::new(HashMap::default()));

    let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
        (
            release_channel::ReleaseChannel::try_global(cx)
                .map(|release_channel| release_channel.display_name()),
            release_channel::AppVersion::global(cx).to_string(),
        )
    });

    // Shared slot for the session list: the delegate is created before the
    // agent's capabilities are known, so the list is filled in after `initialize`.
    let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
        Rc::new(RefCell::new(None));

    let client = ClientDelegate {
        sessions: sessions.clone(),
        session_list: client_session_list.clone(),
        cx: cx.clone(),
    };
    // Futures handed to the connection's spawn callback run on the foreground executor.
    let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
        let foreground_executor = cx.foreground_executor().clone();
        move |fut| {
            foreground_executor.spawn(fut).detach();
        }
    });

    let io_task = cx.background_spawn(io_task);

    // Forward the agent's stderr to the log until EOF or a read error.
    let stderr_task = cx.background_spawn(async move {
        let mut stderr = BufReader::new(stderr);
        let mut line = String::new();
        while let Ok(n) = stderr.read_line(&mut line).await
            && n > 0
        {
            log::warn!("agent stderr: {}", line.trim());
            line.clear();
        }
        Ok(())
    });

    // Once the process exits, report the exit status to every session's thread.
    let wait_task = cx.spawn({
        let sessions = sessions.clone();
        let status_fut = child.status();
        async move |cx| {
            let status = status_fut.await?;

            for session in sessions.borrow().values() {
                session
                    .thread
                    .update(cx, |thread, cx| {
                        thread.emit_load_error(LoadError::Exited { status }, cx)
                    })
                    .ok();
            }

            anyhow::Ok(())
        }
    });

    let connection = Rc::new(connection);

    // Registered before `initialize` is sent.
    cx.update(|cx| {
        AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
            registry.set_active_connection(agent_id.clone(), &connection, cx)
        });
    });

    let response = connection
        .initialize(
            acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                .client_capabilities(
                    acp::ClientCapabilities::new()
                        .fs(acp::FileSystemCapabilities::new()
                            .read_text_file(true)
                            .write_text_file(true))
                        .terminal(true)
                        .auth(acp::AuthCapabilities::new().terminal(true))
                        // Experimental: Allow for rendering terminal output from the agents
                        .meta(acp::Meta::from_iter([
                            ("terminal_output".into(), true.into()),
                            ("terminal-auth".into(), true.into()),
                        ])),
                )
                .client_info(
                    acp::Implementation::new("zed", version)
                        .title(release_channel.map(ToOwned::to_owned)),
                ),
        )
        .await?;

    if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
        return Err(UnsupportedVersion.into());
    }

    let telemetry_id = response
        .agent_info
        // Use the one the agent provides if we have one
        .map(|info| info.name.into())
        // Otherwise, just use the name
        .unwrap_or_else(|| agent_id.0.clone());

    // The session list exists only when the agent advertises the `list`
    // capability; it is also published to the client delegate's shared slot.
    let session_list = if response
        .agent_capabilities
        .session_capabilities
        .list
        .is_some()
    {
        let list = Rc::new(AcpSessionList::new(connection.clone()));
        *client_session_list.borrow_mut() = Some(list.clone());
        Some(list)
    } else {
        None
    };

    // TODO: Remove this override once Google team releases their official auth methods
    let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
        // The login command is the configured Gemini command without its ACP flags.
        let mut gemini_args = original_command.args.clone();
        gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
        let value = serde_json::json!({
            "label": "gemini /auth",
            "command": original_command.path.to_string_lossy(),
            "args": gemini_args,
            "env": original_command.env.unwrap_or_default(),
        });
        let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
        vec![acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                .description("Login with your Google or Vertex AI account")
                .meta(meta),
        )]
    } else {
        response.auth_methods
    };
    Ok(Self {
        id: agent_id,
        auth_methods,
        agent_server_store,
        connection,
        telemetry_id,
        sessions,
        agent_capabilities: response.agent_capabilities,
        default_mode,
        default_model,
        default_config_options,
        session_list,
        _io_task: io_task,
        _wait_task: wait_task,
        _stderr_task: stderr_task,
        child,
    })
}
407
408 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
409 &self.agent_capabilities.prompt_capabilities
410 }
411
412 fn apply_default_config_options(
413 &self,
414 session_id: &acp::SessionId,
415 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
416 cx: &mut AsyncApp,
417 ) {
418 let id = self.id.clone();
419 let defaults_to_apply: Vec<_> = {
420 let config_opts_ref = config_options.borrow();
421 config_opts_ref
422 .iter()
423 .filter_map(|config_option| {
424 let default_value = self.default_config_options.get(&*config_option.id.0)?;
425
426 let is_valid = match &config_option.kind {
427 acp::SessionConfigKind::Select(select) => match &select.options {
428 acp::SessionConfigSelectOptions::Ungrouped(options) => options
429 .iter()
430 .any(|opt| &*opt.value.0 == default_value.as_str()),
431 acp::SessionConfigSelectOptions::Grouped(groups) => {
432 groups.iter().any(|g| {
433 g.options
434 .iter()
435 .any(|opt| &*opt.value.0 == default_value.as_str())
436 })
437 }
438 _ => false,
439 },
440 _ => false,
441 };
442
443 if is_valid {
444 let initial_value = match &config_option.kind {
445 acp::SessionConfigKind::Select(select) => {
446 Some(select.current_value.clone())
447 }
448 _ => None,
449 };
450 Some((
451 config_option.id.clone(),
452 default_value.clone(),
453 initial_value,
454 ))
455 } else {
456 log::warn!(
457 "`{}` is not a valid value for config option `{}` in {}",
458 default_value,
459 config_option.id.0,
460 id
461 );
462 None
463 }
464 })
465 .collect()
466 };
467
468 for (config_id, default_value, initial_value) in defaults_to_apply {
469 cx.spawn({
470 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
471 let session_id = session_id.clone();
472 let config_id_clone = config_id.clone();
473 let config_opts = config_options.clone();
474 let conn = self.connection.clone();
475 async move |_| {
476 let result = conn
477 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
478 session_id,
479 config_id_clone.clone(),
480 default_value_id,
481 ))
482 .await
483 .log_err();
484
485 if result.is_none() {
486 if let Some(initial) = initial_value {
487 let mut opts = config_opts.borrow_mut();
488 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
489 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
490 select.current_value = initial;
491 }
492 }
493 }
494 }
495 }
496 })
497 .detach();
498
499 let mut opts = config_options.borrow_mut();
500 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
501 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
502 select.current_value = acp::SessionConfigValueId::new(default_value);
503 }
504 }
505 }
506 }
507}
508
509impl Drop for AcpConnection {
510 fn drop(&mut self) {
511 self.child.kill().log_err();
512 }
513}
514
515fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
516 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
517}
518
519fn terminal_auth_task(
520 command: &AgentServerCommand,
521 agent_id: &AgentId,
522 method: &acp::AuthMethodTerminal,
523) -> SpawnInTerminal {
524 acp_thread::build_terminal_auth_task(
525 terminal_auth_task_id(agent_id, &method.id),
526 method.name.clone(),
527 command.path.to_string_lossy().into_owned(),
528 command.args.clone(),
529 command.env.clone().unwrap_or_default(),
530 )
531}
532
533/// Used to support the _meta method prior to stabilization
534fn meta_terminal_auth_task(
535 agent_id: &AgentId,
536 method_id: &acp::AuthMethodId,
537 method: &acp::AuthMethod,
538) -> Option<SpawnInTerminal> {
539 #[derive(Deserialize)]
540 struct MetaTerminalAuth {
541 label: String,
542 command: String,
543 #[serde(default)]
544 args: Vec<String>,
545 #[serde(default)]
546 env: HashMap<String, String>,
547 }
548
549 let meta = match method {
550 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
551 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
552 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
553 _ => None,
554 }?;
555 let terminal_auth =
556 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
557
558 Some(acp_thread::build_terminal_auth_task(
559 terminal_auth_task_id(agent_id, method_id),
560 terminal_auth.label.clone(),
561 terminal_auth.command,
562 terminal_auth.args,
563 terminal_auth.env,
564 ))
565}
566
567impl AgentConnection for AcpConnection {
568 fn agent_id(&self) -> AgentId {
569 self.id.clone()
570 }
571
572 fn telemetry_id(&self) -> SharedString {
573 self.telemetry_id.clone()
574 }
575
576 fn new_session(
577 self: Rc<Self>,
578 project: Entity<Project>,
579 work_dirs: PathList,
580 cx: &mut App,
581 ) -> Task<Result<Entity<AcpThread>>> {
582 // TODO: remove this once ACP supports multiple working directories
583 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
584 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
585 };
586 let name = self.id.0.clone();
587 let mcp_servers = mcp_servers_for_project(&project, cx);
588
589 cx.spawn(async move |cx| {
590 let response = self.connection
591 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
592 .await
593 .map_err(map_acp_error)?;
594
595 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
596
597 if let Some(default_mode) = self.default_mode.clone() {
598 if let Some(modes) = modes.as_ref() {
599 let mut modes_ref = modes.borrow_mut();
600 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
601
602 if has_mode {
603 let initial_mode_id = modes_ref.current_mode_id.clone();
604
605 cx.spawn({
606 let default_mode = default_mode.clone();
607 let session_id = response.session_id.clone();
608 let modes = modes.clone();
609 let conn = self.connection.clone();
610 async move |_| {
611 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
612 .await.log_err();
613
614 if result.is_none() {
615 modes.borrow_mut().current_mode_id = initial_mode_id;
616 }
617 }
618 }).detach();
619
620 modes_ref.current_mode_id = default_mode;
621 } else {
622 let available_modes = modes_ref
623 .available_modes
624 .iter()
625 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
626 .collect::<Vec<_>>()
627 .join("\n");
628
629 log::warn!(
630 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
631 );
632 }
633 }
634 }
635
636 if let Some(default_model) = self.default_model.clone() {
637 if let Some(models) = models.as_ref() {
638 let mut models_ref = models.borrow_mut();
639 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
640
641 if has_model {
642 let initial_model_id = models_ref.current_model_id.clone();
643
644 cx.spawn({
645 let default_model = default_model.clone();
646 let session_id = response.session_id.clone();
647 let models = models.clone();
648 let conn = self.connection.clone();
649 async move |_| {
650 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
651 .await.log_err();
652
653 if result.is_none() {
654 models.borrow_mut().current_model_id = initial_model_id;
655 }
656 }
657 }).detach();
658
659 models_ref.current_model_id = default_model;
660 } else {
661 let available_models = models_ref
662 .available_models
663 .iter()
664 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
665 .collect::<Vec<_>>()
666 .join("\n");
667
668 log::warn!(
669 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
670 );
671 }
672 }
673 }
674
675 if let Some(config_opts) = config_options.as_ref() {
676 self.apply_default_config_options(&response.session_id, config_opts, cx);
677 }
678
679 let action_log = cx.new(|_| ActionLog::new(project.clone()));
680 let thread: Entity<AcpThread> = cx.new(|cx| {
681 AcpThread::new(
682 None,
683 None,
684 Some(work_dirs),
685 self.clone(),
686 project,
687 action_log,
688 acp_thread::ThreadId::new(),
689 response.session_id.clone(),
690 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
691 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
692 cx,
693 )
694 });
695
696 self.sessions.borrow_mut().insert(
697 response.session_id,
698 AcpSession {
699 thread: thread.downgrade(),
700 suppress_abort_err: false,
701 session_modes: modes,
702 models,
703 config_options: config_options.map(ConfigOptions::new),
704 },
705 );
706
707 Ok(thread)
708 })
709 }
710
711 fn supports_load_session(&self) -> bool {
712 self.agent_capabilities.load_session
713 }
714
715 fn supports_resume_session(&self) -> bool {
716 self.agent_capabilities
717 .session_capabilities
718 .resume
719 .is_some()
720 }
721
722 fn load_session(
723 self: Rc<Self>,
724 session_id: acp::SessionId,
725 project: Entity<Project>,
726 work_dirs: PathList,
727 title: Option<SharedString>,
728 cx: &mut App,
729 ) -> Task<Result<Entity<AcpThread>>> {
730 if !self.agent_capabilities.load_session {
731 return Task::ready(Err(anyhow!(LoadError::Other(
732 "Loading sessions is not supported by this agent.".into()
733 ))));
734 }
735 // TODO: remove this once ACP supports multiple working directories
736 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
737 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
738 };
739
740 let mcp_servers = mcp_servers_for_project(&project, cx);
741 let action_log = cx.new(|_| ActionLog::new(project.clone()));
742 let thread: Entity<AcpThread> = cx.new(|cx| {
743 AcpThread::new(
744 None,
745 title,
746 Some(work_dirs.clone()),
747 self.clone(),
748 project,
749 action_log,
750 acp_thread::ThreadId::new(),
751 session_id.clone(),
752 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
753 cx,
754 )
755 });
756
757 self.sessions.borrow_mut().insert(
758 session_id.clone(),
759 AcpSession {
760 thread: thread.downgrade(),
761 suppress_abort_err: false,
762 session_modes: None,
763 models: None,
764 config_options: None,
765 },
766 );
767
768 cx.spawn(async move |cx| {
769 let response = match self
770 .connection
771 .load_session(
772 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
773 )
774 .await
775 {
776 Ok(response) => response,
777 Err(err) => {
778 self.sessions.borrow_mut().remove(&session_id);
779 return Err(map_acp_error(err));
780 }
781 };
782
783 let (modes, models, config_options) =
784 config_state(response.modes, response.models, response.config_options);
785
786 if let Some(config_opts) = config_options.as_ref() {
787 self.apply_default_config_options(&session_id, config_opts, cx);
788 }
789
790 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
791 session.session_modes = modes;
792 session.models = models;
793 session.config_options = config_options.map(ConfigOptions::new);
794 }
795
796 Ok(thread)
797 })
798 }
799
800 fn resume_session(
801 self: Rc<Self>,
802 session_id: acp::SessionId,
803 project: Entity<Project>,
804 work_dirs: PathList,
805 title: Option<SharedString>,
806 cx: &mut App,
807 ) -> Task<Result<Entity<AcpThread>>> {
808 if self
809 .agent_capabilities
810 .session_capabilities
811 .resume
812 .is_none()
813 {
814 return Task::ready(Err(anyhow!(LoadError::Other(
815 "Resuming sessions is not supported by this agent.".into()
816 ))));
817 }
818 // TODO: remove this once ACP supports multiple working directories
819 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
820 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
821 };
822
823 let mcp_servers = mcp_servers_for_project(&project, cx);
824 let action_log = cx.new(|_| ActionLog::new(project.clone()));
825 let thread: Entity<AcpThread> = cx.new(|cx| {
826 AcpThread::new(
827 None,
828 title,
829 Some(work_dirs),
830 self.clone(),
831 project,
832 action_log,
833 acp_thread::ThreadId::new(),
834 session_id.clone(),
835 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
836 cx,
837 )
838 });
839
840 self.sessions.borrow_mut().insert(
841 session_id.clone(),
842 AcpSession {
843 thread: thread.downgrade(),
844 suppress_abort_err: false,
845 session_modes: None,
846 models: None,
847 config_options: None,
848 },
849 );
850
851 cx.spawn(async move |cx| {
852 let response = match self
853 .connection
854 .resume_session(
855 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
856 .mcp_servers(mcp_servers),
857 )
858 .await
859 {
860 Ok(response) => response,
861 Err(err) => {
862 self.sessions.borrow_mut().remove(&session_id);
863 return Err(map_acp_error(err));
864 }
865 };
866
867 let (modes, models, config_options) =
868 config_state(response.modes, response.models, response.config_options);
869
870 if let Some(config_opts) = config_options.as_ref() {
871 self.apply_default_config_options(&session_id, config_opts, cx);
872 }
873
874 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
875 session.session_modes = modes;
876 session.models = models;
877 session.config_options = config_options.map(ConfigOptions::new);
878 }
879
880 Ok(thread)
881 })
882 }
883
884 fn supports_close_session(&self) -> bool {
885 self.agent_capabilities.session_capabilities.close.is_some()
886 }
887
888 fn close_session(
889 self: Rc<Self>,
890 session_id: &acp::SessionId,
891 cx: &mut App,
892 ) -> Task<Result<()>> {
893 if !self.supports_close_session() {
894 return Task::ready(Err(anyhow!(LoadError::Other(
895 "Closing sessions is not supported by this agent.".into()
896 ))));
897 }
898
899 let conn = self.connection.clone();
900 let session_id = session_id.clone();
901 cx.foreground_executor().spawn(async move {
902 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
903 .await?;
904 self.sessions.borrow_mut().remove(&session_id);
905 Ok(())
906 })
907 }
908
909 fn auth_methods(&self) -> &[acp::AuthMethod] {
910 &self.auth_methods
911 }
912
913 fn terminal_auth_task(
914 &self,
915 method_id: &acp::AuthMethodId,
916 cx: &App,
917 ) -> Option<Task<Result<SpawnInTerminal>>> {
918 let method = self
919 .auth_methods
920 .iter()
921 .find(|method| method.id() == method_id)?;
922
923 match method {
924 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
925 let agent_id = self.id.clone();
926 let terminal = terminal.clone();
927 let store = self.agent_server_store.clone();
928 Some(cx.spawn(async move |cx| {
929 let command = store
930 .update(cx, |store, cx| {
931 let agent = store
932 .get_external_agent(&agent_id)
933 .context("Agent server not found")?;
934 anyhow::Ok(agent.get_command(
935 terminal.args.clone(),
936 HashMap::from_iter(terminal.env.clone()),
937 &mut cx.to_async(),
938 ))
939 })?
940 .context("Failed to get agent command")?
941 .await?;
942 Ok(terminal_auth_task(&command, &agent_id, &terminal))
943 }))
944 }
945 _ => meta_terminal_auth_task(&self.id, method_id, method)
946 .map(|task| Task::ready(Ok(task))),
947 }
948 }
949
950 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
951 let conn = self.connection.clone();
952 cx.foreground_executor().spawn(async move {
953 conn.authenticate(acp::AuthenticateRequest::new(method_id))
954 .await?;
955 Ok(())
956 })
957 }
958
959 fn prompt(
960 &self,
961 _id: Option<acp_thread::UserMessageId>,
962 params: acp::PromptRequest,
963 cx: &mut App,
964 ) -> Task<Result<acp::PromptResponse>> {
965 let conn = self.connection.clone();
966 let sessions = self.sessions.clone();
967 let session_id = params.session_id.clone();
968 cx.foreground_executor().spawn(async move {
969 let result = conn.prompt(params).await;
970
971 let mut suppress_abort_err = false;
972
973 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
974 suppress_abort_err = session.suppress_abort_err;
975 session.suppress_abort_err = false;
976 }
977
978 match result {
979 Ok(response) => Ok(response),
980 Err(err) => {
981 if err.code == acp::ErrorCode::AuthRequired {
982 return Err(anyhow!(acp::Error::auth_required()));
983 }
984
985 if err.code != ErrorCode::InternalError {
986 anyhow::bail!(err)
987 }
988
989 let Some(data) = &err.data else {
990 anyhow::bail!(err)
991 };
992
993 // Temporary workaround until the following PR is generally available:
994 // https://github.com/google-gemini/gemini-cli/pull/6656
995
996 #[derive(Deserialize)]
997 #[serde(deny_unknown_fields)]
998 struct ErrorDetails {
999 details: Box<str>,
1000 }
1001
1002 match serde_json::from_value(data.clone()) {
1003 Ok(ErrorDetails { details }) => {
1004 if suppress_abort_err
1005 && (details.contains("This operation was aborted")
1006 || details.contains("The user aborted a request"))
1007 {
1008 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1009 } else {
1010 Err(anyhow!(details))
1011 }
1012 }
1013 Err(_) => Err(anyhow!(err)),
1014 }
1015 }
1016 }
1017 })
1018 }
1019
1020 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1021 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1022 session.suppress_abort_err = true;
1023 }
1024 let conn = self.connection.clone();
1025 let params = acp::CancelNotification::new(session_id.clone());
1026 cx.foreground_executor()
1027 .spawn(async move { conn.cancel(params).await })
1028 .detach();
1029 }
1030
1031 fn session_modes(
1032 &self,
1033 session_id: &acp::SessionId,
1034 _cx: &App,
1035 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1036 let sessions = self.sessions.clone();
1037 let sessions_ref = sessions.borrow();
1038 let Some(session) = sessions_ref.get(session_id) else {
1039 return None;
1040 };
1041
1042 if let Some(modes) = session.session_modes.as_ref() {
1043 Some(Rc::new(AcpSessionModes {
1044 connection: self.connection.clone(),
1045 session_id: session_id.clone(),
1046 state: modes.clone(),
1047 }) as _)
1048 } else {
1049 None
1050 }
1051 }
1052
1053 fn model_selector(
1054 &self,
1055 session_id: &acp::SessionId,
1056 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1057 let sessions = self.sessions.clone();
1058 let sessions_ref = sessions.borrow();
1059 let Some(session) = sessions_ref.get(session_id) else {
1060 return None;
1061 };
1062
1063 if let Some(models) = session.models.as_ref() {
1064 Some(Rc::new(AcpModelSelector::new(
1065 session_id.clone(),
1066 self.connection.clone(),
1067 models.clone(),
1068 )) as _)
1069 } else {
1070 None
1071 }
1072 }
1073
1074 fn session_config_options(
1075 &self,
1076 session_id: &acp::SessionId,
1077 _cx: &App,
1078 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1079 let sessions = self.sessions.borrow();
1080 let session = sessions.get(session_id)?;
1081
1082 let config_opts = session.config_options.as_ref()?;
1083
1084 Some(Rc::new(AcpSessionConfigOptions {
1085 session_id: session_id.clone(),
1086 connection: self.connection.clone(),
1087 state: config_opts.config_options.clone(),
1088 watch_tx: config_opts.tx.clone(),
1089 watch_rx: config_opts.rx.clone(),
1090 }) as _)
1091 }
1092
1093 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1094 self.session_list.clone().map(|s| s as _)
1095 }
1096
1097 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1098 self
1099 }
1100}
1101
1102fn map_acp_error(err: acp::Error) -> anyhow::Error {
1103 if err.code == acp::ErrorCode::AuthRequired {
1104 let mut error = AuthRequired::new();
1105
1106 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1107 error = error.with_description(err.message);
1108 }
1109
1110 anyhow!(error)
1111 } else {
1112 anyhow!(err)
1113 }
1114}
1115
#[cfg(test)]
mod tests {
    use super::*;

    /// The spawn task for a first-class terminal auth method must carry the
    /// prebuilt command's path, args and env, and use the method name as label.
    #[test]
    fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
        let agent_id = AgentId::new("test-agent");
        let auth_method = acp::AuthMethodTerminal::new("login", "Login");
        let prebuilt = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])),
        };

        let spawn = terminal_auth_task(&prebuilt, &agent_id, &auth_method);

        assert_eq!(spawn.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(spawn.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(
            spawn.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(spawn.label, "Login");
        assert_eq!(spawn.command_label, "Login");
    }

    /// A complete legacy `terminal-auth` meta entry is turned into a spawn task
    /// built from the meta's own command, args, env and label.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let agent_id = AgentId::new("test-agent");
        let method_id = acp::AuthMethodId::new("legacy-login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["auth", "--interactive"],
                "env": {
                    "AUTH_MODE": "interactive",
                },
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(legacy_meta),
        );

        let spawn = meta_terminal_auth_task(&agent_id, &method_id, &method)
            .expect("expected legacy terminal auth task");

        assert_eq!(spawn.id.0, "external-agent-test-agent-legacy-login-login");
        assert_eq!(spawn.command.as_deref(), Some("legacy-agent"));
        assert_eq!(spawn.args, vec!["auth", "--interactive"]);
        assert_eq!(
            spawn.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(spawn.label, "legacy /auth");
    }

    /// A legacy meta entry that only has a label (no command) yields no task.
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let agent_id = AgentId::new("test-agent");
        let method_id = acp::AuthMethodId::new("legacy-login");
        let incomplete_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(incomplete_meta),
        );

        let spawn = meta_terminal_auth_task(&agent_id, &method_id, &method);

        assert!(spawn.is_none());
    }

    /// When a terminal auth method also carries legacy meta, the task built by
    /// `terminal_auth_task` reflects the prebuilt command, not the legacy meta.
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let agent_id = AgentId::new("test-agent");
        let method_id = acp::AuthMethodId::new("login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["legacy-auth"],
                "env": {
                    "AUTH_MODE": "legacy",
                },
            }),
        )]);
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(legacy_meta),
        );
        let prebuilt = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "/auth".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])),
        };

        let acp::AuthMethod::Terminal(terminal) = &method else {
            unreachable!()
        };
        let spawn = terminal_auth_task(&prebuilt, &agent_id, terminal);

        assert_eq!(spawn.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(spawn.args, vec!["--acp", "/auth"]);
        assert_eq!(
            spawn.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(spawn.label, "Login");
    }
}
1247
1248fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1249 let context_server_store = project.read(cx).context_server_store().read(cx);
1250 let is_local = project.read(cx).is_local();
1251 context_server_store
1252 .configured_server_ids()
1253 .iter()
1254 .filter_map(|id| {
1255 let configuration = context_server_store.configuration_for_server(id)?;
1256 match &*configuration {
1257 project::context_server_store::ContextServerConfiguration::Custom {
1258 command,
1259 remote,
1260 ..
1261 }
1262 | project::context_server_store::ContextServerConfiguration::Extension {
1263 command,
1264 remote,
1265 ..
1266 } if is_local || *remote => Some(acp::McpServer::Stdio(
1267 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1268 .args(command.args.clone())
1269 .env(if let Some(env) = command.env.as_ref() {
1270 env.iter()
1271 .map(|(name, value)| acp::EnvVariable::new(name, value))
1272 .collect()
1273 } else {
1274 vec![]
1275 }),
1276 )),
1277 project::context_server_store::ContextServerConfiguration::Http {
1278 url,
1279 headers,
1280 timeout: _,
1281 } => Some(acp::McpServer::Http(
1282 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1283 headers
1284 .iter()
1285 .map(|(name, value)| acp::HttpHeader::new(name, value))
1286 .collect(),
1287 ),
1288 )),
1289 _ => None,
1290 }
1291 })
1292 .collect()
1293}
1294
1295fn config_state(
1296 modes: Option<acp::SessionModeState>,
1297 models: Option<acp::SessionModelState>,
1298 config_options: Option<Vec<acp::SessionConfigOption>>,
1299) -> (
1300 Option<Rc<RefCell<acp::SessionModeState>>>,
1301 Option<Rc<RefCell<acp::SessionModelState>>>,
1302 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1303) {
1304 if let Some(opts) = config_options {
1305 return (None, None, Some(Rc::new(RefCell::new(opts))));
1306 }
1307
1308 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1309 let models = models.map(|models| Rc::new(RefCell::new(models)));
1310 (modes, models, None)
1311}
1312
/// Handle for reading and changing the mode of one agent session.
struct AcpSessionModes {
    /// Session that mode-change requests are issued for.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_mode` requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared mode state (current mode id and available modes).
    // NOTE(review): presumably the same cell the session stores as
    // `session_modes`; the construction site is outside this chunk, so confirm.
    state: Rc<RefCell<acp::SessionModeState>>,
}
1318
1319impl acp_thread::AgentSessionModes for AcpSessionModes {
1320 fn current_mode(&self) -> acp::SessionModeId {
1321 self.state.borrow().current_mode_id.clone()
1322 }
1323
1324 fn all_modes(&self) -> Vec<acp::SessionMode> {
1325 self.state.borrow().available_modes.clone()
1326 }
1327
1328 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1329 let connection = self.connection.clone();
1330 let session_id = self.session_id.clone();
1331 let old_mode_id;
1332 {
1333 let mut state = self.state.borrow_mut();
1334 old_mode_id = state.current_mode_id.clone();
1335 state.current_mode_id = mode_id.clone();
1336 };
1337 let state = self.state.clone();
1338 cx.foreground_executor().spawn(async move {
1339 let result = connection
1340 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1341 .await;
1342
1343 if result.is_err() {
1344 state.borrow_mut().current_mode_id = old_mode_id;
1345 }
1346
1347 result?;
1348
1349 Ok(())
1350 })
1351 }
1352}
1353
/// Handle for listing and selecting the model of one agent session.
struct AcpModelSelector {
    /// Session that model-selection requests are issued for.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_model` requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared model state (current model id and available models).
    state: Rc<RefCell<acp::SessionModelState>>,
}
1359
1360impl AcpModelSelector {
1361 fn new(
1362 session_id: acp::SessionId,
1363 connection: Rc<acp::ClientSideConnection>,
1364 state: Rc<RefCell<acp::SessionModelState>>,
1365 ) -> Self {
1366 Self {
1367 session_id,
1368 connection,
1369 state,
1370 }
1371 }
1372}
1373
1374impl acp_thread::AgentModelSelector for AcpModelSelector {
1375 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1376 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1377 self.state
1378 .borrow()
1379 .available_models
1380 .clone()
1381 .into_iter()
1382 .map(acp_thread::AgentModelInfo::from)
1383 .collect(),
1384 )))
1385 }
1386
1387 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1388 let connection = self.connection.clone();
1389 let session_id = self.session_id.clone();
1390 let old_model_id;
1391 {
1392 let mut state = self.state.borrow_mut();
1393 old_model_id = state.current_model_id.clone();
1394 state.current_model_id = model_id.clone();
1395 };
1396 let state = self.state.clone();
1397 cx.foreground_executor().spawn(async move {
1398 let result = connection
1399 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1400 .await;
1401
1402 if result.is_err() {
1403 state.borrow_mut().current_model_id = old_model_id;
1404 }
1405
1406 result?;
1407
1408 Ok(())
1409 })
1410 }
1411
1412 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1413 let state = self.state.borrow();
1414 Task::ready(
1415 state
1416 .available_models
1417 .iter()
1418 .find(|m| m.model_id == state.current_model_id)
1419 .cloned()
1420 .map(acp_thread::AgentModelInfo::from)
1421 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1422 )
1423 }
1424}
1425
/// Handle for reading and changing the config options of one agent session.
struct AcpSessionConfigOptions {
    /// Session that config-option requests are issued for.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_config_option` requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared list of the session's config options; replaced wholesale after a
    /// successful `set_config_option`.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender signalled (with `()`) after `state` has been replaced.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver handed out (cloned) by `watch`.
    watch_rx: watch::Receiver<()>,
}
1433
1434impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1435 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1436 self.state.borrow().clone()
1437 }
1438
1439 fn set_config_option(
1440 &self,
1441 config_id: acp::SessionConfigId,
1442 value: acp::SessionConfigValueId,
1443 cx: &mut App,
1444 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1445 let connection = self.connection.clone();
1446 let session_id = self.session_id.clone();
1447 let state = self.state.clone();
1448
1449 let watch_tx = self.watch_tx.clone();
1450
1451 cx.foreground_executor().spawn(async move {
1452 let response = connection
1453 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1454 session_id, config_id, value,
1455 ))
1456 .await?;
1457
1458 *state.borrow_mut() = response.config_options.clone();
1459 watch_tx.borrow_mut().send(()).ok();
1460 Ok(response.config_options)
1461 })
1462 }
1463
1464 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1465 Some(self.watch_rx.clone())
1466 }
1467}
1468
/// Receives requests and notifications from the agent (see its `acp::Client`
/// impl) and routes them to the matching session's thread.
struct ClientDelegate {
    /// Shared map from session id to session state; used to look up the
    /// thread a request or notification belongs to.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Optional session list that is told about `SessionInfoUpdate` notifications.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// Async app handle; cloned whenever a thread needs to be updated or read.
    cx: AsyncApp,
}
1474
1475#[async_trait::async_trait(?Send)]
1476impl acp::Client for ClientDelegate {
1477 async fn request_permission(
1478 &self,
1479 arguments: acp::RequestPermissionRequest,
1480 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1481 let thread;
1482 {
1483 let sessions_ref = self.sessions.borrow();
1484 let session = sessions_ref
1485 .get(&arguments.session_id)
1486 .context("Failed to get session")?;
1487 thread = session.thread.clone();
1488 }
1489
1490 let cx = &mut self.cx.clone();
1491
1492 let task = thread.update(cx, |thread, cx| {
1493 thread.request_tool_call_authorization(
1494 arguments.tool_call,
1495 acp_thread::PermissionOptions::Flat(arguments.options),
1496 cx,
1497 )
1498 })??;
1499
1500 let outcome = task.await;
1501
1502 Ok(acp::RequestPermissionResponse::new(outcome.into()))
1503 }
1504
1505 async fn write_text_file(
1506 &self,
1507 arguments: acp::WriteTextFileRequest,
1508 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1509 let cx = &mut self.cx.clone();
1510 let task = self
1511 .session_thread(&arguments.session_id)?
1512 .update(cx, |thread, cx| {
1513 thread.write_text_file(arguments.path, arguments.content, cx)
1514 })?;
1515
1516 task.await?;
1517
1518 Ok(Default::default())
1519 }
1520
1521 async fn read_text_file(
1522 &self,
1523 arguments: acp::ReadTextFileRequest,
1524 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1525 let task = self.session_thread(&arguments.session_id)?.update(
1526 &mut self.cx.clone(),
1527 |thread, cx| {
1528 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1529 },
1530 )?;
1531
1532 let content = task.await?;
1533
1534 Ok(acp::ReadTextFileResponse::new(content))
1535 }
1536
1537 async fn session_notification(
1538 &self,
1539 notification: acp::SessionNotification,
1540 ) -> Result<(), acp::Error> {
1541 let sessions = self.sessions.borrow();
1542 let session = sessions
1543 .get(¬ification.session_id)
1544 .context("Failed to get session")?;
1545
1546 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1547 current_mode_id,
1548 ..
1549 }) = ¬ification.update
1550 {
1551 if let Some(session_modes) = &session.session_modes {
1552 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1553 }
1554 }
1555
1556 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1557 config_options,
1558 ..
1559 }) = ¬ification.update
1560 {
1561 if let Some(opts) = &session.config_options {
1562 *opts.config_options.borrow_mut() = config_options.clone();
1563 opts.tx.borrow_mut().send(()).ok();
1564 }
1565 }
1566
1567 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1568 && let Some(session_list) = self.session_list.borrow().as_ref()
1569 {
1570 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1571 }
1572
1573 // Clone so we can inspect meta both before and after handing off to the thread
1574 let update_clone = notification.update.clone();
1575
1576 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1577 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1578 if let Some(meta) = &tc.meta {
1579 if let Some(terminal_info) = meta.get("terminal_info") {
1580 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1581 {
1582 let terminal_id = acp::TerminalId::new(id_str);
1583 let cwd = terminal_info
1584 .get("cwd")
1585 .and_then(|v| v.as_str().map(PathBuf::from));
1586
1587 // Create a minimal display-only lower-level terminal and register it.
1588 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1589 let builder = TerminalBuilder::new_display_only(
1590 CursorShape::default(),
1591 AlternateScroll::On,
1592 None,
1593 0,
1594 cx.background_executor(),
1595 thread.project().read(cx).path_style(cx),
1596 )?;
1597 let lower = cx.new(|cx| builder.subscribe(cx));
1598 thread.on_terminal_provider_event(
1599 TerminalProviderEvent::Created {
1600 terminal_id,
1601 label: tc.title.clone(),
1602 cwd,
1603 output_byte_limit: None,
1604 terminal: lower,
1605 },
1606 cx,
1607 );
1608 anyhow::Ok(())
1609 });
1610 }
1611 }
1612 }
1613 }
1614
1615 // Forward the update to the acp_thread as usual.
1616 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1617 thread.handle_session_update(notification.update.clone(), cx)
1618 })??;
1619
1620 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1621 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1622 if let Some(meta) = &tcu.meta {
1623 if let Some(term_out) = meta.get("terminal_output") {
1624 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1625 let terminal_id = acp::TerminalId::new(id_str);
1626 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1627 let data = s.as_bytes().to_vec();
1628 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1629 thread.on_terminal_provider_event(
1630 TerminalProviderEvent::Output { terminal_id, data },
1631 cx,
1632 );
1633 });
1634 }
1635 }
1636 }
1637
1638 // terminal_exit
1639 if let Some(term_exit) = meta.get("terminal_exit") {
1640 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1641 let terminal_id = acp::TerminalId::new(id_str);
1642 let status = acp::TerminalExitStatus::new()
1643 .exit_code(
1644 term_exit
1645 .get("exit_code")
1646 .and_then(|v| v.as_u64())
1647 .map(|i| i as u32),
1648 )
1649 .signal(
1650 term_exit
1651 .get("signal")
1652 .and_then(|v| v.as_str().map(|s| s.to_string())),
1653 );
1654
1655 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1656 thread.on_terminal_provider_event(
1657 TerminalProviderEvent::Exit {
1658 terminal_id,
1659 status,
1660 },
1661 cx,
1662 );
1663 });
1664 }
1665 }
1666 }
1667 }
1668
1669 Ok(())
1670 }
1671
1672 async fn create_terminal(
1673 &self,
1674 args: acp::CreateTerminalRequest,
1675 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1676 let thread = self.session_thread(&args.session_id)?;
1677 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1678
1679 let terminal_entity = acp_thread::create_terminal_entity(
1680 args.command.clone(),
1681 &args.args,
1682 args.env
1683 .into_iter()
1684 .map(|env| (env.name, env.value))
1685 .collect(),
1686 args.cwd.clone(),
1687 &project,
1688 &mut self.cx.clone(),
1689 )
1690 .await?;
1691
1692 // Register with renderer
1693 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1694 thread.register_terminal_created(
1695 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1696 format!("{} {}", args.command, args.args.join(" ")),
1697 args.cwd.clone(),
1698 args.output_byte_limit,
1699 terminal_entity,
1700 cx,
1701 )
1702 })?;
1703 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1704 Ok(acp::CreateTerminalResponse::new(terminal_id))
1705 }
1706
1707 async fn kill_terminal(
1708 &self,
1709 args: acp::KillTerminalRequest,
1710 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1711 self.session_thread(&args.session_id)?
1712 .update(&mut self.cx.clone(), |thread, cx| {
1713 thread.kill_terminal(args.terminal_id, cx)
1714 })??;
1715
1716 Ok(Default::default())
1717 }
1718
1719 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1720 Err(acp::Error::method_not_found())
1721 }
1722
1723 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1724 Err(acp::Error::method_not_found())
1725 }
1726
1727 async fn release_terminal(
1728 &self,
1729 args: acp::ReleaseTerminalRequest,
1730 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1731 self.session_thread(&args.session_id)?
1732 .update(&mut self.cx.clone(), |thread, cx| {
1733 thread.release_terminal(args.terminal_id, cx)
1734 })??;
1735
1736 Ok(Default::default())
1737 }
1738
1739 async fn terminal_output(
1740 &self,
1741 args: acp::TerminalOutputRequest,
1742 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1743 self.session_thread(&args.session_id)?
1744 .read_with(&mut self.cx.clone(), |thread, cx| {
1745 let out = thread
1746 .terminal(args.terminal_id)?
1747 .read(cx)
1748 .current_output(cx);
1749
1750 Ok(out)
1751 })?
1752 }
1753
1754 async fn wait_for_terminal_exit(
1755 &self,
1756 args: acp::WaitForTerminalExitRequest,
1757 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1758 let exit_status = self
1759 .session_thread(&args.session_id)?
1760 .update(&mut self.cx.clone(), |thread, cx| {
1761 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1762 })??
1763 .await;
1764
1765 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1766 }
1767}
1768
1769impl ClientDelegate {
1770 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1771 let sessions = self.sessions.borrow();
1772 sessions
1773 .get(session_id)
1774 .context("Failed to get session")
1775 .map(|session| session.thread.clone())
1776 }
1777}