1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::agent_server_store::{AgentServerCommand, AgentServerStore};
14use project::{AgentId, Project};
15use remote::remote_client::Interactive;
16use serde::Deserialize;
17use settings::Settings as _;
18use std::path::PathBuf;
19use std::process::Stdio;
20use std::rc::Rc;
21use std::{any::Any, cell::RefCell};
22use task::{ShellBuilder, SpawnInTerminal};
23use thiserror::Error;
24use util::ResultExt as _;
25use util::path_list::PathList;
26use util::process::Child;
27
28use anyhow::{Context as _, Result};
29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
30
31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
32use terminal::TerminalBuilder;
33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
34
35use crate::GEMINI_ID;
36
37pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
38
39#[derive(Debug, Error)]
40#[error("Unsupported version")]
41pub struct UnsupportedVersion;
42
43pub struct AcpConnection {
44 id: AgentId,
45 telemetry_id: SharedString,
46 connection: Rc<acp::ClientSideConnection>,
47 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
48 auth_methods: Vec<acp::AuthMethod>,
49 agent_server_store: WeakEntity<AgentServerStore>,
50 agent_capabilities: acp::AgentCapabilities,
51 default_mode: Option<acp::SessionModeId>,
52 default_model: Option<acp::ModelId>,
53 default_config_options: HashMap<String, String>,
54 child: Child,
55 session_list: Option<Rc<AcpSessionList>>,
56 _io_task: Task<Result<(), acp::Error>>,
57 _wait_task: Task<Result<()>>,
58 _stderr_task: Task<Result<()>>,
59}
60
61struct ConfigOptions {
62 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
63 tx: Rc<RefCell<watch::Sender<()>>>,
64 rx: watch::Receiver<()>,
65}
66
67impl ConfigOptions {
68 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
69 let (tx, rx) = watch::channel(());
70 Self {
71 config_options,
72 tx: Rc::new(RefCell::new(tx)),
73 rx,
74 }
75 }
76}
77
78pub struct AcpSession {
79 thread: WeakEntity<AcpThread>,
80 suppress_abort_err: bool,
81 models: Option<Rc<RefCell<acp::SessionModelState>>>,
82 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
83 config_options: Option<ConfigOptions>,
84}
85
86pub struct AcpSessionList {
87 connection: Rc<acp::ClientSideConnection>,
88 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
89 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
90}
91
92impl AcpSessionList {
93 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
94 let (tx, rx) = smol::channel::unbounded();
95 Self {
96 connection,
97 updates_tx: tx,
98 updates_rx: rx,
99 }
100 }
101
102 fn notify_update(&self) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::Refresh)
105 .log_err();
106 }
107
108 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
109 self.updates_tx
110 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
111 .log_err();
112 }
113}
114
115impl AgentSessionList for AcpSessionList {
116 fn list_sessions(
117 &self,
118 request: AgentSessionListRequest,
119 cx: &mut App,
120 ) -> Task<Result<AgentSessionListResponse>> {
121 let conn = self.connection.clone();
122 cx.foreground_executor().spawn(async move {
123 let acp_request = acp::ListSessionsRequest::new()
124 .cwd(request.cwd)
125 .cursor(request.cursor);
126 let response = conn.list_sessions(acp_request).await?;
127 Ok(AgentSessionListResponse {
128 sessions: response
129 .sessions
130 .into_iter()
131 .map(|s| AgentSessionInfo {
132 session_id: s.session_id,
133 work_dirs: Some(PathList::new(&[s.cwd])),
134 title: s.title.map(Into::into),
135 updated_at: s.updated_at.and_then(|date_str| {
136 chrono::DateTime::parse_from_rfc3339(&date_str)
137 .ok()
138 .map(|dt| dt.with_timezone(&chrono::Utc))
139 }),
140 created_at: None,
141 meta: s.meta,
142 })
143 .collect(),
144 next_cursor: response.next_cursor,
145 meta: response.meta,
146 })
147 })
148 }
149
150 fn watch(
151 &self,
152 _cx: &mut App,
153 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
154 Some(self.updates_rx.clone())
155 }
156
157 fn notify_refresh(&self) {
158 self.notify_update();
159 }
160
161 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
162 self
163 }
164}
165
166pub async fn connect(
167 agent_id: AgentId,
168 project: Entity<Project>,
169 command: AgentServerCommand,
170 agent_server_store: WeakEntity<AgentServerStore>,
171 default_mode: Option<acp::SessionModeId>,
172 default_model: Option<acp::ModelId>,
173 default_config_options: HashMap<String, String>,
174 cx: &mut AsyncApp,
175) -> Result<Rc<dyn AgentConnection>> {
176 let conn = AcpConnection::stdio(
177 agent_id,
178 project,
179 command.clone(),
180 agent_server_store,
181 default_mode,
182 default_model,
183 default_config_options,
184 cx,
185 )
186 .await?;
187 Ok(Rc::new(conn) as _)
188}
189
190const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
191
192impl AcpConnection {
193 pub async fn stdio(
194 agent_id: AgentId,
195 project: Entity<Project>,
196 command: AgentServerCommand,
197 agent_server_store: WeakEntity<AgentServerStore>,
198 default_mode: Option<acp::SessionModeId>,
199 default_model: Option<acp::ModelId>,
200 default_config_options: HashMap<String, String>,
201 cx: &mut AsyncApp,
202 ) -> Result<Self> {
203 let root_dir = project.read_with(cx, |project, cx| {
204 project
205 .default_path_list(cx)
206 .ordered_paths()
207 .next()
208 .cloned()
209 });
210 let original_command = command.clone();
211 let (path, args, env) = project
212 .read_with(cx, |project, cx| {
213 project.remote_client().and_then(|client| {
214 let template = client
215 .read(cx)
216 .build_command_with_options(
217 Some(command.path.display().to_string()),
218 &command.args,
219 &command.env.clone().into_iter().flatten().collect(),
220 root_dir.as_ref().map(|path| path.display().to_string()),
221 None,
222 Interactive::No,
223 )
224 .log_err()?;
225 Some((template.program, template.args, template.env))
226 })
227 })
228 .unwrap_or_else(|| {
229 (
230 command.path.display().to_string(),
231 command.args,
232 command.env.unwrap_or_default(),
233 )
234 });
235
236 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
237 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
238 let mut child = builder.build_std_command(Some(path.clone()), &args);
239 child.envs(env.clone());
240 if let Some(cwd) = project.read_with(cx, |project, _cx| {
241 if project.is_local() {
242 root_dir.as_ref()
243 } else {
244 None
245 }
246 }) {
247 child.current_dir(cwd);
248 }
249 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
250
251 let stdout = child.stdout.take().context("Failed to take stdout")?;
252 let stdin = child.stdin.take().context("Failed to take stdin")?;
253 let stderr = child.stderr.take().context("Failed to take stderr")?;
254 log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
255 log::trace!("Spawned (pid: {})", child.id());
256
257 let sessions = Rc::new(RefCell::new(HashMap::default()));
258
259 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
260 (
261 release_channel::ReleaseChannel::try_global(cx)
262 .map(|release_channel| release_channel.display_name()),
263 release_channel::AppVersion::global(cx).to_string(),
264 )
265 });
266
267 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
268 Rc::new(RefCell::new(None));
269
270 let client = ClientDelegate {
271 sessions: sessions.clone(),
272 session_list: client_session_list.clone(),
273 cx: cx.clone(),
274 };
275 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
276 let foreground_executor = cx.foreground_executor().clone();
277 move |fut| {
278 foreground_executor.spawn(fut).detach();
279 }
280 });
281
282 let io_task = cx.background_spawn(io_task);
283
284 let stderr_task = cx.background_spawn(async move {
285 let mut stderr = BufReader::new(stderr);
286 let mut line = String::new();
287 while let Ok(n) = stderr.read_line(&mut line).await
288 && n > 0
289 {
290 log::warn!("agent stderr: {}", line.trim());
291 line.clear();
292 }
293 Ok(())
294 });
295
296 let wait_task = cx.spawn({
297 let sessions = sessions.clone();
298 let status_fut = child.status();
299 async move |cx| {
300 let status = status_fut.await?;
301
302 for session in sessions.borrow().values() {
303 session
304 .thread
305 .update(cx, |thread, cx| {
306 thread.emit_load_error(LoadError::Exited { status }, cx)
307 })
308 .ok();
309 }
310
311 anyhow::Ok(())
312 }
313 });
314
315 let connection = Rc::new(connection);
316
317 cx.update(|cx| {
318 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
319 registry.set_active_connection(agent_id.clone(), &connection, cx)
320 });
321 });
322
323 let response = connection
324 .initialize(
325 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
326 .client_capabilities(
327 acp::ClientCapabilities::new()
328 .fs(acp::FileSystemCapabilities::new()
329 .read_text_file(true)
330 .write_text_file(true))
331 .terminal(true)
332 .auth(acp::AuthCapabilities::new().terminal(true))
333 // Experimental: Allow for rendering terminal output from the agents
334 .meta(acp::Meta::from_iter([
335 ("terminal_output".into(), true.into()),
336 ("terminal-auth".into(), true.into()),
337 ])),
338 )
339 .client_info(
340 acp::Implementation::new("zed", version)
341 .title(release_channel.map(ToOwned::to_owned)),
342 ),
343 )
344 .await?;
345
346 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
347 return Err(UnsupportedVersion.into());
348 }
349
350 let telemetry_id = response
351 .agent_info
352 // Use the one the agent provides if we have one
353 .map(|info| info.name.into())
354 // Otherwise, just use the name
355 .unwrap_or_else(|| agent_id.0.clone());
356
357 let session_list = if response
358 .agent_capabilities
359 .session_capabilities
360 .list
361 .is_some()
362 {
363 let list = Rc::new(AcpSessionList::new(connection.clone()));
364 *client_session_list.borrow_mut() = Some(list.clone());
365 Some(list)
366 } else {
367 None
368 };
369
370 // TODO: Remove this override once Google team releases their official auth methods
371 let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
372 let mut gemini_args = original_command.args.clone();
373 gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
374 let value = serde_json::json!({
375 "label": "gemini /auth",
376 "command": original_command.path.to_string_lossy(),
377 "args": gemini_args,
378 "env": original_command.env.unwrap_or_default(),
379 });
380 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
381 vec![acp::AuthMethod::Agent(
382 acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
383 .description("Login with your Google or Vertex AI account")
384 .meta(meta),
385 )]
386 } else {
387 response.auth_methods
388 };
389 Ok(Self {
390 id: agent_id,
391 auth_methods,
392 agent_server_store,
393 connection,
394 telemetry_id,
395 sessions,
396 agent_capabilities: response.agent_capabilities,
397 default_mode,
398 default_model,
399 default_config_options,
400 session_list,
401 _io_task: io_task,
402 _wait_task: wait_task,
403 _stderr_task: stderr_task,
404 child,
405 })
406 }
407
408 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
409 &self.agent_capabilities.prompt_capabilities
410 }
411
412 fn apply_default_config_options(
413 &self,
414 session_id: &acp::SessionId,
415 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
416 cx: &mut AsyncApp,
417 ) {
418 let id = self.id.clone();
419 let defaults_to_apply: Vec<_> = {
420 let config_opts_ref = config_options.borrow();
421 config_opts_ref
422 .iter()
423 .filter_map(|config_option| {
424 let default_value = self.default_config_options.get(&*config_option.id.0)?;
425
426 let is_valid = match &config_option.kind {
427 acp::SessionConfigKind::Select(select) => match &select.options {
428 acp::SessionConfigSelectOptions::Ungrouped(options) => options
429 .iter()
430 .any(|opt| &*opt.value.0 == default_value.as_str()),
431 acp::SessionConfigSelectOptions::Grouped(groups) => {
432 groups.iter().any(|g| {
433 g.options
434 .iter()
435 .any(|opt| &*opt.value.0 == default_value.as_str())
436 })
437 }
438 _ => false,
439 },
440 _ => false,
441 };
442
443 if is_valid {
444 let initial_value = match &config_option.kind {
445 acp::SessionConfigKind::Select(select) => {
446 Some(select.current_value.clone())
447 }
448 _ => None,
449 };
450 Some((
451 config_option.id.clone(),
452 default_value.clone(),
453 initial_value,
454 ))
455 } else {
456 log::warn!(
457 "`{}` is not a valid value for config option `{}` in {}",
458 default_value,
459 config_option.id.0,
460 id
461 );
462 None
463 }
464 })
465 .collect()
466 };
467
468 for (config_id, default_value, initial_value) in defaults_to_apply {
469 cx.spawn({
470 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
471 let session_id = session_id.clone();
472 let config_id_clone = config_id.clone();
473 let config_opts = config_options.clone();
474 let conn = self.connection.clone();
475 async move |_| {
476 let result = conn
477 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
478 session_id,
479 config_id_clone.clone(),
480 default_value_id,
481 ))
482 .await
483 .log_err();
484
485 if result.is_none() {
486 if let Some(initial) = initial_value {
487 let mut opts = config_opts.borrow_mut();
488 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
489 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
490 select.current_value = initial;
491 }
492 }
493 }
494 }
495 }
496 })
497 .detach();
498
499 let mut opts = config_options.borrow_mut();
500 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
501 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
502 select.current_value = acp::SessionConfigValueId::new(default_value);
503 }
504 }
505 }
506 }
507}
508
509impl Drop for AcpConnection {
510 fn drop(&mut self) {
511 self.child.kill().log_err();
512 }
513}
514
515fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
516 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
517}
518
519fn terminal_auth_task(
520 command: &AgentServerCommand,
521 agent_id: &AgentId,
522 method: &acp::AuthMethodTerminal,
523) -> SpawnInTerminal {
524 acp_thread::build_terminal_auth_task(
525 terminal_auth_task_id(agent_id, &method.id),
526 method.name.clone(),
527 command.path.to_string_lossy().into_owned(),
528 command.args.clone(),
529 command.env.clone().unwrap_or_default(),
530 )
531}
532
533/// Used to support the _meta method prior to stabilization
534fn meta_terminal_auth_task(
535 agent_id: &AgentId,
536 method_id: &acp::AuthMethodId,
537 method: &acp::AuthMethod,
538) -> Option<SpawnInTerminal> {
539 #[derive(Deserialize)]
540 struct MetaTerminalAuth {
541 label: String,
542 command: String,
543 #[serde(default)]
544 args: Vec<String>,
545 #[serde(default)]
546 env: HashMap<String, String>,
547 }
548
549 let meta = match method {
550 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
551 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
552 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
553 _ => None,
554 }?;
555 let terminal_auth =
556 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
557
558 Some(acp_thread::build_terminal_auth_task(
559 terminal_auth_task_id(agent_id, method_id),
560 terminal_auth.label.clone(),
561 terminal_auth.command,
562 terminal_auth.args,
563 terminal_auth.env,
564 ))
565}
566
567impl AgentConnection for AcpConnection {
568 fn agent_id(&self) -> AgentId {
569 self.id.clone()
570 }
571
572 fn telemetry_id(&self) -> SharedString {
573 self.telemetry_id.clone()
574 }
575
576 fn new_session(
577 self: Rc<Self>,
578 project: Entity<Project>,
579 work_dirs: PathList,
580 cx: &mut App,
581 ) -> Task<Result<Entity<AcpThread>>> {
582 // TODO: remove this once ACP supports multiple working directories
583 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
584 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
585 };
586 let name = self.id.0.clone();
587 let mcp_servers = mcp_servers_for_project(&project, cx);
588
589 cx.spawn(async move |cx| {
590 let response = self.connection
591 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
592 .await
593 .map_err(map_acp_error)?;
594
595 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
596
597 if let Some(default_mode) = self.default_mode.clone() {
598 if let Some(modes) = modes.as_ref() {
599 let mut modes_ref = modes.borrow_mut();
600 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
601
602 if has_mode {
603 let initial_mode_id = modes_ref.current_mode_id.clone();
604
605 cx.spawn({
606 let default_mode = default_mode.clone();
607 let session_id = response.session_id.clone();
608 let modes = modes.clone();
609 let conn = self.connection.clone();
610 async move |_| {
611 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
612 .await.log_err();
613
614 if result.is_none() {
615 modes.borrow_mut().current_mode_id = initial_mode_id;
616 }
617 }
618 }).detach();
619
620 modes_ref.current_mode_id = default_mode;
621 } else {
622 let available_modes = modes_ref
623 .available_modes
624 .iter()
625 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
626 .collect::<Vec<_>>()
627 .join("\n");
628
629 log::warn!(
630 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
631 );
632 }
633 }
634 }
635
636 if let Some(default_model) = self.default_model.clone() {
637 if let Some(models) = models.as_ref() {
638 let mut models_ref = models.borrow_mut();
639 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
640
641 if has_model {
642 let initial_model_id = models_ref.current_model_id.clone();
643
644 cx.spawn({
645 let default_model = default_model.clone();
646 let session_id = response.session_id.clone();
647 let models = models.clone();
648 let conn = self.connection.clone();
649 async move |_| {
650 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
651 .await.log_err();
652
653 if result.is_none() {
654 models.borrow_mut().current_model_id = initial_model_id;
655 }
656 }
657 }).detach();
658
659 models_ref.current_model_id = default_model;
660 } else {
661 let available_models = models_ref
662 .available_models
663 .iter()
664 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
665 .collect::<Vec<_>>()
666 .join("\n");
667
668 log::warn!(
669 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
670 );
671 }
672 }
673 }
674
675 if let Some(config_opts) = config_options.as_ref() {
676 self.apply_default_config_options(&response.session_id, config_opts, cx);
677 }
678
679 let action_log = cx.new(|_| ActionLog::new(project.clone()));
680 let thread: Entity<AcpThread> = cx.new(|cx| {
681 AcpThread::new(
682 None,
683 None,
684 Some(work_dirs),
685 self.clone(),
686 project,
687 action_log,
688 response.session_id.clone(),
689 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
690 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
691 cx,
692 )
693 });
694
695 self.sessions.borrow_mut().insert(
696 response.session_id,
697 AcpSession {
698 thread: thread.downgrade(),
699 suppress_abort_err: false,
700 session_modes: modes,
701 models,
702 config_options: config_options.map(ConfigOptions::new),
703 },
704 );
705
706 Ok(thread)
707 })
708 }
709
710 fn supports_load_session(&self) -> bool {
711 self.agent_capabilities.load_session
712 }
713
714 fn supports_resume_session(&self) -> bool {
715 self.agent_capabilities
716 .session_capabilities
717 .resume
718 .is_some()
719 }
720
721 fn load_session(
722 self: Rc<Self>,
723 session_id: acp::SessionId,
724 project: Entity<Project>,
725 work_dirs: PathList,
726 title: Option<SharedString>,
727 cx: &mut App,
728 ) -> Task<Result<Entity<AcpThread>>> {
729 if !self.agent_capabilities.load_session {
730 return Task::ready(Err(anyhow!(LoadError::Other(
731 "Loading sessions is not supported by this agent.".into()
732 ))));
733 }
734 // TODO: remove this once ACP supports multiple working directories
735 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
736 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
737 };
738
739 let mcp_servers = mcp_servers_for_project(&project, cx);
740 let action_log = cx.new(|_| ActionLog::new(project.clone()));
741 let thread: Entity<AcpThread> = cx.new(|cx| {
742 AcpThread::new(
743 None,
744 title,
745 Some(work_dirs.clone()),
746 self.clone(),
747 project,
748 action_log,
749 session_id.clone(),
750 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
751 cx,
752 )
753 });
754
755 self.sessions.borrow_mut().insert(
756 session_id.clone(),
757 AcpSession {
758 thread: thread.downgrade(),
759 suppress_abort_err: false,
760 session_modes: None,
761 models: None,
762 config_options: None,
763 },
764 );
765
766 cx.spawn(async move |cx| {
767 let response = match self
768 .connection
769 .load_session(
770 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
771 )
772 .await
773 {
774 Ok(response) => response,
775 Err(err) => {
776 self.sessions.borrow_mut().remove(&session_id);
777 return Err(map_acp_error(err));
778 }
779 };
780
781 let (modes, models, config_options) =
782 config_state(response.modes, response.models, response.config_options);
783
784 if let Some(config_opts) = config_options.as_ref() {
785 self.apply_default_config_options(&session_id, config_opts, cx);
786 }
787
788 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
789 session.session_modes = modes;
790 session.models = models;
791 session.config_options = config_options.map(ConfigOptions::new);
792 }
793
794 Ok(thread)
795 })
796 }
797
798 fn resume_session(
799 self: Rc<Self>,
800 session_id: acp::SessionId,
801 project: Entity<Project>,
802 work_dirs: PathList,
803 title: Option<SharedString>,
804 cx: &mut App,
805 ) -> Task<Result<Entity<AcpThread>>> {
806 if self
807 .agent_capabilities
808 .session_capabilities
809 .resume
810 .is_none()
811 {
812 return Task::ready(Err(anyhow!(LoadError::Other(
813 "Resuming sessions is not supported by this agent.".into()
814 ))));
815 }
816 // TODO: remove this once ACP supports multiple working directories
817 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
818 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
819 };
820
821 let mcp_servers = mcp_servers_for_project(&project, cx);
822 let action_log = cx.new(|_| ActionLog::new(project.clone()));
823 let thread: Entity<AcpThread> = cx.new(|cx| {
824 AcpThread::new(
825 None,
826 title,
827 Some(work_dirs),
828 self.clone(),
829 project,
830 action_log,
831 session_id.clone(),
832 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
833 cx,
834 )
835 });
836
837 self.sessions.borrow_mut().insert(
838 session_id.clone(),
839 AcpSession {
840 thread: thread.downgrade(),
841 suppress_abort_err: false,
842 session_modes: None,
843 models: None,
844 config_options: None,
845 },
846 );
847
848 cx.spawn(async move |cx| {
849 let response = match self
850 .connection
851 .resume_session(
852 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
853 .mcp_servers(mcp_servers),
854 )
855 .await
856 {
857 Ok(response) => response,
858 Err(err) => {
859 self.sessions.borrow_mut().remove(&session_id);
860 return Err(map_acp_error(err));
861 }
862 };
863
864 let (modes, models, config_options) =
865 config_state(response.modes, response.models, response.config_options);
866
867 if let Some(config_opts) = config_options.as_ref() {
868 self.apply_default_config_options(&session_id, config_opts, cx);
869 }
870
871 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
872 session.session_modes = modes;
873 session.models = models;
874 session.config_options = config_options.map(ConfigOptions::new);
875 }
876
877 Ok(thread)
878 })
879 }
880
881 fn supports_close_session(&self) -> bool {
882 self.agent_capabilities.session_capabilities.close.is_some()
883 }
884
885 fn close_session(
886 self: Rc<Self>,
887 session_id: &acp::SessionId,
888 cx: &mut App,
889 ) -> Task<Result<()>> {
890 if !self.supports_close_session() {
891 return Task::ready(Err(anyhow!(LoadError::Other(
892 "Closing sessions is not supported by this agent.".into()
893 ))));
894 }
895
896 let conn = self.connection.clone();
897 let session_id = session_id.clone();
898 cx.foreground_executor().spawn(async move {
899 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
900 .await?;
901 self.sessions.borrow_mut().remove(&session_id);
902 Ok(())
903 })
904 }
905
906 fn auth_methods(&self) -> &[acp::AuthMethod] {
907 &self.auth_methods
908 }
909
910 fn terminal_auth_task(
911 &self,
912 method_id: &acp::AuthMethodId,
913 cx: &App,
914 ) -> Option<Task<Result<SpawnInTerminal>>> {
915 let method = self
916 .auth_methods
917 .iter()
918 .find(|method| method.id() == method_id)?;
919
920 match method {
921 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
922 let agent_id = self.id.clone();
923 let terminal = terminal.clone();
924 let store = self.agent_server_store.clone();
925 Some(cx.spawn(async move |cx| {
926 let command = store
927 .update(cx, |store, cx| {
928 let agent = store
929 .get_external_agent(&agent_id)
930 .context("Agent server not found")?;
931 anyhow::Ok(agent.get_command(
932 terminal.args.clone(),
933 HashMap::from_iter(terminal.env.clone()),
934 &mut cx.to_async(),
935 ))
936 })?
937 .context("Failed to get agent command")?
938 .await?;
939 Ok(terminal_auth_task(&command, &agent_id, &terminal))
940 }))
941 }
942 _ => meta_terminal_auth_task(&self.id, method_id, method)
943 .map(|task| Task::ready(Ok(task))),
944 }
945 }
946
947 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
948 let conn = self.connection.clone();
949 cx.foreground_executor().spawn(async move {
950 conn.authenticate(acp::AuthenticateRequest::new(method_id))
951 .await?;
952 Ok(())
953 })
954 }
955
956 fn prompt(
957 &self,
958 _id: Option<acp_thread::UserMessageId>,
959 params: acp::PromptRequest,
960 cx: &mut App,
961 ) -> Task<Result<acp::PromptResponse>> {
962 let conn = self.connection.clone();
963 let sessions = self.sessions.clone();
964 let session_id = params.session_id.clone();
965 cx.foreground_executor().spawn(async move {
966 let result = conn.prompt(params).await;
967
968 let mut suppress_abort_err = false;
969
970 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
971 suppress_abort_err = session.suppress_abort_err;
972 session.suppress_abort_err = false;
973 }
974
975 match result {
976 Ok(response) => Ok(response),
977 Err(err) => {
978 if err.code == acp::ErrorCode::AuthRequired {
979 return Err(anyhow!(acp::Error::auth_required()));
980 }
981
982 if err.code != ErrorCode::InternalError {
983 anyhow::bail!(err)
984 }
985
986 let Some(data) = &err.data else {
987 anyhow::bail!(err)
988 };
989
990 // Temporary workaround until the following PR is generally available:
991 // https://github.com/google-gemini/gemini-cli/pull/6656
992
993 #[derive(Deserialize)]
994 #[serde(deny_unknown_fields)]
995 struct ErrorDetails {
996 details: Box<str>,
997 }
998
999 match serde_json::from_value(data.clone()) {
1000 Ok(ErrorDetails { details }) => {
1001 if suppress_abort_err
1002 && (details.contains("This operation was aborted")
1003 || details.contains("The user aborted a request"))
1004 {
1005 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1006 } else {
1007 Err(anyhow!(details))
1008 }
1009 }
1010 Err(_) => Err(anyhow!(err)),
1011 }
1012 }
1013 }
1014 })
1015 }
1016
1017 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1018 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1019 session.suppress_abort_err = true;
1020 }
1021 let conn = self.connection.clone();
1022 let params = acp::CancelNotification::new(session_id.clone());
1023 cx.foreground_executor()
1024 .spawn(async move { conn.cancel(params).await })
1025 .detach();
1026 }
1027
1028 fn session_modes(
1029 &self,
1030 session_id: &acp::SessionId,
1031 _cx: &App,
1032 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1033 let sessions = self.sessions.clone();
1034 let sessions_ref = sessions.borrow();
1035 let Some(session) = sessions_ref.get(session_id) else {
1036 return None;
1037 };
1038
1039 if let Some(modes) = session.session_modes.as_ref() {
1040 Some(Rc::new(AcpSessionModes {
1041 connection: self.connection.clone(),
1042 session_id: session_id.clone(),
1043 state: modes.clone(),
1044 }) as _)
1045 } else {
1046 None
1047 }
1048 }
1049
1050 fn model_selector(
1051 &self,
1052 session_id: &acp::SessionId,
1053 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1054 let sessions = self.sessions.clone();
1055 let sessions_ref = sessions.borrow();
1056 let Some(session) = sessions_ref.get(session_id) else {
1057 return None;
1058 };
1059
1060 if let Some(models) = session.models.as_ref() {
1061 Some(Rc::new(AcpModelSelector::new(
1062 session_id.clone(),
1063 self.connection.clone(),
1064 models.clone(),
1065 )) as _)
1066 } else {
1067 None
1068 }
1069 }
1070
1071 fn session_config_options(
1072 &self,
1073 session_id: &acp::SessionId,
1074 _cx: &App,
1075 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1076 let sessions = self.sessions.borrow();
1077 let session = sessions.get(session_id)?;
1078
1079 let config_opts = session.config_options.as_ref()?;
1080
1081 Some(Rc::new(AcpSessionConfigOptions {
1082 session_id: session_id.clone(),
1083 connection: self.connection.clone(),
1084 state: config_opts.config_options.clone(),
1085 watch_tx: config_opts.tx.clone(),
1086 watch_rx: config_opts.rx.clone(),
1087 }) as _)
1088 }
1089
1090 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1091 self.session_list.clone().map(|s| s as _)
1092 }
1093
1094 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1095 self
1096 }
1097}
1098
1099fn map_acp_error(err: acp::Error) -> anyhow::Error {
1100 if err.code == acp::ErrorCode::AuthRequired {
1101 let mut error = AuthRequired::new();
1102
1103 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1104 error = error.with_description(err.message);
1105 }
1106
1107 anyhow!(error)
1108 } else {
1109 anyhow!(err)
1110 }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115 use super::*;
1116
1117 #[test]
1118 fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
1119 let command = AgentServerCommand {
1120 path: "/path/to/agent".into(),
1121 args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
1122 env: Some(HashMap::from_iter([
1123 ("BASE".into(), "1".into()),
1124 ("SHARED".into(), "override".into()),
1125 ("EXTRA".into(), "2".into()),
1126 ])),
1127 };
1128 let method = acp::AuthMethodTerminal::new("login", "Login");
1129
1130 let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
1131
1132 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1133 assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
1134 assert_eq!(
1135 task.env,
1136 HashMap::from_iter([
1137 ("BASE".into(), "1".into()),
1138 ("SHARED".into(), "override".into()),
1139 ("EXTRA".into(), "2".into()),
1140 ])
1141 );
1142 assert_eq!(task.label, "Login");
1143 assert_eq!(task.command_label, "Login");
1144 }
1145
1146 #[test]
1147 fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
1148 let method_id = acp::AuthMethodId::new("legacy-login");
1149 let method = acp::AuthMethod::Agent(
1150 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1151 "terminal-auth".to_string(),
1152 serde_json::json!({
1153 "label": "legacy /auth",
1154 "command": "legacy-agent",
1155 "args": ["auth", "--interactive"],
1156 "env": {
1157 "AUTH_MODE": "interactive",
1158 },
1159 }),
1160 )])),
1161 );
1162
1163 let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
1164 .expect("expected legacy terminal auth task");
1165
1166 assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
1167 assert_eq!(task.command.as_deref(), Some("legacy-agent"));
1168 assert_eq!(task.args, vec!["auth", "--interactive"]);
1169 assert_eq!(
1170 task.env,
1171 HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
1172 );
1173 assert_eq!(task.label, "legacy /auth");
1174 }
1175
1176 #[test]
1177 fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
1178 let method_id = acp::AuthMethodId::new("legacy-login");
1179 let method = acp::AuthMethod::Agent(
1180 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1181 "terminal-auth".to_string(),
1182 serde_json::json!({
1183 "label": "legacy /auth",
1184 }),
1185 )])),
1186 );
1187
1188 assert!(
1189 meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
1190 );
1191 }
1192
1193 #[test]
1194 fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
1195 let method_id = acp::AuthMethodId::new("login");
1196 let method = acp::AuthMethod::Terminal(
1197 acp::AuthMethodTerminal::new(method_id, "Login")
1198 .args(vec!["/auth".into()])
1199 .env(std::collections::HashMap::from_iter([(
1200 "AUTH_MODE".into(),
1201 "first-class".into(),
1202 )]))
1203 .meta(acp::Meta::from_iter([(
1204 "terminal-auth".to_string(),
1205 serde_json::json!({
1206 "label": "legacy /auth",
1207 "command": "legacy-agent",
1208 "args": ["legacy-auth"],
1209 "env": {
1210 "AUTH_MODE": "legacy",
1211 },
1212 }),
1213 )])),
1214 );
1215
1216 let command = AgentServerCommand {
1217 path: "/path/to/agent".into(),
1218 args: vec!["--acp".into(), "/auth".into()],
1219 env: Some(HashMap::from_iter([
1220 ("BASE".into(), "1".into()),
1221 ("AUTH_MODE".into(), "first-class".into()),
1222 ])),
1223 };
1224
1225 let task = match &method {
1226 acp::AuthMethod::Terminal(terminal) => {
1227 terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
1228 }
1229 _ => unreachable!(),
1230 };
1231
1232 assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
1233 assert_eq!(task.args, vec!["--acp", "/auth"]);
1234 assert_eq!(
1235 task.env,
1236 HashMap::from_iter([
1237 ("BASE".into(), "1".into()),
1238 ("AUTH_MODE".into(), "first-class".into()),
1239 ])
1240 );
1241 assert_eq!(task.label, "Login");
1242 }
1243}
1244
1245fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1246 let context_server_store = project.read(cx).context_server_store().read(cx);
1247 let is_local = project.read(cx).is_local();
1248 context_server_store
1249 .configured_server_ids()
1250 .iter()
1251 .filter_map(|id| {
1252 let configuration = context_server_store.configuration_for_server(id)?;
1253 match &*configuration {
1254 project::context_server_store::ContextServerConfiguration::Custom {
1255 command,
1256 remote,
1257 ..
1258 }
1259 | project::context_server_store::ContextServerConfiguration::Extension {
1260 command,
1261 remote,
1262 ..
1263 } if is_local || *remote => Some(acp::McpServer::Stdio(
1264 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1265 .args(command.args.clone())
1266 .env(if let Some(env) = command.env.as_ref() {
1267 env.iter()
1268 .map(|(name, value)| acp::EnvVariable::new(name, value))
1269 .collect()
1270 } else {
1271 vec![]
1272 }),
1273 )),
1274 project::context_server_store::ContextServerConfiguration::Http {
1275 url,
1276 headers,
1277 timeout: _,
1278 } => Some(acp::McpServer::Http(
1279 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1280 headers
1281 .iter()
1282 .map(|(name, value)| acp::HttpHeader::new(name, value))
1283 .collect(),
1284 ),
1285 )),
1286 _ => None,
1287 }
1288 })
1289 .collect()
1290}
1291
1292fn config_state(
1293 modes: Option<acp::SessionModeState>,
1294 models: Option<acp::SessionModelState>,
1295 config_options: Option<Vec<acp::SessionConfigOption>>,
1296) -> (
1297 Option<Rc<RefCell<acp::SessionModeState>>>,
1298 Option<Rc<RefCell<acp::SessionModelState>>>,
1299 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1300) {
1301 if let Some(opts) = config_options {
1302 return (None, None, Some(Rc::new(RefCell::new(opts))));
1303 }
1304
1305 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1306 let models = models.map(|models| Rc::new(RefCell::new(models)));
1307 (modes, models, None)
1308}
1309
1310struct AcpSessionModes {
1311 session_id: acp::SessionId,
1312 connection: Rc<acp::ClientSideConnection>,
1313 state: Rc<RefCell<acp::SessionModeState>>,
1314}
1315
1316impl acp_thread::AgentSessionModes for AcpSessionModes {
1317 fn current_mode(&self) -> acp::SessionModeId {
1318 self.state.borrow().current_mode_id.clone()
1319 }
1320
1321 fn all_modes(&self) -> Vec<acp::SessionMode> {
1322 self.state.borrow().available_modes.clone()
1323 }
1324
1325 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1326 let connection = self.connection.clone();
1327 let session_id = self.session_id.clone();
1328 let old_mode_id;
1329 {
1330 let mut state = self.state.borrow_mut();
1331 old_mode_id = state.current_mode_id.clone();
1332 state.current_mode_id = mode_id.clone();
1333 };
1334 let state = self.state.clone();
1335 cx.foreground_executor().spawn(async move {
1336 let result = connection
1337 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1338 .await;
1339
1340 if result.is_err() {
1341 state.borrow_mut().current_mode_id = old_mode_id;
1342 }
1343
1344 result?;
1345
1346 Ok(())
1347 })
1348 }
1349}
1350
1351struct AcpModelSelector {
1352 session_id: acp::SessionId,
1353 connection: Rc<acp::ClientSideConnection>,
1354 state: Rc<RefCell<acp::SessionModelState>>,
1355}
1356
1357impl AcpModelSelector {
1358 fn new(
1359 session_id: acp::SessionId,
1360 connection: Rc<acp::ClientSideConnection>,
1361 state: Rc<RefCell<acp::SessionModelState>>,
1362 ) -> Self {
1363 Self {
1364 session_id,
1365 connection,
1366 state,
1367 }
1368 }
1369}
1370
1371impl acp_thread::AgentModelSelector for AcpModelSelector {
1372 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1373 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1374 self.state
1375 .borrow()
1376 .available_models
1377 .clone()
1378 .into_iter()
1379 .map(acp_thread::AgentModelInfo::from)
1380 .collect(),
1381 )))
1382 }
1383
1384 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1385 let connection = self.connection.clone();
1386 let session_id = self.session_id.clone();
1387 let old_model_id;
1388 {
1389 let mut state = self.state.borrow_mut();
1390 old_model_id = state.current_model_id.clone();
1391 state.current_model_id = model_id.clone();
1392 };
1393 let state = self.state.clone();
1394 cx.foreground_executor().spawn(async move {
1395 let result = connection
1396 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1397 .await;
1398
1399 if result.is_err() {
1400 state.borrow_mut().current_model_id = old_model_id;
1401 }
1402
1403 result?;
1404
1405 Ok(())
1406 })
1407 }
1408
1409 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1410 let state = self.state.borrow();
1411 Task::ready(
1412 state
1413 .available_models
1414 .iter()
1415 .find(|m| m.model_id == state.current_model_id)
1416 .cloned()
1417 .map(acp_thread::AgentModelInfo::from)
1418 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1419 )
1420 }
1421}
1422
1423struct AcpSessionConfigOptions {
1424 session_id: acp::SessionId,
1425 connection: Rc<acp::ClientSideConnection>,
1426 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1427 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1428 watch_rx: watch::Receiver<()>,
1429}
1430
1431impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1432 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1433 self.state.borrow().clone()
1434 }
1435
1436 fn set_config_option(
1437 &self,
1438 config_id: acp::SessionConfigId,
1439 value: acp::SessionConfigValueId,
1440 cx: &mut App,
1441 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1442 let connection = self.connection.clone();
1443 let session_id = self.session_id.clone();
1444 let state = self.state.clone();
1445
1446 let watch_tx = self.watch_tx.clone();
1447
1448 cx.foreground_executor().spawn(async move {
1449 let response = connection
1450 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1451 session_id, config_id, value,
1452 ))
1453 .await?;
1454
1455 *state.borrow_mut() = response.config_options.clone();
1456 watch_tx.borrow_mut().send(()).ok();
1457 Ok(response.config_options)
1458 })
1459 }
1460
1461 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1462 Some(self.watch_rx.clone())
1463 }
1464}
1465
1466struct ClientDelegate {
1467 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1468 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1469 cx: AsyncApp,
1470}
1471
1472#[async_trait::async_trait(?Send)]
1473impl acp::Client for ClientDelegate {
1474 async fn request_permission(
1475 &self,
1476 arguments: acp::RequestPermissionRequest,
1477 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1478 let thread;
1479 {
1480 let sessions_ref = self.sessions.borrow();
1481 let session = sessions_ref
1482 .get(&arguments.session_id)
1483 .context("Failed to get session")?;
1484 thread = session.thread.clone();
1485 }
1486
1487 let cx = &mut self.cx.clone();
1488
1489 let task = thread.update(cx, |thread, cx| {
1490 thread.request_tool_call_authorization(
1491 arguments.tool_call,
1492 acp_thread::PermissionOptions::Flat(arguments.options),
1493 cx,
1494 )
1495 })??;
1496
1497 let outcome = task.await;
1498
1499 Ok(acp::RequestPermissionResponse::new(outcome.into()))
1500 }
1501
1502 async fn write_text_file(
1503 &self,
1504 arguments: acp::WriteTextFileRequest,
1505 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1506 let cx = &mut self.cx.clone();
1507 let task = self
1508 .session_thread(&arguments.session_id)?
1509 .update(cx, |thread, cx| {
1510 thread.write_text_file(arguments.path, arguments.content, cx)
1511 })?;
1512
1513 task.await?;
1514
1515 Ok(Default::default())
1516 }
1517
1518 async fn read_text_file(
1519 &self,
1520 arguments: acp::ReadTextFileRequest,
1521 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1522 let task = self.session_thread(&arguments.session_id)?.update(
1523 &mut self.cx.clone(),
1524 |thread, cx| {
1525 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1526 },
1527 )?;
1528
1529 let content = task.await?;
1530
1531 Ok(acp::ReadTextFileResponse::new(content))
1532 }
1533
1534 async fn session_notification(
1535 &self,
1536 notification: acp::SessionNotification,
1537 ) -> Result<(), acp::Error> {
1538 let sessions = self.sessions.borrow();
1539 let session = sessions
1540 .get(¬ification.session_id)
1541 .context("Failed to get session")?;
1542
1543 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1544 current_mode_id,
1545 ..
1546 }) = ¬ification.update
1547 {
1548 if let Some(session_modes) = &session.session_modes {
1549 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1550 }
1551 }
1552
1553 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1554 config_options,
1555 ..
1556 }) = ¬ification.update
1557 {
1558 if let Some(opts) = &session.config_options {
1559 *opts.config_options.borrow_mut() = config_options.clone();
1560 opts.tx.borrow_mut().send(()).ok();
1561 }
1562 }
1563
1564 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1565 && let Some(session_list) = self.session_list.borrow().as_ref()
1566 {
1567 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1568 }
1569
1570 // Clone so we can inspect meta both before and after handing off to the thread
1571 let update_clone = notification.update.clone();
1572
1573 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1574 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1575 if let Some(meta) = &tc.meta {
1576 if let Some(terminal_info) = meta.get("terminal_info") {
1577 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1578 {
1579 let terminal_id = acp::TerminalId::new(id_str);
1580 let cwd = terminal_info
1581 .get("cwd")
1582 .and_then(|v| v.as_str().map(PathBuf::from));
1583
1584 // Create a minimal display-only lower-level terminal and register it.
1585 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1586 let builder = TerminalBuilder::new_display_only(
1587 CursorShape::default(),
1588 AlternateScroll::On,
1589 None,
1590 0,
1591 cx.background_executor(),
1592 thread.project().read(cx).path_style(cx),
1593 )?;
1594 let lower = cx.new(|cx| builder.subscribe(cx));
1595 thread.on_terminal_provider_event(
1596 TerminalProviderEvent::Created {
1597 terminal_id,
1598 label: tc.title.clone(),
1599 cwd,
1600 output_byte_limit: None,
1601 terminal: lower,
1602 },
1603 cx,
1604 );
1605 anyhow::Ok(())
1606 });
1607 }
1608 }
1609 }
1610 }
1611
1612 // Forward the update to the acp_thread as usual.
1613 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1614 thread.handle_session_update(notification.update.clone(), cx)
1615 })??;
1616
1617 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1618 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1619 if let Some(meta) = &tcu.meta {
1620 if let Some(term_out) = meta.get("terminal_output") {
1621 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1622 let terminal_id = acp::TerminalId::new(id_str);
1623 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1624 let data = s.as_bytes().to_vec();
1625 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1626 thread.on_terminal_provider_event(
1627 TerminalProviderEvent::Output { terminal_id, data },
1628 cx,
1629 );
1630 });
1631 }
1632 }
1633 }
1634
1635 // terminal_exit
1636 if let Some(term_exit) = meta.get("terminal_exit") {
1637 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1638 let terminal_id = acp::TerminalId::new(id_str);
1639 let status = acp::TerminalExitStatus::new()
1640 .exit_code(
1641 term_exit
1642 .get("exit_code")
1643 .and_then(|v| v.as_u64())
1644 .map(|i| i as u32),
1645 )
1646 .signal(
1647 term_exit
1648 .get("signal")
1649 .and_then(|v| v.as_str().map(|s| s.to_string())),
1650 );
1651
1652 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1653 thread.on_terminal_provider_event(
1654 TerminalProviderEvent::Exit {
1655 terminal_id,
1656 status,
1657 },
1658 cx,
1659 );
1660 });
1661 }
1662 }
1663 }
1664 }
1665
1666 Ok(())
1667 }
1668
1669 async fn create_terminal(
1670 &self,
1671 args: acp::CreateTerminalRequest,
1672 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1673 let thread = self.session_thread(&args.session_id)?;
1674 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1675
1676 let terminal_entity = acp_thread::create_terminal_entity(
1677 args.command.clone(),
1678 &args.args,
1679 args.env
1680 .into_iter()
1681 .map(|env| (env.name, env.value))
1682 .collect(),
1683 args.cwd.clone(),
1684 &project,
1685 &mut self.cx.clone(),
1686 )
1687 .await?;
1688
1689 // Register with renderer
1690 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1691 thread.register_terminal_created(
1692 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1693 format!("{} {}", args.command, args.args.join(" ")),
1694 args.cwd.clone(),
1695 args.output_byte_limit,
1696 terminal_entity,
1697 cx,
1698 )
1699 })?;
1700 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1701 Ok(acp::CreateTerminalResponse::new(terminal_id))
1702 }
1703
1704 async fn kill_terminal(
1705 &self,
1706 args: acp::KillTerminalRequest,
1707 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1708 self.session_thread(&args.session_id)?
1709 .update(&mut self.cx.clone(), |thread, cx| {
1710 thread.kill_terminal(args.terminal_id, cx)
1711 })??;
1712
1713 Ok(Default::default())
1714 }
1715
1716 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1717 Err(acp::Error::method_not_found())
1718 }
1719
1720 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1721 Err(acp::Error::method_not_found())
1722 }
1723
1724 async fn release_terminal(
1725 &self,
1726 args: acp::ReleaseTerminalRequest,
1727 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1728 self.session_thread(&args.session_id)?
1729 .update(&mut self.cx.clone(), |thread, cx| {
1730 thread.release_terminal(args.terminal_id, cx)
1731 })??;
1732
1733 Ok(Default::default())
1734 }
1735
1736 async fn terminal_output(
1737 &self,
1738 args: acp::TerminalOutputRequest,
1739 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1740 self.session_thread(&args.session_id)?
1741 .read_with(&mut self.cx.clone(), |thread, cx| {
1742 let out = thread
1743 .terminal(args.terminal_id)?
1744 .read(cx)
1745 .current_output(cx);
1746
1747 Ok(out)
1748 })?
1749 }
1750
1751 async fn wait_for_terminal_exit(
1752 &self,
1753 args: acp::WaitForTerminalExitRequest,
1754 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1755 let exit_status = self
1756 .session_thread(&args.session_id)?
1757 .update(&mut self.cx.clone(), |thread, cx| {
1758 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1759 })??
1760 .await;
1761
1762 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1763 }
1764}
1765
1766impl ClientDelegate {
1767 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1768 let sessions = self.sessions.borrow();
1769 sessions
1770 .get(session_id)
1771 .context("Failed to get session")
1772 .map(|session| session.thread.clone())
1773 }
1774}