1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::agent_server_store::AgentServerCommand;
14use project::{AgentId, Project};
15use serde::Deserialize;
16use settings::Settings as _;
17use task::{ShellBuilder, SpawnInTerminal};
18use util::ResultExt as _;
19use util::path_list::PathList;
20use util::process::Child;
21
22use std::path::PathBuf;
23use std::process::Stdio;
24use std::rc::Rc;
25use std::{any::Any, cell::RefCell};
26use thiserror::Error;
27
28use anyhow::{Context as _, Result};
29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
30
31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
32use terminal::TerminalBuilder;
33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
34
35use crate::GEMINI_ID;
36
37pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
38
39#[derive(Debug, Error)]
40#[error("Unsupported version")]
41pub struct UnsupportedVersion;
42
43pub struct AcpConnection {
44 id: AgentId,
45 telemetry_id: SharedString,
46 connection: Rc<acp::ClientSideConnection>,
47 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
48 auth_methods: Vec<acp::AuthMethod>,
49 command: AgentServerCommand,
50 agent_capabilities: acp::AgentCapabilities,
51 default_mode: Option<acp::SessionModeId>,
52 default_model: Option<acp::ModelId>,
53 default_config_options: HashMap<String, String>,
54 child: Child,
55 session_list: Option<Rc<AcpSessionList>>,
56 _io_task: Task<Result<(), acp::Error>>,
57 _wait_task: Task<Result<()>>,
58 _stderr_task: Task<Result<()>>,
59}
60
61struct ConfigOptions {
62 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
63 tx: Rc<RefCell<watch::Sender<()>>>,
64 rx: watch::Receiver<()>,
65}
66
67impl ConfigOptions {
68 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
69 let (tx, rx) = watch::channel(());
70 Self {
71 config_options,
72 tx: Rc::new(RefCell::new(tx)),
73 rx,
74 }
75 }
76}
77
78pub struct AcpSession {
79 thread: WeakEntity<AcpThread>,
80 suppress_abort_err: bool,
81 models: Option<Rc<RefCell<acp::SessionModelState>>>,
82 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
83 config_options: Option<ConfigOptions>,
84}
85
86pub struct AcpSessionList {
87 connection: Rc<acp::ClientSideConnection>,
88 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
89 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
90}
91
92impl AcpSessionList {
93 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
94 let (tx, rx) = smol::channel::unbounded();
95 Self {
96 connection,
97 updates_tx: tx,
98 updates_rx: rx,
99 }
100 }
101
102 fn notify_update(&self) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::Refresh)
105 .log_err();
106 }
107
108 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
109 self.updates_tx
110 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
111 .log_err();
112 }
113}
114
115impl AgentSessionList for AcpSessionList {
116 fn list_sessions(
117 &self,
118 request: AgentSessionListRequest,
119 cx: &mut App,
120 ) -> Task<Result<AgentSessionListResponse>> {
121 let conn = self.connection.clone();
122 cx.foreground_executor().spawn(async move {
123 let acp_request = acp::ListSessionsRequest::new()
124 .cwd(request.cwd)
125 .cursor(request.cursor);
126 let response = conn.list_sessions(acp_request).await?;
127 Ok(AgentSessionListResponse {
128 sessions: response
129 .sessions
130 .into_iter()
131 .map(|s| AgentSessionInfo {
132 session_id: s.session_id,
133 work_dirs: Some(PathList::new(&[s.cwd])),
134 title: s.title.map(Into::into),
135 updated_at: s.updated_at.and_then(|date_str| {
136 chrono::DateTime::parse_from_rfc3339(&date_str)
137 .ok()
138 .map(|dt| dt.with_timezone(&chrono::Utc))
139 }),
140 created_at: None,
141 meta: s.meta,
142 })
143 .collect(),
144 next_cursor: response.next_cursor,
145 meta: response.meta,
146 })
147 })
148 }
149
150 fn watch(
151 &self,
152 _cx: &mut App,
153 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
154 Some(self.updates_rx.clone())
155 }
156
157 fn notify_refresh(&self) {
158 self.notify_update();
159 }
160
161 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
162 self
163 }
164}
165
166pub async fn connect(
167 agent_id: AgentId,
168 project: Entity<Project>,
169 command: AgentServerCommand,
170 default_mode: Option<acp::SessionModeId>,
171 default_model: Option<acp::ModelId>,
172 default_config_options: HashMap<String, String>,
173 cx: &mut AsyncApp,
174) -> Result<Rc<dyn AgentConnection>> {
175 let conn = AcpConnection::stdio(
176 agent_id,
177 project,
178 command.clone(),
179 default_mode,
180 default_model,
181 default_config_options,
182 cx,
183 )
184 .await?;
185 Ok(Rc::new(conn) as _)
186}
187
188const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
189
190impl AcpConnection {
191 pub async fn stdio(
192 agent_id: AgentId,
193 project: Entity<Project>,
194 command: AgentServerCommand,
195 default_mode: Option<acp::SessionModeId>,
196 default_model: Option<acp::ModelId>,
197 default_config_options: HashMap<String, String>,
198 cx: &mut AsyncApp,
199 ) -> Result<Self> {
200 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
201 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
202 let mut child =
203 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
204 child.envs(command.env.iter().flatten());
205 if let Some(cwd) = project.update(cx, |project, cx| {
206 if project.is_local() {
207 project
208 .default_path_list(cx)
209 .ordered_paths()
210 .next()
211 .cloned()
212 } else {
213 None
214 }
215 }) {
216 child.current_dir(cwd);
217 }
218 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
219
220 let stdout = child.stdout.take().context("Failed to take stdout")?;
221 let stdin = child.stdin.take().context("Failed to take stdin")?;
222 let stderr = child.stderr.take().context("Failed to take stderr")?;
223 log::debug!(
224 "Spawning external agent server: {:?}, {:?}",
225 command.path,
226 command.args
227 );
228 log::trace!("Spawned (pid: {})", child.id());
229
230 let sessions = Rc::new(RefCell::new(HashMap::default()));
231
232 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
233 (
234 release_channel::ReleaseChannel::try_global(cx)
235 .map(|release_channel| release_channel.display_name()),
236 release_channel::AppVersion::global(cx).to_string(),
237 )
238 });
239
240 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
241 Rc::new(RefCell::new(None));
242
243 let client = ClientDelegate {
244 sessions: sessions.clone(),
245 session_list: client_session_list.clone(),
246 cx: cx.clone(),
247 };
248 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
249 let foreground_executor = cx.foreground_executor().clone();
250 move |fut| {
251 foreground_executor.spawn(fut).detach();
252 }
253 });
254
255 let io_task = cx.background_spawn(io_task);
256
257 let stderr_task = cx.background_spawn(async move {
258 let mut stderr = BufReader::new(stderr);
259 let mut line = String::new();
260 while let Ok(n) = stderr.read_line(&mut line).await
261 && n > 0
262 {
263 log::warn!("agent stderr: {}", line.trim());
264 line.clear();
265 }
266 Ok(())
267 });
268
269 let wait_task = cx.spawn({
270 let sessions = sessions.clone();
271 let status_fut = child.status();
272 async move |cx| {
273 let status = status_fut.await?;
274
275 for session in sessions.borrow().values() {
276 session
277 .thread
278 .update(cx, |thread, cx| {
279 thread.emit_load_error(LoadError::Exited { status }, cx)
280 })
281 .ok();
282 }
283
284 anyhow::Ok(())
285 }
286 });
287
288 let connection = Rc::new(connection);
289
290 cx.update(|cx| {
291 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
292 registry.set_active_connection(agent_id.clone(), &connection, cx)
293 });
294 });
295
296 let response = connection
297 .initialize(
298 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
299 .client_capabilities(
300 acp::ClientCapabilities::new()
301 .fs(acp::FileSystemCapabilities::new()
302 .read_text_file(true)
303 .write_text_file(true))
304 .terminal(true)
305 .auth(acp::AuthCapabilities::new().terminal(true))
306 // Experimental: Allow for rendering terminal output from the agents
307 .meta(acp::Meta::from_iter([
308 ("terminal_output".into(), true.into()),
309 ("terminal-auth".into(), true.into()),
310 ])),
311 )
312 .client_info(
313 acp::Implementation::new("zed", version)
314 .title(release_channel.map(ToOwned::to_owned)),
315 ),
316 )
317 .await?;
318
319 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
320 return Err(UnsupportedVersion.into());
321 }
322
323 let telemetry_id = response
324 .agent_info
325 // Use the one the agent provides if we have one
326 .map(|info| info.name.into())
327 // Otherwise, just use the name
328 .unwrap_or_else(|| agent_id.0.clone());
329
330 let session_list = if response
331 .agent_capabilities
332 .session_capabilities
333 .list
334 .is_some()
335 {
336 let list = Rc::new(AcpSessionList::new(connection.clone()));
337 *client_session_list.borrow_mut() = Some(list.clone());
338 Some(list)
339 } else {
340 None
341 };
342
343 // TODO: Remove this override once Google team releases their official auth methods
344 let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
345 let mut args = command.args.clone();
346 args.retain(|a| a != "--experimental-acp" && a != "--acp");
347 let value = serde_json::json!({
348 "label": "gemini /auth",
349 "command": command.path.to_string_lossy().into_owned(),
350 "args": args,
351 "env": command.env.clone().unwrap_or_default(),
352 });
353 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
354 vec![acp::AuthMethod::Agent(
355 acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
356 .description("Login with your Google or Vertex AI account")
357 .meta(meta),
358 )]
359 } else {
360 response.auth_methods
361 };
362 Ok(Self {
363 id: agent_id,
364 auth_methods,
365 command,
366 connection,
367 telemetry_id,
368 sessions,
369 agent_capabilities: response.agent_capabilities,
370 default_mode,
371 default_model,
372 default_config_options,
373 session_list,
374 _io_task: io_task,
375 _wait_task: wait_task,
376 _stderr_task: stderr_task,
377 child,
378 })
379 }
380
381 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
382 &self.agent_capabilities.prompt_capabilities
383 }
384
385 fn apply_default_config_options(
386 &self,
387 session_id: &acp::SessionId,
388 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
389 cx: &mut AsyncApp,
390 ) {
391 let id = self.id.clone();
392 let defaults_to_apply: Vec<_> = {
393 let config_opts_ref = config_options.borrow();
394 config_opts_ref
395 .iter()
396 .filter_map(|config_option| {
397 let default_value = self.default_config_options.get(&*config_option.id.0)?;
398
399 let is_valid = match &config_option.kind {
400 acp::SessionConfigKind::Select(select) => match &select.options {
401 acp::SessionConfigSelectOptions::Ungrouped(options) => options
402 .iter()
403 .any(|opt| &*opt.value.0 == default_value.as_str()),
404 acp::SessionConfigSelectOptions::Grouped(groups) => {
405 groups.iter().any(|g| {
406 g.options
407 .iter()
408 .any(|opt| &*opt.value.0 == default_value.as_str())
409 })
410 }
411 _ => false,
412 },
413 _ => false,
414 };
415
416 if is_valid {
417 let initial_value = match &config_option.kind {
418 acp::SessionConfigKind::Select(select) => {
419 Some(select.current_value.clone())
420 }
421 _ => None,
422 };
423 Some((
424 config_option.id.clone(),
425 default_value.clone(),
426 initial_value,
427 ))
428 } else {
429 log::warn!(
430 "`{}` is not a valid value for config option `{}` in {}",
431 default_value,
432 config_option.id.0,
433 id
434 );
435 None
436 }
437 })
438 .collect()
439 };
440
441 for (config_id, default_value, initial_value) in defaults_to_apply {
442 cx.spawn({
443 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
444 let session_id = session_id.clone();
445 let config_id_clone = config_id.clone();
446 let config_opts = config_options.clone();
447 let conn = self.connection.clone();
448 async move |_| {
449 let result = conn
450 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
451 session_id,
452 config_id_clone.clone(),
453 default_value_id,
454 ))
455 .await
456 .log_err();
457
458 if result.is_none() {
459 if let Some(initial) = initial_value {
460 let mut opts = config_opts.borrow_mut();
461 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
462 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
463 select.current_value = initial;
464 }
465 }
466 }
467 }
468 }
469 })
470 .detach();
471
472 let mut opts = config_options.borrow_mut();
473 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
474 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
475 select.current_value = acp::SessionConfigValueId::new(default_value);
476 }
477 }
478 }
479 }
480}
481
482impl Drop for AcpConnection {
483 fn drop(&mut self) {
484 self.child.kill().log_err();
485 }
486}
487
488fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
489 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
490}
491
492fn terminal_auth_task(
493 command: &AgentServerCommand,
494 agent_id: &AgentId,
495 method: &acp::AuthMethodTerminal,
496) -> SpawnInTerminal {
497 let mut args = command.args.clone();
498 args.extend(method.args.clone());
499
500 let mut env = command.env.clone().unwrap_or_default();
501 env.extend(method.env.clone());
502
503 acp_thread::build_terminal_auth_task(
504 terminal_auth_task_id(agent_id, &method.id),
505 method.name.clone(),
506 command.path.to_string_lossy().into_owned(),
507 args,
508 env,
509 )
510}
511
512/// Used to support the _meta method prior to stabilization
513fn meta_terminal_auth_task(
514 agent_id: &AgentId,
515 method_id: &acp::AuthMethodId,
516 method: &acp::AuthMethod,
517) -> Option<SpawnInTerminal> {
518 #[derive(Deserialize)]
519 struct MetaTerminalAuth {
520 label: String,
521 command: String,
522 #[serde(default)]
523 args: Vec<String>,
524 #[serde(default)]
525 env: HashMap<String, String>,
526 }
527
528 let meta = match method {
529 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
530 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
531 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
532 _ => None,
533 }?;
534 let terminal_auth =
535 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
536
537 Some(acp_thread::build_terminal_auth_task(
538 terminal_auth_task_id(agent_id, method_id),
539 terminal_auth.label.clone(),
540 terminal_auth.command,
541 terminal_auth.args,
542 terminal_auth.env,
543 ))
544}
545
546impl AgentConnection for AcpConnection {
547 fn agent_id(&self) -> AgentId {
548 self.id.clone()
549 }
550
551 fn telemetry_id(&self) -> SharedString {
552 self.telemetry_id.clone()
553 }
554
555 fn new_session(
556 self: Rc<Self>,
557 project: Entity<Project>,
558 work_dirs: PathList,
559 cx: &mut App,
560 ) -> Task<Result<Entity<AcpThread>>> {
561 // TODO: remove this once ACP supports multiple working directories
562 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
563 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
564 };
565 let name = self.id.0.clone();
566 let mcp_servers = mcp_servers_for_project(&project, cx);
567
568 cx.spawn(async move |cx| {
569 let response = self.connection
570 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
571 .await
572 .map_err(map_acp_error)?;
573
574 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
575
576 if let Some(default_mode) = self.default_mode.clone() {
577 if let Some(modes) = modes.as_ref() {
578 let mut modes_ref = modes.borrow_mut();
579 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
580
581 if has_mode {
582 let initial_mode_id = modes_ref.current_mode_id.clone();
583
584 cx.spawn({
585 let default_mode = default_mode.clone();
586 let session_id = response.session_id.clone();
587 let modes = modes.clone();
588 let conn = self.connection.clone();
589 async move |_| {
590 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
591 .await.log_err();
592
593 if result.is_none() {
594 modes.borrow_mut().current_mode_id = initial_mode_id;
595 }
596 }
597 }).detach();
598
599 modes_ref.current_mode_id = default_mode;
600 } else {
601 let available_modes = modes_ref
602 .available_modes
603 .iter()
604 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
605 .collect::<Vec<_>>()
606 .join("\n");
607
608 log::warn!(
609 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
610 );
611 }
612 }
613 }
614
615 if let Some(default_model) = self.default_model.clone() {
616 if let Some(models) = models.as_ref() {
617 let mut models_ref = models.borrow_mut();
618 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
619
620 if has_model {
621 let initial_model_id = models_ref.current_model_id.clone();
622
623 cx.spawn({
624 let default_model = default_model.clone();
625 let session_id = response.session_id.clone();
626 let models = models.clone();
627 let conn = self.connection.clone();
628 async move |_| {
629 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
630 .await.log_err();
631
632 if result.is_none() {
633 models.borrow_mut().current_model_id = initial_model_id;
634 }
635 }
636 }).detach();
637
638 models_ref.current_model_id = default_model;
639 } else {
640 let available_models = models_ref
641 .available_models
642 .iter()
643 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
644 .collect::<Vec<_>>()
645 .join("\n");
646
647 log::warn!(
648 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
649 );
650 }
651 }
652 }
653
654 if let Some(config_opts) = config_options.as_ref() {
655 self.apply_default_config_options(&response.session_id, config_opts, cx);
656 }
657
658 let action_log = cx.new(|_| ActionLog::new(project.clone()));
659 let thread: Entity<AcpThread> = cx.new(|cx| {
660 AcpThread::new(
661 None,
662 None,
663 Some(work_dirs),
664 self.clone(),
665 project,
666 action_log,
667 response.session_id.clone(),
668 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
669 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
670 cx,
671 )
672 });
673
674 self.sessions.borrow_mut().insert(
675 response.session_id,
676 AcpSession {
677 thread: thread.downgrade(),
678 suppress_abort_err: false,
679 session_modes: modes,
680 models,
681 config_options: config_options.map(ConfigOptions::new),
682 },
683 );
684
685 Ok(thread)
686 })
687 }
688
689 fn supports_load_session(&self) -> bool {
690 self.agent_capabilities.load_session
691 }
692
693 fn supports_resume_session(&self) -> bool {
694 self.agent_capabilities
695 .session_capabilities
696 .resume
697 .is_some()
698 }
699
700 fn load_session(
701 self: Rc<Self>,
702 session_id: acp::SessionId,
703 project: Entity<Project>,
704 work_dirs: PathList,
705 title: Option<SharedString>,
706 cx: &mut App,
707 ) -> Task<Result<Entity<AcpThread>>> {
708 if !self.agent_capabilities.load_session {
709 return Task::ready(Err(anyhow!(LoadError::Other(
710 "Loading sessions is not supported by this agent.".into()
711 ))));
712 }
713 // TODO: remove this once ACP supports multiple working directories
714 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
715 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
716 };
717
718 let mcp_servers = mcp_servers_for_project(&project, cx);
719 let action_log = cx.new(|_| ActionLog::new(project.clone()));
720 let thread: Entity<AcpThread> = cx.new(|cx| {
721 AcpThread::new(
722 None,
723 title,
724 Some(work_dirs.clone()),
725 self.clone(),
726 project,
727 action_log,
728 session_id.clone(),
729 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
730 cx,
731 )
732 });
733
734 self.sessions.borrow_mut().insert(
735 session_id.clone(),
736 AcpSession {
737 thread: thread.downgrade(),
738 suppress_abort_err: false,
739 session_modes: None,
740 models: None,
741 config_options: None,
742 },
743 );
744
745 cx.spawn(async move |cx| {
746 let response = match self
747 .connection
748 .load_session(
749 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
750 )
751 .await
752 {
753 Ok(response) => response,
754 Err(err) => {
755 self.sessions.borrow_mut().remove(&session_id);
756 return Err(map_acp_error(err));
757 }
758 };
759
760 let (modes, models, config_options) =
761 config_state(response.modes, response.models, response.config_options);
762
763 if let Some(config_opts) = config_options.as_ref() {
764 self.apply_default_config_options(&session_id, config_opts, cx);
765 }
766
767 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
768 session.session_modes = modes;
769 session.models = models;
770 session.config_options = config_options.map(ConfigOptions::new);
771 }
772
773 Ok(thread)
774 })
775 }
776
777 fn resume_session(
778 self: Rc<Self>,
779 session_id: acp::SessionId,
780 project: Entity<Project>,
781 work_dirs: PathList,
782 title: Option<SharedString>,
783 cx: &mut App,
784 ) -> Task<Result<Entity<AcpThread>>> {
785 if self
786 .agent_capabilities
787 .session_capabilities
788 .resume
789 .is_none()
790 {
791 return Task::ready(Err(anyhow!(LoadError::Other(
792 "Resuming sessions is not supported by this agent.".into()
793 ))));
794 }
795 // TODO: remove this once ACP supports multiple working directories
796 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
797 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
798 };
799
800 let mcp_servers = mcp_servers_for_project(&project, cx);
801 let action_log = cx.new(|_| ActionLog::new(project.clone()));
802 let thread: Entity<AcpThread> = cx.new(|cx| {
803 AcpThread::new(
804 None,
805 title,
806 Some(work_dirs),
807 self.clone(),
808 project,
809 action_log,
810 session_id.clone(),
811 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
812 cx,
813 )
814 });
815
816 self.sessions.borrow_mut().insert(
817 session_id.clone(),
818 AcpSession {
819 thread: thread.downgrade(),
820 suppress_abort_err: false,
821 session_modes: None,
822 models: None,
823 config_options: None,
824 },
825 );
826
827 cx.spawn(async move |cx| {
828 let response = match self
829 .connection
830 .resume_session(
831 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
832 .mcp_servers(mcp_servers),
833 )
834 .await
835 {
836 Ok(response) => response,
837 Err(err) => {
838 self.sessions.borrow_mut().remove(&session_id);
839 return Err(map_acp_error(err));
840 }
841 };
842
843 let (modes, models, config_options) =
844 config_state(response.modes, response.models, response.config_options);
845
846 if let Some(config_opts) = config_options.as_ref() {
847 self.apply_default_config_options(&session_id, config_opts, cx);
848 }
849
850 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
851 session.session_modes = modes;
852 session.models = models;
853 session.config_options = config_options.map(ConfigOptions::new);
854 }
855
856 Ok(thread)
857 })
858 }
859
860 fn supports_close_session(&self) -> bool {
861 self.agent_capabilities.session_capabilities.close.is_some()
862 }
863
864 fn close_session(
865 self: Rc<Self>,
866 session_id: &acp::SessionId,
867 cx: &mut App,
868 ) -> Task<Result<()>> {
869 if !self.supports_close_session() {
870 return Task::ready(Err(anyhow!(LoadError::Other(
871 "Closing sessions is not supported by this agent.".into()
872 ))));
873 }
874
875 let conn = self.connection.clone();
876 let session_id = session_id.clone();
877 cx.foreground_executor().spawn(async move {
878 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
879 .await?;
880 self.sessions.borrow_mut().remove(&session_id);
881 Ok(())
882 })
883 }
884
885 fn auth_methods(&self) -> &[acp::AuthMethod] {
886 &self.auth_methods
887 }
888
889 fn terminal_auth_task(
890 &self,
891 method_id: &acp::AuthMethodId,
892 cx: &App,
893 ) -> Option<SpawnInTerminal> {
894 let method = self
895 .auth_methods
896 .iter()
897 .find(|method| method.id() == method_id)?;
898
899 match method {
900 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
901 Some(terminal_auth_task(&self.command, &self.id, terminal))
902 }
903 _ => meta_terminal_auth_task(&self.id, method_id, method),
904 }
905 }
906
907 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
908 let conn = self.connection.clone();
909 cx.foreground_executor().spawn(async move {
910 conn.authenticate(acp::AuthenticateRequest::new(method_id))
911 .await?;
912 Ok(())
913 })
914 }
915
916 fn prompt(
917 &self,
918 _id: Option<acp_thread::UserMessageId>,
919 params: acp::PromptRequest,
920 cx: &mut App,
921 ) -> Task<Result<acp::PromptResponse>> {
922 let conn = self.connection.clone();
923 let sessions = self.sessions.clone();
924 let session_id = params.session_id.clone();
925 cx.foreground_executor().spawn(async move {
926 let result = conn.prompt(params).await;
927
928 let mut suppress_abort_err = false;
929
930 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
931 suppress_abort_err = session.suppress_abort_err;
932 session.suppress_abort_err = false;
933 }
934
935 match result {
936 Ok(response) => Ok(response),
937 Err(err) => {
938 if err.code == acp::ErrorCode::AuthRequired {
939 return Err(anyhow!(acp::Error::auth_required()));
940 }
941
942 if err.code != ErrorCode::InternalError {
943 anyhow::bail!(err)
944 }
945
946 let Some(data) = &err.data else {
947 anyhow::bail!(err)
948 };
949
950 // Temporary workaround until the following PR is generally available:
951 // https://github.com/google-gemini/gemini-cli/pull/6656
952
953 #[derive(Deserialize)]
954 #[serde(deny_unknown_fields)]
955 struct ErrorDetails {
956 details: Box<str>,
957 }
958
959 match serde_json::from_value(data.clone()) {
960 Ok(ErrorDetails { details }) => {
961 if suppress_abort_err
962 && (details.contains("This operation was aborted")
963 || details.contains("The user aborted a request"))
964 {
965 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
966 } else {
967 Err(anyhow!(details))
968 }
969 }
970 Err(_) => Err(anyhow!(err)),
971 }
972 }
973 }
974 })
975 }
976
977 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
978 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
979 session.suppress_abort_err = true;
980 }
981 let conn = self.connection.clone();
982 let params = acp::CancelNotification::new(session_id.clone());
983 cx.foreground_executor()
984 .spawn(async move { conn.cancel(params).await })
985 .detach();
986 }
987
988 fn session_modes(
989 &self,
990 session_id: &acp::SessionId,
991 _cx: &App,
992 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
993 let sessions = self.sessions.clone();
994 let sessions_ref = sessions.borrow();
995 let Some(session) = sessions_ref.get(session_id) else {
996 return None;
997 };
998
999 if let Some(modes) = session.session_modes.as_ref() {
1000 Some(Rc::new(AcpSessionModes {
1001 connection: self.connection.clone(),
1002 session_id: session_id.clone(),
1003 state: modes.clone(),
1004 }) as _)
1005 } else {
1006 None
1007 }
1008 }
1009
1010 fn model_selector(
1011 &self,
1012 session_id: &acp::SessionId,
1013 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1014 let sessions = self.sessions.clone();
1015 let sessions_ref = sessions.borrow();
1016 let Some(session) = sessions_ref.get(session_id) else {
1017 return None;
1018 };
1019
1020 if let Some(models) = session.models.as_ref() {
1021 Some(Rc::new(AcpModelSelector::new(
1022 session_id.clone(),
1023 self.connection.clone(),
1024 models.clone(),
1025 )) as _)
1026 } else {
1027 None
1028 }
1029 }
1030
1031 fn session_config_options(
1032 &self,
1033 session_id: &acp::SessionId,
1034 _cx: &App,
1035 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1036 let sessions = self.sessions.borrow();
1037 let session = sessions.get(session_id)?;
1038
1039 let config_opts = session.config_options.as_ref()?;
1040
1041 Some(Rc::new(AcpSessionConfigOptions {
1042 session_id: session_id.clone(),
1043 connection: self.connection.clone(),
1044 state: config_opts.config_options.clone(),
1045 watch_tx: config_opts.tx.clone(),
1046 watch_rx: config_opts.rx.clone(),
1047 }) as _)
1048 }
1049
1050 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1051 self.session_list.clone().map(|s| s as _)
1052 }
1053
1054 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1055 self
1056 }
1057}
1058
1059fn map_acp_error(err: acp::Error) -> anyhow::Error {
1060 if err.code == acp::ErrorCode::AuthRequired {
1061 let mut error = AuthRequired::new();
1062
1063 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1064 error = error.with_description(err.message);
1065 }
1066
1067 anyhow!(error)
1068 } else {
1069 anyhow!(err)
1070 }
1071}
1072
1073#[cfg(test)]
1074mod tests {
1075 use super::*;
1076
1077 #[test]
1078 fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
1079 let command = AgentServerCommand {
1080 path: "/path/to/agent".into(),
1081 args: vec!["--acp".into(), "--verbose".into()],
1082 env: Some(HashMap::from_iter([
1083 ("BASE".into(), "1".into()),
1084 ("SHARED".into(), "base".into()),
1085 ])),
1086 };
1087 let method = acp::AuthMethodTerminal::new("login", "Login")
1088 .args(vec!["/auth".into()])
1089 .env(std::collections::HashMap::from_iter([
1090 ("EXTRA".into(), "2".into()),
1091 ("SHARED".into(), "override".into()),
1092 ]));
1093
1094 let terminal_auth_task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);
1095
1096 assert_eq!(
1097 terminal_auth_task.command.as_deref(),
1098 Some("/path/to/agent")
1099 );
1100 assert_eq!(terminal_auth_task.args, vec!["--acp", "--verbose", "/auth"]);
1101 assert_eq!(
1102 terminal_auth_task.env,
1103 HashMap::from_iter([
1104 ("BASE".into(), "1".into()),
1105 ("SHARED".into(), "override".into()),
1106 ("EXTRA".into(), "2".into()),
1107 ])
1108 );
1109 assert_eq!(terminal_auth_task.label, "Login");
1110 assert_eq!(terminal_auth_task.command_label, "Login");
1111 }
1112
1113 #[test]
1114 fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
1115 let method_id = acp::AuthMethodId::new("legacy-login");
1116 let method = acp::AuthMethod::Agent(
1117 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1118 "terminal-auth".to_string(),
1119 serde_json::json!({
1120 "label": "legacy /auth",
1121 "command": "legacy-agent",
1122 "args": ["auth", "--interactive"],
1123 "env": {
1124 "AUTH_MODE": "interactive",
1125 },
1126 }),
1127 )])),
1128 );
1129
1130 let terminal_auth_task =
1131 meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
1132 .expect("expected legacy terminal auth task");
1133
1134 assert_eq!(
1135 terminal_auth_task.id.0,
1136 "external-agent-test-agent-legacy-login-login"
1137 );
1138 assert_eq!(terminal_auth_task.command.as_deref(), Some("legacy-agent"));
1139 assert_eq!(terminal_auth_task.args, vec!["auth", "--interactive"]);
1140 assert_eq!(
1141 terminal_auth_task.env,
1142 HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
1143 );
1144 assert_eq!(terminal_auth_task.label, "legacy /auth");
1145 }
1146
1147 #[test]
1148 fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
1149 let method_id = acp::AuthMethodId::new("legacy-login");
1150 let method = acp::AuthMethod::Agent(
1151 acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
1152 "terminal-auth".to_string(),
1153 serde_json::json!({
1154 "label": "legacy /auth",
1155 }),
1156 )])),
1157 );
1158
1159 assert!(
1160 meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
1161 );
1162 }
1163
1164 #[test]
1165 fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
1166 let method_id = acp::AuthMethodId::new("login");
1167 let method = acp::AuthMethod::Terminal(
1168 acp::AuthMethodTerminal::new(method_id, "Login")
1169 .args(vec!["/auth".into()])
1170 .env(std::collections::HashMap::from_iter([(
1171 "AUTH_MODE".into(),
1172 "first-class".into(),
1173 )]))
1174 .meta(acp::Meta::from_iter([(
1175 "terminal-auth".to_string(),
1176 serde_json::json!({
1177 "label": "legacy /auth",
1178 "command": "legacy-agent",
1179 "args": ["legacy-auth"],
1180 "env": {
1181 "AUTH_MODE": "legacy",
1182 },
1183 }),
1184 )])),
1185 );
1186
1187 let command = AgentServerCommand {
1188 path: "/path/to/agent".into(),
1189 args: vec!["--acp".into()],
1190 env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
1191 };
1192
1193 let terminal_auth_task = match &method {
1194 acp::AuthMethod::Terminal(terminal) => {
1195 terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
1196 }
1197 _ => unreachable!(),
1198 };
1199
1200 assert_eq!(
1201 terminal_auth_task.command.as_deref(),
1202 Some("/path/to/agent")
1203 );
1204 assert_eq!(terminal_auth_task.args, vec!["--acp", "/auth"]);
1205 assert_eq!(
1206 terminal_auth_task.env,
1207 HashMap::from_iter([
1208 ("BASE".into(), "1".into()),
1209 ("AUTH_MODE".into(), "first-class".into()),
1210 ])
1211 );
1212 assert_eq!(terminal_auth_task.label, "Login");
1213 }
1214}
1215
1216fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1217 let context_server_store = project.read(cx).context_server_store().read(cx);
1218 let is_local = project.read(cx).is_local();
1219 context_server_store
1220 .configured_server_ids()
1221 .iter()
1222 .filter_map(|id| {
1223 let configuration = context_server_store.configuration_for_server(id)?;
1224 match &*configuration {
1225 project::context_server_store::ContextServerConfiguration::Custom {
1226 command,
1227 remote,
1228 ..
1229 }
1230 | project::context_server_store::ContextServerConfiguration::Extension {
1231 command,
1232 remote,
1233 ..
1234 } if is_local || *remote => Some(acp::McpServer::Stdio(
1235 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1236 .args(command.args.clone())
1237 .env(if let Some(env) = command.env.as_ref() {
1238 env.iter()
1239 .map(|(name, value)| acp::EnvVariable::new(name, value))
1240 .collect()
1241 } else {
1242 vec![]
1243 }),
1244 )),
1245 project::context_server_store::ContextServerConfiguration::Http {
1246 url,
1247 headers,
1248 timeout: _,
1249 } => Some(acp::McpServer::Http(
1250 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1251 headers
1252 .iter()
1253 .map(|(name, value)| acp::HttpHeader::new(name, value))
1254 .collect(),
1255 ),
1256 )),
1257 _ => None,
1258 }
1259 })
1260 .collect()
1261}
1262
1263fn config_state(
1264 modes: Option<acp::SessionModeState>,
1265 models: Option<acp::SessionModelState>,
1266 config_options: Option<Vec<acp::SessionConfigOption>>,
1267) -> (
1268 Option<Rc<RefCell<acp::SessionModeState>>>,
1269 Option<Rc<RefCell<acp::SessionModelState>>>,
1270 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1271) {
1272 if let Some(opts) = config_options {
1273 return (None, None, Some(Rc::new(RefCell::new(opts))));
1274 }
1275
1276 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1277 let models = models.map(|models| Rc::new(RefCell::new(models)));
1278 (modes, models, None)
1279}
1280
1281struct AcpSessionModes {
1282 session_id: acp::SessionId,
1283 connection: Rc<acp::ClientSideConnection>,
1284 state: Rc<RefCell<acp::SessionModeState>>,
1285}
1286
1287impl acp_thread::AgentSessionModes for AcpSessionModes {
1288 fn current_mode(&self) -> acp::SessionModeId {
1289 self.state.borrow().current_mode_id.clone()
1290 }
1291
1292 fn all_modes(&self) -> Vec<acp::SessionMode> {
1293 self.state.borrow().available_modes.clone()
1294 }
1295
1296 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1297 let connection = self.connection.clone();
1298 let session_id = self.session_id.clone();
1299 let old_mode_id;
1300 {
1301 let mut state = self.state.borrow_mut();
1302 old_mode_id = state.current_mode_id.clone();
1303 state.current_mode_id = mode_id.clone();
1304 };
1305 let state = self.state.clone();
1306 cx.foreground_executor().spawn(async move {
1307 let result = connection
1308 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1309 .await;
1310
1311 if result.is_err() {
1312 state.borrow_mut().current_mode_id = old_mode_id;
1313 }
1314
1315 result?;
1316
1317 Ok(())
1318 })
1319 }
1320}
1321
1322struct AcpModelSelector {
1323 session_id: acp::SessionId,
1324 connection: Rc<acp::ClientSideConnection>,
1325 state: Rc<RefCell<acp::SessionModelState>>,
1326}
1327
1328impl AcpModelSelector {
1329 fn new(
1330 session_id: acp::SessionId,
1331 connection: Rc<acp::ClientSideConnection>,
1332 state: Rc<RefCell<acp::SessionModelState>>,
1333 ) -> Self {
1334 Self {
1335 session_id,
1336 connection,
1337 state,
1338 }
1339 }
1340}
1341
1342impl acp_thread::AgentModelSelector for AcpModelSelector {
1343 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1344 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1345 self.state
1346 .borrow()
1347 .available_models
1348 .clone()
1349 .into_iter()
1350 .map(acp_thread::AgentModelInfo::from)
1351 .collect(),
1352 )))
1353 }
1354
1355 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1356 let connection = self.connection.clone();
1357 let session_id = self.session_id.clone();
1358 let old_model_id;
1359 {
1360 let mut state = self.state.borrow_mut();
1361 old_model_id = state.current_model_id.clone();
1362 state.current_model_id = model_id.clone();
1363 };
1364 let state = self.state.clone();
1365 cx.foreground_executor().spawn(async move {
1366 let result = connection
1367 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1368 .await;
1369
1370 if result.is_err() {
1371 state.borrow_mut().current_model_id = old_model_id;
1372 }
1373
1374 result?;
1375
1376 Ok(())
1377 })
1378 }
1379
1380 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1381 let state = self.state.borrow();
1382 Task::ready(
1383 state
1384 .available_models
1385 .iter()
1386 .find(|m| m.model_id == state.current_model_id)
1387 .cloned()
1388 .map(acp_thread::AgentModelInfo::from)
1389 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1390 )
1391 }
1392}
1393
1394struct AcpSessionConfigOptions {
1395 session_id: acp::SessionId,
1396 connection: Rc<acp::ClientSideConnection>,
1397 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1398 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1399 watch_rx: watch::Receiver<()>,
1400}
1401
1402impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1403 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1404 self.state.borrow().clone()
1405 }
1406
1407 fn set_config_option(
1408 &self,
1409 config_id: acp::SessionConfigId,
1410 value: acp::SessionConfigValueId,
1411 cx: &mut App,
1412 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1413 let connection = self.connection.clone();
1414 let session_id = self.session_id.clone();
1415 let state = self.state.clone();
1416
1417 let watch_tx = self.watch_tx.clone();
1418
1419 cx.foreground_executor().spawn(async move {
1420 let response = connection
1421 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1422 session_id, config_id, value,
1423 ))
1424 .await?;
1425
1426 *state.borrow_mut() = response.config_options.clone();
1427 watch_tx.borrow_mut().send(()).ok();
1428 Ok(response.config_options)
1429 })
1430 }
1431
1432 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1433 Some(self.watch_rx.clone())
1434 }
1435}
1436
1437struct ClientDelegate {
1438 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1439 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1440 cx: AsyncApp,
1441}
1442
1443#[async_trait::async_trait(?Send)]
1444impl acp::Client for ClientDelegate {
1445 async fn request_permission(
1446 &self,
1447 arguments: acp::RequestPermissionRequest,
1448 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1449 let thread;
1450 {
1451 let sessions_ref = self.sessions.borrow();
1452 let session = sessions_ref
1453 .get(&arguments.session_id)
1454 .context("Failed to get session")?;
1455 thread = session.thread.clone();
1456 }
1457
1458 let cx = &mut self.cx.clone();
1459
1460 let task = thread.update(cx, |thread, cx| {
1461 thread.request_tool_call_authorization(
1462 arguments.tool_call,
1463 acp_thread::PermissionOptions::Flat(arguments.options),
1464 cx,
1465 )
1466 })??;
1467
1468 let outcome = task.await;
1469
1470 Ok(acp::RequestPermissionResponse::new(outcome.into()))
1471 }
1472
1473 async fn write_text_file(
1474 &self,
1475 arguments: acp::WriteTextFileRequest,
1476 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1477 let cx = &mut self.cx.clone();
1478 let task = self
1479 .session_thread(&arguments.session_id)?
1480 .update(cx, |thread, cx| {
1481 thread.write_text_file(arguments.path, arguments.content, cx)
1482 })?;
1483
1484 task.await?;
1485
1486 Ok(Default::default())
1487 }
1488
1489 async fn read_text_file(
1490 &self,
1491 arguments: acp::ReadTextFileRequest,
1492 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1493 let task = self.session_thread(&arguments.session_id)?.update(
1494 &mut self.cx.clone(),
1495 |thread, cx| {
1496 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1497 },
1498 )?;
1499
1500 let content = task.await?;
1501
1502 Ok(acp::ReadTextFileResponse::new(content))
1503 }
1504
1505 async fn session_notification(
1506 &self,
1507 notification: acp::SessionNotification,
1508 ) -> Result<(), acp::Error> {
1509 let sessions = self.sessions.borrow();
1510 let session = sessions
1511 .get(¬ification.session_id)
1512 .context("Failed to get session")?;
1513
1514 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1515 current_mode_id,
1516 ..
1517 }) = ¬ification.update
1518 {
1519 if let Some(session_modes) = &session.session_modes {
1520 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1521 }
1522 }
1523
1524 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1525 config_options,
1526 ..
1527 }) = ¬ification.update
1528 {
1529 if let Some(opts) = &session.config_options {
1530 *opts.config_options.borrow_mut() = config_options.clone();
1531 opts.tx.borrow_mut().send(()).ok();
1532 }
1533 }
1534
1535 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1536 && let Some(session_list) = self.session_list.borrow().as_ref()
1537 {
1538 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1539 }
1540
1541 // Clone so we can inspect meta both before and after handing off to the thread
1542 let update_clone = notification.update.clone();
1543
1544 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1545 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1546 if let Some(meta) = &tc.meta {
1547 if let Some(terminal_info) = meta.get("terminal_info") {
1548 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1549 {
1550 let terminal_id = acp::TerminalId::new(id_str);
1551 let cwd = terminal_info
1552 .get("cwd")
1553 .and_then(|v| v.as_str().map(PathBuf::from));
1554
1555 // Create a minimal display-only lower-level terminal and register it.
1556 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1557 let builder = TerminalBuilder::new_display_only(
1558 CursorShape::default(),
1559 AlternateScroll::On,
1560 None,
1561 0,
1562 cx.background_executor(),
1563 thread.project().read(cx).path_style(cx),
1564 )?;
1565 let lower = cx.new(|cx| builder.subscribe(cx));
1566 thread.on_terminal_provider_event(
1567 TerminalProviderEvent::Created {
1568 terminal_id,
1569 label: tc.title.clone(),
1570 cwd,
1571 output_byte_limit: None,
1572 terminal: lower,
1573 },
1574 cx,
1575 );
1576 anyhow::Ok(())
1577 });
1578 }
1579 }
1580 }
1581 }
1582
1583 // Forward the update to the acp_thread as usual.
1584 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1585 thread.handle_session_update(notification.update.clone(), cx)
1586 })??;
1587
1588 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1589 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1590 if let Some(meta) = &tcu.meta {
1591 if let Some(term_out) = meta.get("terminal_output") {
1592 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1593 let terminal_id = acp::TerminalId::new(id_str);
1594 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1595 let data = s.as_bytes().to_vec();
1596 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1597 thread.on_terminal_provider_event(
1598 TerminalProviderEvent::Output { terminal_id, data },
1599 cx,
1600 );
1601 });
1602 }
1603 }
1604 }
1605
1606 // terminal_exit
1607 if let Some(term_exit) = meta.get("terminal_exit") {
1608 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1609 let terminal_id = acp::TerminalId::new(id_str);
1610 let status = acp::TerminalExitStatus::new()
1611 .exit_code(
1612 term_exit
1613 .get("exit_code")
1614 .and_then(|v| v.as_u64())
1615 .map(|i| i as u32),
1616 )
1617 .signal(
1618 term_exit
1619 .get("signal")
1620 .and_then(|v| v.as_str().map(|s| s.to_string())),
1621 );
1622
1623 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1624 thread.on_terminal_provider_event(
1625 TerminalProviderEvent::Exit {
1626 terminal_id,
1627 status,
1628 },
1629 cx,
1630 );
1631 });
1632 }
1633 }
1634 }
1635 }
1636
1637 Ok(())
1638 }
1639
1640 async fn create_terminal(
1641 &self,
1642 args: acp::CreateTerminalRequest,
1643 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1644 let thread = self.session_thread(&args.session_id)?;
1645 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1646
1647 let terminal_entity = acp_thread::create_terminal_entity(
1648 args.command.clone(),
1649 &args.args,
1650 args.env
1651 .into_iter()
1652 .map(|env| (env.name, env.value))
1653 .collect(),
1654 args.cwd.clone(),
1655 &project,
1656 &mut self.cx.clone(),
1657 )
1658 .await?;
1659
1660 // Register with renderer
1661 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1662 thread.register_terminal_created(
1663 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1664 format!("{} {}", args.command, args.args.join(" ")),
1665 args.cwd.clone(),
1666 args.output_byte_limit,
1667 terminal_entity,
1668 cx,
1669 )
1670 })?;
1671 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1672 Ok(acp::CreateTerminalResponse::new(terminal_id))
1673 }
1674
1675 async fn kill_terminal(
1676 &self,
1677 args: acp::KillTerminalRequest,
1678 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1679 self.session_thread(&args.session_id)?
1680 .update(&mut self.cx.clone(), |thread, cx| {
1681 thread.kill_terminal(args.terminal_id, cx)
1682 })??;
1683
1684 Ok(Default::default())
1685 }
1686
1687 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1688 Err(acp::Error::method_not_found())
1689 }
1690
1691 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1692 Err(acp::Error::method_not_found())
1693 }
1694
1695 async fn release_terminal(
1696 &self,
1697 args: acp::ReleaseTerminalRequest,
1698 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1699 self.session_thread(&args.session_id)?
1700 .update(&mut self.cx.clone(), |thread, cx| {
1701 thread.release_terminal(args.terminal_id, cx)
1702 })??;
1703
1704 Ok(Default::default())
1705 }
1706
1707 async fn terminal_output(
1708 &self,
1709 args: acp::TerminalOutputRequest,
1710 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1711 self.session_thread(&args.session_id)?
1712 .read_with(&mut self.cx.clone(), |thread, cx| {
1713 let out = thread
1714 .terminal(args.terminal_id)?
1715 .read(cx)
1716 .current_output(cx);
1717
1718 Ok(out)
1719 })?
1720 }
1721
1722 async fn wait_for_terminal_exit(
1723 &self,
1724 args: acp::WaitForTerminalExitRequest,
1725 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1726 let exit_status = self
1727 .session_thread(&args.session_id)?
1728 .update(&mut self.cx.clone(), |thread, cx| {
1729 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1730 })??
1731 .await;
1732
1733 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1734 }
1735}
1736
1737impl ClientDelegate {
1738 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1739 let sessions = self.sessions.borrow();
1740 sessions
1741 .get(session_id)
1742 .context("Failed to get session")
1743 .map(|session| session.thread.clone())
1744 }
1745}