1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::agent_server_store::AgentServerCommand;
14use project::{AgentId, Project};
15use serde::Deserialize;
16use settings::Settings as _;
17use task::{ShellBuilder, SpawnInTerminal};
18use util::ResultExt as _;
19use util::path_list::PathList;
20use util::process::Child;
21
22use std::path::PathBuf;
23use std::process::Stdio;
24use std::rc::Rc;
25use std::{any::Any, cell::RefCell};
26use thiserror::Error;
27
28use anyhow::{Context as _, Result};
29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
30
31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
32use terminal::TerminalBuilder;
33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
34
35use crate::GEMINI_ID;
36
/// Id of the terminal login auth method that this module advertises for the
/// Gemini agent in place of the methods reported by the agent itself.
pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
38
39#[derive(Debug, Error)]
40#[error("Unsupported version")]
41pub struct UnsupportedVersion;
42
43pub struct AcpConnection {
44 id: AgentId,
45 telemetry_id: SharedString,
46 connection: Rc<acp::ClientSideConnection>,
47 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
48 auth_methods: Vec<acp::AuthMethod>,
49 command: AgentServerCommand,
50 agent_capabilities: acp::AgentCapabilities,
51 default_mode: Option<acp::SessionModeId>,
52 default_model: Option<acp::ModelId>,
53 default_config_options: HashMap<String, String>,
54 child: Child,
55 session_list: Option<Rc<AcpSessionList>>,
56 _io_task: Task<Result<(), acp::Error>>,
57 _wait_task: Task<Result<()>>,
58 _stderr_task: Task<Result<()>>,
59}
60
61struct ConfigOptions {
62 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
63 tx: Rc<RefCell<watch::Sender<()>>>,
64 rx: watch::Receiver<()>,
65}
66
67impl ConfigOptions {
68 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
69 let (tx, rx) = watch::channel(());
70 Self {
71 config_options,
72 tx: Rc::new(RefCell::new(tx)),
73 rx,
74 }
75 }
76}
77
78pub struct AcpSession {
79 thread: WeakEntity<AcpThread>,
80 suppress_abort_err: bool,
81 models: Option<Rc<RefCell<acp::SessionModelState>>>,
82 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
83 config_options: Option<ConfigOptions>,
84}
85
86pub struct AcpSessionList {
87 connection: Rc<acp::ClientSideConnection>,
88 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
89 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
90}
91
92impl AcpSessionList {
93 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
94 let (tx, rx) = smol::channel::unbounded();
95 Self {
96 connection,
97 updates_tx: tx,
98 updates_rx: rx,
99 }
100 }
101
102 fn notify_update(&self) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::Refresh)
105 .log_err();
106 }
107
108 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
109 self.updates_tx
110 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
111 .log_err();
112 }
113}
114
115impl AgentSessionList for AcpSessionList {
116 fn list_sessions(
117 &self,
118 request: AgentSessionListRequest,
119 cx: &mut App,
120 ) -> Task<Result<AgentSessionListResponse>> {
121 let conn = self.connection.clone();
122 cx.foreground_executor().spawn(async move {
123 let acp_request = acp::ListSessionsRequest::new()
124 .cwd(request.cwd)
125 .cursor(request.cursor);
126 let response = conn.list_sessions(acp_request).await?;
127 Ok(AgentSessionListResponse {
128 sessions: response
129 .sessions
130 .into_iter()
131 .map(|s| AgentSessionInfo {
132 session_id: s.session_id,
133 work_dirs: Some(PathList::new(&[s.cwd])),
134 title: s.title.map(Into::into),
135 updated_at: s.updated_at.and_then(|date_str| {
136 chrono::DateTime::parse_from_rfc3339(&date_str)
137 .ok()
138 .map(|dt| dt.with_timezone(&chrono::Utc))
139 }),
140 created_at: None,
141 meta: s.meta,
142 })
143 .collect(),
144 next_cursor: response.next_cursor,
145 meta: response.meta,
146 })
147 })
148 }
149
150 fn watch(
151 &self,
152 _cx: &mut App,
153 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
154 Some(self.updates_rx.clone())
155 }
156
157 fn notify_refresh(&self) {
158 self.notify_update();
159 }
160
161 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
162 self
163 }
164}
165
166pub async fn connect(
167 agent_id: AgentId,
168 project: Entity<Project>,
169 command: AgentServerCommand,
170 default_mode: Option<acp::SessionModeId>,
171 default_model: Option<acp::ModelId>,
172 default_config_options: HashMap<String, String>,
173 cx: &mut AsyncApp,
174) -> Result<Rc<dyn AgentConnection>> {
175 let conn = AcpConnection::stdio(
176 agent_id,
177 project,
178 command.clone(),
179 default_mode,
180 default_model,
181 default_config_options,
182 cx,
183 )
184 .await?;
185 Ok(Rc::new(conn) as _)
186}
187
188const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
189
190impl AcpConnection {
191 pub async fn stdio(
192 agent_id: AgentId,
193 project: Entity<Project>,
194 command: AgentServerCommand,
195 default_mode: Option<acp::SessionModeId>,
196 default_model: Option<acp::ModelId>,
197 default_config_options: HashMap<String, String>,
198 cx: &mut AsyncApp,
199 ) -> Result<Self> {
200 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
201 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
202 let mut child =
203 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
204 child.envs(command.env.iter().flatten());
205 if let Some(cwd) = project.update(cx, |project, cx| {
206 project
207 .default_path_list(cx)
208 .ordered_paths()
209 .next()
210 .cloned()
211 }) {
212 child.current_dir(cwd);
213 }
214 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
215
216 let stdout = child.stdout.take().context("Failed to take stdout")?;
217 let stdin = child.stdin.take().context("Failed to take stdin")?;
218 let stderr = child.stderr.take().context("Failed to take stderr")?;
219 log::debug!(
220 "Spawning external agent server: {:?}, {:?}",
221 command.path,
222 command.args
223 );
224 log::trace!("Spawned (pid: {})", child.id());
225
226 let sessions = Rc::new(RefCell::new(HashMap::default()));
227
228 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
229 (
230 release_channel::ReleaseChannel::try_global(cx)
231 .map(|release_channel| release_channel.display_name()),
232 release_channel::AppVersion::global(cx).to_string(),
233 )
234 });
235
236 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
237 Rc::new(RefCell::new(None));
238
239 let client = ClientDelegate {
240 sessions: sessions.clone(),
241 session_list: client_session_list.clone(),
242 cx: cx.clone(),
243 };
244 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
245 let foreground_executor = cx.foreground_executor().clone();
246 move |fut| {
247 foreground_executor.spawn(fut).detach();
248 }
249 });
250
251 let io_task = cx.background_spawn(io_task);
252
253 let stderr_task = cx.background_spawn(async move {
254 let mut stderr = BufReader::new(stderr);
255 let mut line = String::new();
256 while let Ok(n) = stderr.read_line(&mut line).await
257 && n > 0
258 {
259 log::warn!("agent stderr: {}", line.trim());
260 line.clear();
261 }
262 Ok(())
263 });
264
265 let wait_task = cx.spawn({
266 let sessions = sessions.clone();
267 let status_fut = child.status();
268 async move |cx| {
269 let status = status_fut.await?;
270
271 for session in sessions.borrow().values() {
272 session
273 .thread
274 .update(cx, |thread, cx| {
275 thread.emit_load_error(LoadError::Exited { status }, cx)
276 })
277 .ok();
278 }
279
280 anyhow::Ok(())
281 }
282 });
283
284 let connection = Rc::new(connection);
285
286 cx.update(|cx| {
287 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
288 registry.set_active_connection(agent_id.clone(), &connection, cx)
289 });
290 });
291
292 let response = connection
293 .initialize(
294 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
295 .client_capabilities(
296 acp::ClientCapabilities::new()
297 .fs(acp::FileSystemCapabilities::new()
298 .read_text_file(true)
299 .write_text_file(true))
300 .terminal(true)
301 .auth(acp::AuthCapabilities::new().terminal(true))
302 // Experimental: Allow for rendering terminal output from the agents
303 .meta(acp::Meta::from_iter([
304 ("terminal_output".into(), true.into()),
305 ("terminal-auth".into(), true.into()),
306 ])),
307 )
308 .client_info(
309 acp::Implementation::new("zed", version)
310 .title(release_channel.map(ToOwned::to_owned)),
311 ),
312 )
313 .await?;
314
315 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
316 return Err(UnsupportedVersion.into());
317 }
318
319 let telemetry_id = response
320 .agent_info
321 // Use the one the agent provides if we have one
322 .map(|info| info.name.into())
323 // Otherwise, just use the name
324 .unwrap_or_else(|| agent_id.0.to_string().into());
325
326 let session_list = if response
327 .agent_capabilities
328 .session_capabilities
329 .list
330 .is_some()
331 {
332 let list = Rc::new(AcpSessionList::new(connection.clone()));
333 *client_session_list.borrow_mut() = Some(list.clone());
334 Some(list)
335 } else {
336 None
337 };
338
339 // TODO: Remove this override once Google team releases their official auth methods
340 let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
341 let mut args = command.args.clone();
342 args.retain(|a| a != "--experimental-acp" && a != "--acp");
343 let value = serde_json::json!({
344 "label": "gemini /auth",
345 "command": command.path.to_string_lossy().into_owned(),
346 "args": args,
347 "env": command.env.clone().unwrap_or_default(),
348 });
349 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
350 vec![acp::AuthMethod::Agent(
351 acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
352 .description("Login with your Google or Vertex AI account")
353 .meta(meta),
354 )]
355 } else {
356 response.auth_methods
357 };
358 Ok(Self {
359 id: agent_id,
360 auth_methods,
361 command,
362 connection,
363 telemetry_id,
364 sessions,
365 agent_capabilities: response.agent_capabilities,
366 default_mode,
367 default_model,
368 default_config_options,
369 session_list,
370 _io_task: io_task,
371 _wait_task: wait_task,
372 _stderr_task: stderr_task,
373 child,
374 })
375 }
376
377 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
378 &self.agent_capabilities.prompt_capabilities
379 }
380
381 fn apply_default_config_options(
382 &self,
383 session_id: &acp::SessionId,
384 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
385 cx: &mut AsyncApp,
386 ) {
387 let id = self.id.clone();
388 let defaults_to_apply: Vec<_> = {
389 let config_opts_ref = config_options.borrow();
390 config_opts_ref
391 .iter()
392 .filter_map(|config_option| {
393 let default_value = self.default_config_options.get(&*config_option.id.0)?;
394
395 let is_valid = match &config_option.kind {
396 acp::SessionConfigKind::Select(select) => match &select.options {
397 acp::SessionConfigSelectOptions::Ungrouped(options) => options
398 .iter()
399 .any(|opt| &*opt.value.0 == default_value.as_str()),
400 acp::SessionConfigSelectOptions::Grouped(groups) => {
401 groups.iter().any(|g| {
402 g.options
403 .iter()
404 .any(|opt| &*opt.value.0 == default_value.as_str())
405 })
406 }
407 _ => false,
408 },
409 _ => false,
410 };
411
412 if is_valid {
413 let initial_value = match &config_option.kind {
414 acp::SessionConfigKind::Select(select) => {
415 Some(select.current_value.clone())
416 }
417 _ => None,
418 };
419 Some((
420 config_option.id.clone(),
421 default_value.clone(),
422 initial_value,
423 ))
424 } else {
425 log::warn!(
426 "`{}` is not a valid value for config option `{}` in {}",
427 default_value,
428 config_option.id.0,
429 id
430 );
431 None
432 }
433 })
434 .collect()
435 };
436
437 for (config_id, default_value, initial_value) in defaults_to_apply {
438 cx.spawn({
439 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
440 let session_id = session_id.clone();
441 let config_id_clone = config_id.clone();
442 let config_opts = config_options.clone();
443 let conn = self.connection.clone();
444 async move |_| {
445 let result = conn
446 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
447 session_id,
448 config_id_clone.clone(),
449 default_value_id,
450 ))
451 .await
452 .log_err();
453
454 if result.is_none() {
455 if let Some(initial) = initial_value {
456 let mut opts = config_opts.borrow_mut();
457 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
458 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
459 select.current_value = initial;
460 }
461 }
462 }
463 }
464 }
465 })
466 .detach();
467
468 let mut opts = config_options.borrow_mut();
469 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
470 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
471 select.current_value = acp::SessionConfigValueId::new(default_value);
472 }
473 }
474 }
475 }
476}
477
478impl Drop for AcpConnection {
479 fn drop(&mut self) {
480 self.child.kill().log_err();
481 }
482}
483
484fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
485 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
486}
487
488fn terminal_auth_task(
489 command: &AgentServerCommand,
490 agent_id: &AgentId,
491 method: &acp::AuthMethodTerminal,
492) -> SpawnInTerminal {
493 let mut args = command.args.clone();
494 args.extend(method.args.clone());
495
496 let mut env = command.env.clone().unwrap_or_default();
497 env.extend(method.env.clone());
498
499 acp_thread::build_terminal_auth_task(
500 terminal_auth_task_id(agent_id, &method.id),
501 method.name.clone(),
502 command.path.to_string_lossy().into_owned(),
503 args,
504 env,
505 )
506}
507
508/// Used to support the _meta method prior to stabilization
509fn meta_terminal_auth_task(
510 agent_id: &AgentId,
511 method_id: &acp::AuthMethodId,
512 method: &acp::AuthMethod,
513) -> Option<SpawnInTerminal> {
514 #[derive(Deserialize)]
515 struct MetaTerminalAuth {
516 label: String,
517 command: String,
518 #[serde(default)]
519 args: Vec<String>,
520 #[serde(default)]
521 env: HashMap<String, String>,
522 }
523
524 let meta = match method {
525 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
526 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
527 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
528 _ => None,
529 }?;
530 let terminal_auth =
531 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
532
533 Some(acp_thread::build_terminal_auth_task(
534 terminal_auth_task_id(agent_id, method_id),
535 terminal_auth.label.clone(),
536 terminal_auth.command,
537 terminal_auth.args,
538 terminal_auth.env,
539 ))
540}
541
542impl AgentConnection for AcpConnection {
543 fn agent_id(&self) -> AgentId {
544 self.id.clone()
545 }
546
547 fn telemetry_id(&self) -> SharedString {
548 self.telemetry_id.clone()
549 }
550
551 fn new_session(
552 self: Rc<Self>,
553 project: Entity<Project>,
554 work_dirs: PathList,
555 cx: &mut App,
556 ) -> Task<Result<Entity<AcpThread>>> {
557 // TODO: remove this once ACP supports multiple working directories
558 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
559 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
560 };
561 let name = self.id.0.clone();
562 let mcp_servers = mcp_servers_for_project(&project, cx);
563
564 cx.spawn(async move |cx| {
565 let response = self.connection
566 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
567 .await
568 .map_err(map_acp_error)?;
569
570 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
571
572 if let Some(default_mode) = self.default_mode.clone() {
573 if let Some(modes) = modes.as_ref() {
574 let mut modes_ref = modes.borrow_mut();
575 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
576
577 if has_mode {
578 let initial_mode_id = modes_ref.current_mode_id.clone();
579
580 cx.spawn({
581 let default_mode = default_mode.clone();
582 let session_id = response.session_id.clone();
583 let modes = modes.clone();
584 let conn = self.connection.clone();
585 async move |_| {
586 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
587 .await.log_err();
588
589 if result.is_none() {
590 modes.borrow_mut().current_mode_id = initial_mode_id;
591 }
592 }
593 }).detach();
594
595 modes_ref.current_mode_id = default_mode;
596 } else {
597 let available_modes = modes_ref
598 .available_modes
599 .iter()
600 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
601 .collect::<Vec<_>>()
602 .join("\n");
603
604 log::warn!(
605 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
606 );
607 }
608 }
609 }
610
611 if let Some(default_model) = self.default_model.clone() {
612 if let Some(models) = models.as_ref() {
613 let mut models_ref = models.borrow_mut();
614 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
615
616 if has_model {
617 let initial_model_id = models_ref.current_model_id.clone();
618
619 cx.spawn({
620 let default_model = default_model.clone();
621 let session_id = response.session_id.clone();
622 let models = models.clone();
623 let conn = self.connection.clone();
624 async move |_| {
625 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
626 .await.log_err();
627
628 if result.is_none() {
629 models.borrow_mut().current_model_id = initial_model_id;
630 }
631 }
632 }).detach();
633
634 models_ref.current_model_id = default_model;
635 } else {
636 let available_models = models_ref
637 .available_models
638 .iter()
639 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
640 .collect::<Vec<_>>()
641 .join("\n");
642
643 log::warn!(
644 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
645 );
646 }
647 }
648 }
649
650 if let Some(config_opts) = config_options.as_ref() {
651 self.apply_default_config_options(&response.session_id, config_opts, cx);
652 }
653
654 let action_log = cx.new(|_| ActionLog::new(project.clone()));
655 let thread: Entity<AcpThread> = cx.new(|cx| {
656 AcpThread::new(
657 None,
658 None,
659 Some(work_dirs),
660 self.clone(),
661 project,
662 action_log,
663 response.session_id.clone(),
664 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
665 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
666 cx,
667 )
668 });
669
670 self.sessions.borrow_mut().insert(
671 response.session_id,
672 AcpSession {
673 thread: thread.downgrade(),
674 suppress_abort_err: false,
675 session_modes: modes,
676 models,
677 config_options: config_options.map(ConfigOptions::new),
678 },
679 );
680
681 Ok(thread)
682 })
683 }
684
685 fn supports_load_session(&self) -> bool {
686 self.agent_capabilities.load_session
687 }
688
689 fn supports_resume_session(&self) -> bool {
690 self.agent_capabilities
691 .session_capabilities
692 .resume
693 .is_some()
694 }
695
696 fn load_session(
697 self: Rc<Self>,
698 session_id: acp::SessionId,
699 project: Entity<Project>,
700 work_dirs: PathList,
701 title: Option<SharedString>,
702 cx: &mut App,
703 ) -> Task<Result<Entity<AcpThread>>> {
704 if !self.agent_capabilities.load_session {
705 return Task::ready(Err(anyhow!(LoadError::Other(
706 "Loading sessions is not supported by this agent.".into()
707 ))));
708 }
709 // TODO: remove this once ACP supports multiple working directories
710 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
711 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
712 };
713
714 let mcp_servers = mcp_servers_for_project(&project, cx);
715 let action_log = cx.new(|_| ActionLog::new(project.clone()));
716 let thread: Entity<AcpThread> = cx.new(|cx| {
717 AcpThread::new(
718 None,
719 title,
720 Some(work_dirs.clone()),
721 self.clone(),
722 project,
723 action_log,
724 session_id.clone(),
725 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
726 cx,
727 )
728 });
729
730 self.sessions.borrow_mut().insert(
731 session_id.clone(),
732 AcpSession {
733 thread: thread.downgrade(),
734 suppress_abort_err: false,
735 session_modes: None,
736 models: None,
737 config_options: None,
738 },
739 );
740
741 cx.spawn(async move |cx| {
742 let response = match self
743 .connection
744 .load_session(
745 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
746 )
747 .await
748 {
749 Ok(response) => response,
750 Err(err) => {
751 self.sessions.borrow_mut().remove(&session_id);
752 return Err(map_acp_error(err));
753 }
754 };
755
756 let (modes, models, config_options) =
757 config_state(response.modes, response.models, response.config_options);
758
759 if let Some(config_opts) = config_options.as_ref() {
760 self.apply_default_config_options(&session_id, config_opts, cx);
761 }
762
763 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
764 session.session_modes = modes;
765 session.models = models;
766 session.config_options = config_options.map(ConfigOptions::new);
767 }
768
769 Ok(thread)
770 })
771 }
772
773 fn resume_session(
774 self: Rc<Self>,
775 session_id: acp::SessionId,
776 project: Entity<Project>,
777 work_dirs: PathList,
778 title: Option<SharedString>,
779 cx: &mut App,
780 ) -> Task<Result<Entity<AcpThread>>> {
781 if self
782 .agent_capabilities
783 .session_capabilities
784 .resume
785 .is_none()
786 {
787 return Task::ready(Err(anyhow!(LoadError::Other(
788 "Resuming sessions is not supported by this agent.".into()
789 ))));
790 }
791 // TODO: remove this once ACP supports multiple working directories
792 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
793 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
794 };
795
796 let mcp_servers = mcp_servers_for_project(&project, cx);
797 let action_log = cx.new(|_| ActionLog::new(project.clone()));
798 let thread: Entity<AcpThread> = cx.new(|cx| {
799 AcpThread::new(
800 None,
801 title,
802 Some(work_dirs),
803 self.clone(),
804 project,
805 action_log,
806 session_id.clone(),
807 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
808 cx,
809 )
810 });
811
812 self.sessions.borrow_mut().insert(
813 session_id.clone(),
814 AcpSession {
815 thread: thread.downgrade(),
816 suppress_abort_err: false,
817 session_modes: None,
818 models: None,
819 config_options: None,
820 },
821 );
822
823 cx.spawn(async move |cx| {
824 let response = match self
825 .connection
826 .resume_session(
827 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
828 .mcp_servers(mcp_servers),
829 )
830 .await
831 {
832 Ok(response) => response,
833 Err(err) => {
834 self.sessions.borrow_mut().remove(&session_id);
835 return Err(map_acp_error(err));
836 }
837 };
838
839 let (modes, models, config_options) =
840 config_state(response.modes, response.models, response.config_options);
841
842 if let Some(config_opts) = config_options.as_ref() {
843 self.apply_default_config_options(&session_id, config_opts, cx);
844 }
845
846 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
847 session.session_modes = modes;
848 session.models = models;
849 session.config_options = config_options.map(ConfigOptions::new);
850 }
851
852 Ok(thread)
853 })
854 }
855
856 fn supports_close_session(&self) -> bool {
857 self.agent_capabilities.session_capabilities.close.is_some()
858 }
859
860 fn close_session(
861 self: Rc<Self>,
862 session_id: &acp::SessionId,
863 cx: &mut App,
864 ) -> Task<Result<()>> {
865 if !self.supports_close_session() {
866 return Task::ready(Err(anyhow!(LoadError::Other(
867 "Closing sessions is not supported by this agent.".into()
868 ))));
869 }
870
871 let conn = self.connection.clone();
872 let session_id = session_id.clone();
873 cx.foreground_executor().spawn(async move {
874 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
875 .await?;
876 self.sessions.borrow_mut().remove(&session_id);
877 Ok(())
878 })
879 }
880
881 fn auth_methods(&self) -> &[acp::AuthMethod] {
882 &self.auth_methods
883 }
884
885 fn terminal_auth_task(
886 &self,
887 method_id: &acp::AuthMethodId,
888 cx: &App,
889 ) -> Option<SpawnInTerminal> {
890 let method = self
891 .auth_methods
892 .iter()
893 .find(|method| method.id() == method_id)?;
894
895 match method {
896 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
897 Some(terminal_auth_task(&self.command, &self.id, terminal))
898 }
899 _ => meta_terminal_auth_task(&self.id, method_id, method),
900 }
901 }
902
903 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
904 let conn = self.connection.clone();
905 cx.foreground_executor().spawn(async move {
906 conn.authenticate(acp::AuthenticateRequest::new(method_id))
907 .await?;
908 Ok(())
909 })
910 }
911
912 fn prompt(
913 &self,
914 _id: Option<acp_thread::UserMessageId>,
915 params: acp::PromptRequest,
916 cx: &mut App,
917 ) -> Task<Result<acp::PromptResponse>> {
918 let conn = self.connection.clone();
919 let sessions = self.sessions.clone();
920 let session_id = params.session_id.clone();
921 cx.foreground_executor().spawn(async move {
922 let result = conn.prompt(params).await;
923
924 let mut suppress_abort_err = false;
925
926 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
927 suppress_abort_err = session.suppress_abort_err;
928 session.suppress_abort_err = false;
929 }
930
931 match result {
932 Ok(response) => Ok(response),
933 Err(err) => {
934 if err.code == acp::ErrorCode::AuthRequired {
935 return Err(anyhow!(acp::Error::auth_required()));
936 }
937
938 if err.code != ErrorCode::InternalError {
939 anyhow::bail!(err)
940 }
941
942 let Some(data) = &err.data else {
943 anyhow::bail!(err)
944 };
945
946 // Temporary workaround until the following PR is generally available:
947 // https://github.com/google-gemini/gemini-cli/pull/6656
948
949 #[derive(Deserialize)]
950 #[serde(deny_unknown_fields)]
951 struct ErrorDetails {
952 details: Box<str>,
953 }
954
955 match serde_json::from_value(data.clone()) {
956 Ok(ErrorDetails { details }) => {
957 if suppress_abort_err
958 && (details.contains("This operation was aborted")
959 || details.contains("The user aborted a request"))
960 {
961 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
962 } else {
963 Err(anyhow!(details))
964 }
965 }
966 Err(_) => Err(anyhow!(err)),
967 }
968 }
969 }
970 })
971 }
972
973 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
974 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
975 session.suppress_abort_err = true;
976 }
977 let conn = self.connection.clone();
978 let params = acp::CancelNotification::new(session_id.clone());
979 cx.foreground_executor()
980 .spawn(async move { conn.cancel(params).await })
981 .detach();
982 }
983
984 fn session_modes(
985 &self,
986 session_id: &acp::SessionId,
987 _cx: &App,
988 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
989 let sessions = self.sessions.clone();
990 let sessions_ref = sessions.borrow();
991 let Some(session) = sessions_ref.get(session_id) else {
992 return None;
993 };
994
995 if let Some(modes) = session.session_modes.as_ref() {
996 Some(Rc::new(AcpSessionModes {
997 connection: self.connection.clone(),
998 session_id: session_id.clone(),
999 state: modes.clone(),
1000 }) as _)
1001 } else {
1002 None
1003 }
1004 }
1005
1006 fn model_selector(
1007 &self,
1008 session_id: &acp::SessionId,
1009 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1010 let sessions = self.sessions.clone();
1011 let sessions_ref = sessions.borrow();
1012 let Some(session) = sessions_ref.get(session_id) else {
1013 return None;
1014 };
1015
1016 if let Some(models) = session.models.as_ref() {
1017 Some(Rc::new(AcpModelSelector::new(
1018 session_id.clone(),
1019 self.connection.clone(),
1020 models.clone(),
1021 )) as _)
1022 } else {
1023 None
1024 }
1025 }
1026
1027 fn session_config_options(
1028 &self,
1029 session_id: &acp::SessionId,
1030 _cx: &App,
1031 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1032 let sessions = self.sessions.borrow();
1033 let session = sessions.get(session_id)?;
1034
1035 let config_opts = session.config_options.as_ref()?;
1036
1037 Some(Rc::new(AcpSessionConfigOptions {
1038 session_id: session_id.clone(),
1039 connection: self.connection.clone(),
1040 state: config_opts.config_options.clone(),
1041 watch_tx: config_opts.tx.clone(),
1042 watch_rx: config_opts.rx.clone(),
1043 }) as _)
1044 }
1045
1046 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1047 self.session_list.clone().map(|s| s as _)
1048 }
1049
1050 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
1051 self
1052 }
1053}
1054
1055fn map_acp_error(err: acp::Error) -> anyhow::Error {
1056 if err.code == acp::ErrorCode::AuthRequired {
1057 let mut error = AuthRequired::new();
1058
1059 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1060 error = error.with_description(err.message);
1061 }
1062
1063 anyhow!(error)
1064 } else {
1065 anyhow!(err)
1066 }
1067}
1068
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `terminal_auth_task` keeps the agent command's path, appends the auth
    /// method's args after the command's own args, and merges both env maps with the auth
    /// method's value winning for a shared key (`SHARED`). Also pins `label` and
    /// `command_label` to the auth method's display name.
    #[test]
    fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "base".into()),
            ])),
        };
        let method = acp::AuthMethodTerminal::new("login", "Login")
            .args(vec!["/auth".into()])
            .env(std::collections::HashMap::from_iter([
                ("EXTRA".into(), "2".into()),
                ("SHARED".into(), "override".into()),
            ]));

        let terminal_auth_task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);

        assert_eq!(
            terminal_auth_task.command.as_deref(),
            Some("/path/to/agent")
        );
        assert_eq!(terminal_auth_task.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(
            terminal_auth_task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(terminal_auth_task.label, "Login");
        assert_eq!(terminal_auth_task.command_label, "Login");
    }

    /// Checks that `meta_terminal_auth_task` builds a task from the `terminal-auth` entry in
    /// an agent auth method's meta: the task id, command, args, env and label all come from
    /// that entry.
    ///
    /// NOTE(review): despite the name, nothing asserted here exercises a session retry.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
                "terminal-auth".to_string(),
                serde_json::json!({
                    "label": "legacy /auth",
                    "command": "legacy-agent",
                    "args": ["auth", "--interactive"],
                    "env": {
                        "AUTH_MODE": "interactive",
                    },
                }),
            )])),
        );

        let terminal_auth_task =
            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
                .expect("expected legacy terminal auth task");

        assert_eq!(
            terminal_auth_task.id.0,
            "external-agent-test-agent-legacy-login-login"
        );
        assert_eq!(terminal_auth_task.command.as_deref(), Some("legacy-agent"));
        assert_eq!(terminal_auth_task.args, vec!["auth", "--interactive"]);
        assert_eq!(
            terminal_auth_task.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(terminal_auth_task.label, "legacy /auth");
    }

    /// Checks that `meta_terminal_auth_task` yields `None` when the `terminal-auth` meta
    /// entry only carries a `label` (no `command`).
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
                "terminal-auth".to_string(),
                serde_json::json!({
                    "label": "legacy /auth",
                }),
            )])),
        );

        assert!(
            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
        );
    }

    /// Builds a terminal auth method that also carries a legacy `terminal-auth` meta entry
    /// and checks that the task produced by `terminal_auth_task` reflects only the
    /// first-class fields (agent command path, `/auth` arg, `AUTH_MODE=first-class`).
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let method_id = acp::AuthMethodId::new("login");
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(acp::Meta::from_iter([(
                    "terminal-auth".to_string(),
                    serde_json::json!({
                        "label": "legacy /auth",
                        "command": "legacy-agent",
                        "args": ["legacy-auth"],
                        "env": {
                            "AUTH_MODE": "legacy",
                        },
                    }),
                )])),
        );

        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into()],
            env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
        };

        // The test selects the first-class path itself by matching on the variant.
        let terminal_auth_task = match &method {
            acp::AuthMethod::Terminal(terminal) => {
                terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
            }
            _ => unreachable!(),
        };

        assert_eq!(
            terminal_auth_task.command.as_deref(),
            Some("/path/to/agent")
        );
        assert_eq!(terminal_auth_task.args, vec!["--acp", "/auth"]);
        assert_eq!(
            terminal_auth_task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(terminal_auth_task.label, "Login");
    }
}
1211
1212fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1213 let context_server_store = project.read(cx).context_server_store().read(cx);
1214 let is_local = project.read(cx).is_local();
1215 context_server_store
1216 .configured_server_ids()
1217 .iter()
1218 .filter_map(|id| {
1219 let configuration = context_server_store.configuration_for_server(id)?;
1220 match &*configuration {
1221 project::context_server_store::ContextServerConfiguration::Custom {
1222 command,
1223 remote,
1224 ..
1225 }
1226 | project::context_server_store::ContextServerConfiguration::Extension {
1227 command,
1228 remote,
1229 ..
1230 } if is_local || *remote => Some(acp::McpServer::Stdio(
1231 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1232 .args(command.args.clone())
1233 .env(if let Some(env) = command.env.as_ref() {
1234 env.iter()
1235 .map(|(name, value)| acp::EnvVariable::new(name, value))
1236 .collect()
1237 } else {
1238 vec![]
1239 }),
1240 )),
1241 project::context_server_store::ContextServerConfiguration::Http {
1242 url,
1243 headers,
1244 timeout: _,
1245 } => Some(acp::McpServer::Http(
1246 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1247 headers
1248 .iter()
1249 .map(|(name, value)| acp::HttpHeader::new(name, value))
1250 .collect(),
1251 ),
1252 )),
1253 _ => None,
1254 }
1255 })
1256 .collect()
1257}
1258
1259fn config_state(
1260 modes: Option<acp::SessionModeState>,
1261 models: Option<acp::SessionModelState>,
1262 config_options: Option<Vec<acp::SessionConfigOption>>,
1263) -> (
1264 Option<Rc<RefCell<acp::SessionModeState>>>,
1265 Option<Rc<RefCell<acp::SessionModelState>>>,
1266 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1267) {
1268 if let Some(opts) = config_options {
1269 return (None, None, Some(Rc::new(RefCell::new(opts))));
1270 }
1271
1272 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1273 let models = models.map(|models| Rc::new(RefCell::new(models)));
1274 (modes, models, None)
1275}
1276
/// Mode selector for a single agent session, backed by the ACP connection.
struct AcpSessionModes {
    /// Session whose mode is queried and changed.
    session_id: acp::SessionId,
    /// Connection used to send mode-change requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared mode state (current mode id and available modes) read and written by the
    /// `AgentSessionModes` implementation.
    state: Rc<RefCell<acp::SessionModeState>>,
}
1282
1283impl acp_thread::AgentSessionModes for AcpSessionModes {
1284 fn current_mode(&self) -> acp::SessionModeId {
1285 self.state.borrow().current_mode_id.clone()
1286 }
1287
1288 fn all_modes(&self) -> Vec<acp::SessionMode> {
1289 self.state.borrow().available_modes.clone()
1290 }
1291
1292 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1293 let connection = self.connection.clone();
1294 let session_id = self.session_id.clone();
1295 let old_mode_id;
1296 {
1297 let mut state = self.state.borrow_mut();
1298 old_mode_id = state.current_mode_id.clone();
1299 state.current_mode_id = mode_id.clone();
1300 };
1301 let state = self.state.clone();
1302 cx.foreground_executor().spawn(async move {
1303 let result = connection
1304 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1305 .await;
1306
1307 if result.is_err() {
1308 state.borrow_mut().current_mode_id = old_mode_id;
1309 }
1310
1311 result?;
1312
1313 Ok(())
1314 })
1315 }
1316}
1317
/// Model selector for a single agent session, backed by the ACP connection.
struct AcpModelSelector {
    /// Session whose model is queried and changed.
    session_id: acp::SessionId,
    /// Connection used to send model-change requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared model state (current model id and available models) read and written by the
    /// `AgentModelSelector` implementation.
    state: Rc<RefCell<acp::SessionModelState>>,
}
1323
1324impl AcpModelSelector {
1325 fn new(
1326 session_id: acp::SessionId,
1327 connection: Rc<acp::ClientSideConnection>,
1328 state: Rc<RefCell<acp::SessionModelState>>,
1329 ) -> Self {
1330 Self {
1331 session_id,
1332 connection,
1333 state,
1334 }
1335 }
1336}
1337
1338impl acp_thread::AgentModelSelector for AcpModelSelector {
1339 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1340 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1341 self.state
1342 .borrow()
1343 .available_models
1344 .clone()
1345 .into_iter()
1346 .map(acp_thread::AgentModelInfo::from)
1347 .collect(),
1348 )))
1349 }
1350
1351 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1352 let connection = self.connection.clone();
1353 let session_id = self.session_id.clone();
1354 let old_model_id;
1355 {
1356 let mut state = self.state.borrow_mut();
1357 old_model_id = state.current_model_id.clone();
1358 state.current_model_id = model_id.clone();
1359 };
1360 let state = self.state.clone();
1361 cx.foreground_executor().spawn(async move {
1362 let result = connection
1363 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1364 .await;
1365
1366 if result.is_err() {
1367 state.borrow_mut().current_model_id = old_model_id;
1368 }
1369
1370 result?;
1371
1372 Ok(())
1373 })
1374 }
1375
1376 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1377 let state = self.state.borrow();
1378 Task::ready(
1379 state
1380 .available_models
1381 .iter()
1382 .find(|m| m.model_id == state.current_model_id)
1383 .cloned()
1384 .map(acp_thread::AgentModelInfo::from)
1385 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1386 )
1387 }
1388}
1389
/// Config-option accessor for a single agent session, backed by the ACP connection.
struct AcpSessionConfigOptions {
    /// Session whose config options are queried and changed.
    session_id: acp::SessionId,
    /// Connection used to send config-option requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Shared list of the session's config options; replaced with the agent's response
    /// after a successful `set_config_option`.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender signalled (with `()`) after `set_config_option` stores a new option list.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver that `watch` clones and hands out to callers.
    watch_rx: watch::Receiver<()>,
}
1397
1398impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1399 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1400 self.state.borrow().clone()
1401 }
1402
1403 fn set_config_option(
1404 &self,
1405 config_id: acp::SessionConfigId,
1406 value: acp::SessionConfigValueId,
1407 cx: &mut App,
1408 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1409 let connection = self.connection.clone();
1410 let session_id = self.session_id.clone();
1411 let state = self.state.clone();
1412
1413 let watch_tx = self.watch_tx.clone();
1414
1415 cx.foreground_executor().spawn(async move {
1416 let response = connection
1417 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1418 session_id, config_id, value,
1419 ))
1420 .await?;
1421
1422 *state.borrow_mut() = response.config_options.clone();
1423 watch_tx.borrow_mut().send(()).ok();
1424 Ok(response.config_options)
1425 })
1426 }
1427
1428 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1429 Some(self.watch_rx.clone())
1430 }
1431}
1432
/// Client-side handler for requests and notifications sent by the agent
/// (implements `acp::Client`).
struct ClientDelegate {
    /// Sessions known to this connection, keyed by session id; used to find the thread
    /// an incoming request or notification belongs to.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Optional session list that receives session-info updates from notifications.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// Async app context; cloned whenever a handler needs a mutable context.
    cx: AsyncApp,
}
1438
1439#[async_trait::async_trait(?Send)]
1440impl acp::Client for ClientDelegate {
1441 async fn request_permission(
1442 &self,
1443 arguments: acp::RequestPermissionRequest,
1444 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1445 let thread;
1446 {
1447 let sessions_ref = self.sessions.borrow();
1448 let session = sessions_ref
1449 .get(&arguments.session_id)
1450 .context("Failed to get session")?;
1451 thread = session.thread.clone();
1452 }
1453
1454 let cx = &mut self.cx.clone();
1455
1456 let task = thread.update(cx, |thread, cx| {
1457 thread.request_tool_call_authorization(
1458 arguments.tool_call,
1459 acp_thread::PermissionOptions::Flat(arguments.options),
1460 cx,
1461 )
1462 })??;
1463
1464 let outcome = task.await;
1465
1466 Ok(acp::RequestPermissionResponse::new(outcome.into()))
1467 }
1468
1469 async fn write_text_file(
1470 &self,
1471 arguments: acp::WriteTextFileRequest,
1472 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1473 let cx = &mut self.cx.clone();
1474 let task = self
1475 .session_thread(&arguments.session_id)?
1476 .update(cx, |thread, cx| {
1477 thread.write_text_file(arguments.path, arguments.content, cx)
1478 })?;
1479
1480 task.await?;
1481
1482 Ok(Default::default())
1483 }
1484
1485 async fn read_text_file(
1486 &self,
1487 arguments: acp::ReadTextFileRequest,
1488 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1489 let task = self.session_thread(&arguments.session_id)?.update(
1490 &mut self.cx.clone(),
1491 |thread, cx| {
1492 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1493 },
1494 )?;
1495
1496 let content = task.await?;
1497
1498 Ok(acp::ReadTextFileResponse::new(content))
1499 }
1500
1501 async fn session_notification(
1502 &self,
1503 notification: acp::SessionNotification,
1504 ) -> Result<(), acp::Error> {
1505 let sessions = self.sessions.borrow();
1506 let session = sessions
1507 .get(¬ification.session_id)
1508 .context("Failed to get session")?;
1509
1510 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1511 current_mode_id,
1512 ..
1513 }) = ¬ification.update
1514 {
1515 if let Some(session_modes) = &session.session_modes {
1516 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1517 }
1518 }
1519
1520 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1521 config_options,
1522 ..
1523 }) = ¬ification.update
1524 {
1525 if let Some(opts) = &session.config_options {
1526 *opts.config_options.borrow_mut() = config_options.clone();
1527 opts.tx.borrow_mut().send(()).ok();
1528 }
1529 }
1530
1531 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1532 && let Some(session_list) = self.session_list.borrow().as_ref()
1533 {
1534 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1535 }
1536
1537 // Clone so we can inspect meta both before and after handing off to the thread
1538 let update_clone = notification.update.clone();
1539
1540 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1541 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1542 if let Some(meta) = &tc.meta {
1543 if let Some(terminal_info) = meta.get("terminal_info") {
1544 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1545 {
1546 let terminal_id = acp::TerminalId::new(id_str);
1547 let cwd = terminal_info
1548 .get("cwd")
1549 .and_then(|v| v.as_str().map(PathBuf::from));
1550
1551 // Create a minimal display-only lower-level terminal and register it.
1552 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1553 let builder = TerminalBuilder::new_display_only(
1554 CursorShape::default(),
1555 AlternateScroll::On,
1556 None,
1557 0,
1558 cx.background_executor(),
1559 thread.project().read(cx).path_style(cx),
1560 )?;
1561 let lower = cx.new(|cx| builder.subscribe(cx));
1562 thread.on_terminal_provider_event(
1563 TerminalProviderEvent::Created {
1564 terminal_id,
1565 label: tc.title.clone(),
1566 cwd,
1567 output_byte_limit: None,
1568 terminal: lower,
1569 },
1570 cx,
1571 );
1572 anyhow::Ok(())
1573 });
1574 }
1575 }
1576 }
1577 }
1578
1579 // Forward the update to the acp_thread as usual.
1580 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1581 thread.handle_session_update(notification.update.clone(), cx)
1582 })??;
1583
1584 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1585 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1586 if let Some(meta) = &tcu.meta {
1587 if let Some(term_out) = meta.get("terminal_output") {
1588 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1589 let terminal_id = acp::TerminalId::new(id_str);
1590 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1591 let data = s.as_bytes().to_vec();
1592 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1593 thread.on_terminal_provider_event(
1594 TerminalProviderEvent::Output { terminal_id, data },
1595 cx,
1596 );
1597 });
1598 }
1599 }
1600 }
1601
1602 // terminal_exit
1603 if let Some(term_exit) = meta.get("terminal_exit") {
1604 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1605 let terminal_id = acp::TerminalId::new(id_str);
1606 let status = acp::TerminalExitStatus::new()
1607 .exit_code(
1608 term_exit
1609 .get("exit_code")
1610 .and_then(|v| v.as_u64())
1611 .map(|i| i as u32),
1612 )
1613 .signal(
1614 term_exit
1615 .get("signal")
1616 .and_then(|v| v.as_str().map(|s| s.to_string())),
1617 );
1618
1619 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1620 thread.on_terminal_provider_event(
1621 TerminalProviderEvent::Exit {
1622 terminal_id,
1623 status,
1624 },
1625 cx,
1626 );
1627 });
1628 }
1629 }
1630 }
1631 }
1632
1633 Ok(())
1634 }
1635
1636 async fn create_terminal(
1637 &self,
1638 args: acp::CreateTerminalRequest,
1639 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1640 let thread = self.session_thread(&args.session_id)?;
1641 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1642
1643 let terminal_entity = acp_thread::create_terminal_entity(
1644 args.command.clone(),
1645 &args.args,
1646 args.env
1647 .into_iter()
1648 .map(|env| (env.name, env.value))
1649 .collect(),
1650 args.cwd.clone(),
1651 &project,
1652 &mut self.cx.clone(),
1653 )
1654 .await?;
1655
1656 // Register with renderer
1657 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1658 thread.register_terminal_created(
1659 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1660 format!("{} {}", args.command, args.args.join(" ")),
1661 args.cwd.clone(),
1662 args.output_byte_limit,
1663 terminal_entity,
1664 cx,
1665 )
1666 })?;
1667 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1668 Ok(acp::CreateTerminalResponse::new(terminal_id))
1669 }
1670
1671 async fn kill_terminal(
1672 &self,
1673 args: acp::KillTerminalRequest,
1674 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1675 self.session_thread(&args.session_id)?
1676 .update(&mut self.cx.clone(), |thread, cx| {
1677 thread.kill_terminal(args.terminal_id, cx)
1678 })??;
1679
1680 Ok(Default::default())
1681 }
1682
1683 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1684 Err(acp::Error::method_not_found())
1685 }
1686
1687 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1688 Err(acp::Error::method_not_found())
1689 }
1690
1691 async fn release_terminal(
1692 &self,
1693 args: acp::ReleaseTerminalRequest,
1694 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1695 self.session_thread(&args.session_id)?
1696 .update(&mut self.cx.clone(), |thread, cx| {
1697 thread.release_terminal(args.terminal_id, cx)
1698 })??;
1699
1700 Ok(Default::default())
1701 }
1702
1703 async fn terminal_output(
1704 &self,
1705 args: acp::TerminalOutputRequest,
1706 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1707 self.session_thread(&args.session_id)?
1708 .read_with(&mut self.cx.clone(), |thread, cx| {
1709 let out = thread
1710 .terminal(args.terminal_id)?
1711 .read(cx)
1712 .current_output(cx);
1713
1714 Ok(out)
1715 })?
1716 }
1717
1718 async fn wait_for_terminal_exit(
1719 &self,
1720 args: acp::WaitForTerminalExitRequest,
1721 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1722 let exit_status = self
1723 .session_thread(&args.session_id)?
1724 .update(&mut self.cx.clone(), |thread, cx| {
1725 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1726 })??
1727 .await;
1728
1729 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1730 }
1731}
1732
1733impl ClientDelegate {
1734 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1735 let sessions = self.sessions.borrow();
1736 sessions
1737 .get(session_id)
1738 .context("Failed to get session")
1739 .map(|session| session.thread.clone())
1740 }
1741}