1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::agent_server_store::AgentServerCommand;
14use project::{AgentId, Project};
15use serde::Deserialize;
16use settings::Settings as _;
17use task::{ShellBuilder, SpawnInTerminal};
18use util::ResultExt as _;
19use util::path_list::PathList;
20use util::process::Child;
21
22use std::path::PathBuf;
23use std::process::Stdio;
24use std::rc::Rc;
25use std::{any::Any, cell::RefCell};
26use thiserror::Error;
27
28use anyhow::{Context as _, Result};
29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
30
31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
32use terminal::TerminalBuilder;
33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
34
35use crate::GEMINI_ID;
36
37pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
38
/// Error returned by [`AcpConnection::stdio`] when the protocol version in
/// the agent's `initialize` response is below `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
42
/// A connection to an external agent server process, speaking ACP over the
/// child process's stdin/stdout.
pub struct AcpConnection {
    /// Identifier of the agent this connection belongs to.
    id: AgentId,
    /// Title given to threads created by `new_session`, and the fallback
    /// title for loaded/resumed sessions.
    display_name: SharedString,
    /// Name reported by the agent in its `initialize` response when present,
    /// otherwise the agent id.
    telemetry_id: SharedString,
    /// Client side of the ACP connection to the agent process.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions known to this connection, keyed by session id.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods offered for this agent.
    auth_methods: Vec<acp::AuthMethod>,
    /// Command the agent server was spawned with; also used to build
    /// terminal authentication tasks.
    command: AgentServerCommand,
    /// Capabilities reported by the agent in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode that new sessions are switched to when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model that new sessions are switched to when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Default values for session config options, keyed by option id.
    default_config_options: HashMap<String, String>,
    /// Handle to the agent process; killed when the connection is dropped.
    child: Child,
    /// Present only when the agent advertises the session-list capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Task driving the connection's I/O; held so it lives as long as `self`.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that waits for the process to exit and reports it to open threads.
    _wait_task: Task<Result<()>>,
    /// Task that forwards the agent's stderr lines to the log.
    _stderr_task: Task<Result<()>>,
}
61
/// A session's config option state bundled with the watch channel that is
/// handed out to `AcpSessionConfigOptions` handles for that session.
struct ConfigOptions {
    /// Shared, mutable list of the session's config options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender half of the watch channel created together with this state.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver half of the same watch channel.
    rx: watch::Receiver<()>,
}
67
68impl ConfigOptions {
69 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
70 let (tx, rx) = watch::channel(());
71 Self {
72 config_options,
73 tx: Rc::new(RefCell::new(tx)),
74 rx,
75 }
76 }
77}
78
/// Per-session state tracked by an [`AcpConnection`].
pub struct AcpSession {
    /// Weak handle to the `AcpThread` created for this session.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel`; read and cleared when the next `prompt` call
    /// completes, where it decides whether an "aborted" error from the agent
    /// is reported as a cancellation instead of a failure.
    suppress_abort_err: bool,
    /// Model state reported by the agent for this session, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state reported by the agent for this session, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config option state reported by the agent for this session, if any.
    config_options: Option<ConfigOptions>,
}
86
/// Session listing backed by an ACP connection, plus a channel used to push
/// list updates to watchers.
pub struct AcpSessionList {
    /// Connection used to issue `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sending half of the update channel.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiving half of the update channel; cloned for each `watch` call.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
92
93impl AcpSessionList {
94 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
95 let (tx, rx) = smol::channel::unbounded();
96 Self {
97 connection,
98 updates_tx: tx,
99 updates_rx: rx,
100 }
101 }
102
103 fn notify_update(&self) {
104 self.updates_tx
105 .try_send(acp_thread::SessionListUpdate::Refresh)
106 .log_err();
107 }
108
109 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
110 self.updates_tx
111 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
112 .log_err();
113 }
114}
115
116impl AgentSessionList for AcpSessionList {
117 fn list_sessions(
118 &self,
119 request: AgentSessionListRequest,
120 cx: &mut App,
121 ) -> Task<Result<AgentSessionListResponse>> {
122 let conn = self.connection.clone();
123 cx.foreground_executor().spawn(async move {
124 let acp_request = acp::ListSessionsRequest::new()
125 .cwd(request.cwd)
126 .cursor(request.cursor);
127 let response = conn.list_sessions(acp_request).await?;
128 Ok(AgentSessionListResponse {
129 sessions: response
130 .sessions
131 .into_iter()
132 .map(|s| AgentSessionInfo {
133 session_id: s.session_id,
134 work_dirs: Some(PathList::new(&[s.cwd])),
135 title: s.title.map(Into::into),
136 updated_at: s.updated_at.and_then(|date_str| {
137 chrono::DateTime::parse_from_rfc3339(&date_str)
138 .ok()
139 .map(|dt| dt.with_timezone(&chrono::Utc))
140 }),
141 created_at: None,
142 meta: s.meta,
143 })
144 .collect(),
145 next_cursor: response.next_cursor,
146 meta: response.meta,
147 })
148 })
149 }
150
151 fn watch(
152 &self,
153 _cx: &mut App,
154 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
155 Some(self.updates_rx.clone())
156 }
157
158 fn notify_refresh(&self) {
159 self.notify_update();
160 }
161
162 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
163 self
164 }
165}
166
167pub async fn connect(
168 agent_id: AgentId,
169 project: Entity<Project>,
170 display_name: SharedString,
171 command: AgentServerCommand,
172 default_mode: Option<acp::SessionModeId>,
173 default_model: Option<acp::ModelId>,
174 default_config_options: HashMap<String, String>,
175 cx: &mut AsyncApp,
176) -> Result<Rc<dyn AgentConnection>> {
177 let conn = AcpConnection::stdio(
178 agent_id,
179 project,
180 display_name,
181 command.clone(),
182 default_mode,
183 default_model,
184 default_config_options,
185 cx,
186 )
187 .await?;
188 Ok(Rc::new(conn) as _)
189}
190
/// Lowest protocol version accepted from an agent's `initialize` response;
/// anything below it is rejected with [`UnsupportedVersion`].
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
192
193impl AcpConnection {
    /// Spawns the agent server described by `command`, connects to it over
    /// its stdin/stdout, and performs the ACP `initialize` handshake.
    ///
    /// The process is started through the user's configured terminal shell,
    /// with `command.env` applied and, when the project has one, the first
    /// path of the project's default path list as working directory.
    ///
    /// # Errors
    ///
    /// Fails if the process cannot be spawned, if any of its stdio handles is
    /// missing, if the `initialize` request fails, or with
    /// [`UnsupportedVersion`] when the agent reports a protocol version below
    /// `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        agent_id: AgentId,
        project: Entity<Project>,
        display_name: SharedString,
        command: AgentServerCommand,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child =
            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
        child.envs(command.env.iter().flatten());
        // Run the agent in the project's first default path, if there is one.
        if let Some(cwd) = project.update(cx, |project, cx| {
            project
                .default_path_list(cx)
                .ordered_paths()
                .next()
                .cloned()
        }) {
            child.current_dir(cwd);
        }
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        // Note: despite the wording, this is logged after the process has
        // already been spawned above.
        log::debug!(
            "Spawning external agent server: {:?}, {:?}",
            command.path,
            command.args
        );
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // Shared slot handed to the client delegate now and filled in after
        // `initialize`, once it is known whether the agent can list sessions.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        let client = ClientDelegate {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
            cx: cx.clone(),
        };
        // Futures the connection needs to run are spawned on the foreground
        // executor and detached.
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Forward each stderr line from the agent to the log at warn level,
        // stopping on EOF or on the first read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // Once the process exits, report `LoadError::Exited` to every thread
        // that is still registered and alive.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(agent_id.clone(), &connection, cx)
            });
        });

        // NOTE(review): on the early returns below (`initialize` failure,
        // unsupported version) `child` is simply dropped without an explicit
        // `kill()`; whether that terminates the process depends on `Child`'s
        // drop behavior, which is not visible here — confirm.
        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapabilities::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            .auth(acp::AuthCapabilities::new().terminal(true))
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| agent_id.0.to_string().into());

        // Only expose a session list when the agent advertises the capability;
        // the same list is published to the client delegate's shared slot.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        // TODO: Remove this override once Google team releases their official auth methods
        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
            // Build a terminal login from the spawn command, with the ACP
            // flags stripped from its arguments.
            let mut args = command.args.clone();
            args.retain(|a| a != "--experimental-acp" && a != "--acp");
            let value = serde_json::json!({
                "label": "gemini /auth",
                "command": command.path.to_string_lossy().into_owned(),
                "args": args,
                "env": command.env.clone().unwrap_or_default(),
            });
            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
            vec![acp::AuthMethod::Agent(
                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                    .description("Login with your Google or Vertex AI account")
                    .meta(meta),
            )]
        } else {
            response.auth_methods
        };
        Ok(Self {
            id: agent_id,
            auth_methods,
            command,
            connection,
            display_name,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }
381
382 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
383 &self.agent_capabilities.prompt_capabilities
384 }
385
    /// Applies the configured default values to a session's config options.
    ///
    /// For every option in `config_options` that has an entry in
    /// `self.default_config_options`, the default is checked against the
    /// option's selectable values (only `Select` options are supported; any
    /// other kind is treated as invalid and logged). For each valid default a
    /// detached task asks the agent to set the value, while the local state is
    /// updated immediately; if the request fails, the task restores the value
    /// the option had before.
    fn apply_default_config_options(
        &self,
        session_id: &acp::SessionId,
        config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
        cx: &mut AsyncApp,
    ) {
        let id = self.id.clone();
        // Collect the work first so the shared borrow of `config_options` is
        // released before the list is mutated below.
        let defaults_to_apply: Vec<_> = {
            let config_opts_ref = config_options.borrow();
            config_opts_ref
                .iter()
                .filter_map(|config_option| {
                    // Options without a configured default are skipped.
                    let default_value = self.default_config_options.get(&*config_option.id.0)?;

                    let is_valid = match &config_option.kind {
                        acp::SessionConfigKind::Select(select) => match &select.options {
                            acp::SessionConfigSelectOptions::Ungrouped(options) => options
                                .iter()
                                .any(|opt| &*opt.value.0 == default_value.as_str()),
                            acp::SessionConfigSelectOptions::Grouped(groups) => {
                                groups.iter().any(|g| {
                                    g.options
                                        .iter()
                                        .any(|opt| &*opt.value.0 == default_value.as_str())
                                })
                            }
                            _ => false,
                        },
                        _ => false,
                    };

                    if is_valid {
                        // Remember the current value so it can be restored if
                        // the agent rejects the change.
                        let initial_value = match &config_option.kind {
                            acp::SessionConfigKind::Select(select) => {
                                Some(select.current_value.clone())
                            }
                            _ => None,
                        };
                        Some((
                            config_option.id.clone(),
                            default_value.clone(),
                            initial_value,
                        ))
                    } else {
                        log::warn!(
                            "`{}` is not a valid value for config option `{}` in {}",
                            default_value,
                            config_option.id.0,
                            id
                        );
                        None
                    }
                })
                .collect()
        };

        for (config_id, default_value, initial_value) in defaults_to_apply {
            cx.spawn({
                let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
                let session_id = session_id.clone();
                let config_id_clone = config_id.clone();
                let config_opts = config_options.clone();
                let conn = self.connection.clone();
                async move |_| {
                    let result = conn
                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
                            session_id,
                            config_id_clone.clone(),
                            default_value_id,
                        ))
                        .await
                        .log_err();

                    // The request failed: roll the local value back.
                    if result.is_none() {
                        if let Some(initial) = initial_value {
                            let mut opts = config_opts.borrow_mut();
                            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
                                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
                                    select.current_value = initial;
                                }
                            }
                        }
                    }
                }
            })
            .detach();

            // Optimistically reflect the default locally without waiting for
            // the agent's reply.
            let mut opts = config_options.borrow_mut();
            if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
                if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
                    select.current_value = acp::SessionConfigValueId::new(default_value);
                }
            }
        }
    }
481}
482
483impl Drop for AcpConnection {
484 fn drop(&mut self) {
485 self.child.kill().log_err();
486 }
487}
488
489fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
490 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
491}
492
493fn terminal_auth_task(
494 command: &AgentServerCommand,
495 agent_id: &AgentId,
496 method: &acp::AuthMethodTerminal,
497) -> SpawnInTerminal {
498 let mut args = command.args.clone();
499 args.extend(method.args.clone());
500
501 let mut env = command.env.clone().unwrap_or_default();
502 env.extend(method.env.clone());
503
504 acp_thread::build_terminal_auth_task(
505 terminal_auth_task_id(agent_id, &method.id),
506 method.name.clone(),
507 command.path.to_string_lossy().into_owned(),
508 args,
509 env,
510 )
511}
512
513/// Used to support the _meta method prior to stabilization
514fn meta_terminal_auth_task(
515 agent_id: &AgentId,
516 method_id: &acp::AuthMethodId,
517 method: &acp::AuthMethod,
518) -> Option<SpawnInTerminal> {
519 #[derive(Deserialize)]
520 struct MetaTerminalAuth {
521 label: String,
522 command: String,
523 #[serde(default)]
524 args: Vec<String>,
525 #[serde(default)]
526 env: HashMap<String, String>,
527 }
528
529 let meta = match method {
530 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
531 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
532 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
533 _ => None,
534 }?;
535 let terminal_auth =
536 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
537
538 Some(acp_thread::build_terminal_auth_task(
539 terminal_auth_task_id(agent_id, method_id),
540 terminal_auth.label.clone(),
541 terminal_auth.command,
542 terminal_auth.args,
543 terminal_auth.env,
544 ))
545}
546
547impl AgentConnection for AcpConnection {
    /// Returns a clone of this connection's agent id.
    fn agent_id(&self) -> AgentId {
        self.id.clone()
    }
551
    /// Returns the telemetry id chosen when the connection was established.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
555
    /// Creates a new agent session rooted at the first path in `work_dirs`
    /// and returns the thread for it.
    ///
    /// After the agent answers, the configured default mode, model and config
    /// options are applied: each valid default is set locally right away while
    /// a detached task sends the change to the agent and reverts the local
    /// value if that request fails. Invalid defaults are only logged.
    ///
    /// # Errors
    ///
    /// Fails immediately when `work_dirs` is empty, and otherwise with the
    /// (mapped) error of the `new_session` request.
    fn new_session(
        self: Rc<Self>,
        project: Entity<Project>,
        work_dirs: PathList,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };
        let name = self.id.0.clone();
        let mcp_servers = mcp_servers_for_project(&project, cx);

        cx.spawn(async move |cx| {
            let response = self.connection
                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
                .await
                .map_err(map_acp_error)?;

            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);

            // Apply the default mode, if one is configured and offered.
            if let Some(default_mode) = self.default_mode.clone() {
                if let Some(modes) = modes.as_ref() {
                    let mut modes_ref = modes.borrow_mut();
                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);

                    if has_mode {
                        // Kept so the detached task can restore it on failure.
                        let initial_mode_id = modes_ref.current_mode_id.clone();

                        cx.spawn({
                            let default_mode = default_mode.clone();
                            let session_id = response.session_id.clone();
                            let modes = modes.clone();
                            let conn = self.connection.clone();
                            async move |_| {
                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
                                .await.log_err();

                                if result.is_none() {
                                    modes.borrow_mut().current_mode_id = initial_mode_id;
                                }
                            }
                        }).detach();

                        // Optimistic local update; see the revert above.
                        modes_ref.current_mode_id = default_mode;
                    } else {
                        let available_modes = modes_ref
                            .available_modes
                            .iter()
                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
                        );
                    }
                }
            }

            // Apply the default model, mirroring the mode handling above.
            if let Some(default_model) = self.default_model.clone() {
                if let Some(models) = models.as_ref() {
                    let mut models_ref = models.borrow_mut();
                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);

                    if has_model {
                        let initial_model_id = models_ref.current_model_id.clone();

                        cx.spawn({
                            let default_model = default_model.clone();
                            let session_id = response.session_id.clone();
                            let models = models.clone();
                            let conn = self.connection.clone();
                            async move |_| {
                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
                                .await.log_err();

                                if result.is_none() {
                                    models.borrow_mut().current_model_id = initial_model_id;
                                }
                            }
                        }).detach();

                        models_ref.current_model_id = default_model;
                    } else {
                        let available_models = models_ref
                            .available_models
                            .iter()
                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
                        );
                    }
                }
            }

            if let Some(config_opts) = config_options.as_ref() {
                self.apply_default_config_options(&response.session_id, config_opts, cx);
            }

            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            let thread: Entity<AcpThread> = cx.new(|cx| {
                AcpThread::new(
                    None,
                    self.display_name.clone(),
                    Some(work_dirs),
                    self.clone(),
                    project,
                    action_log,
                    response.session_id.clone(),
                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                    cx,
                )
            });

            // Register the session so later lookups by id can find it.
            self.sessions.borrow_mut().insert(
                response.session_id,
                AcpSession {
                    thread: thread.downgrade(),
                    suppress_abort_err: false,
                    session_modes: modes,
                    models,
                    config_options: config_options.map(ConfigOptions::new),
                },
            );

            Ok(thread)
        })
    }
689
    /// Whether the agent reported the `load_session` capability.
    fn supports_load_session(&self) -> bool {
        self.agent_capabilities.load_session
    }
693
694 fn supports_resume_session(&self) -> bool {
695 self.agent_capabilities
696 .session_capabilities
697 .resume
698 .is_some()
699 }
700
    /// Loads an existing session from the agent and returns a thread for it.
    ///
    /// The thread is created and the session registered *before* the load
    /// request is sent; the registration is removed again if the request
    /// fails. Once the agent responds, the session's modes, models and config
    /// options are filled in and default config options are applied.
    ///
    /// # Errors
    ///
    /// Fails immediately when the agent does not support loading sessions or
    /// when `work_dirs` is empty, and otherwise with the (mapped) error of the
    /// `load_session` request.
    fn load_session(
        self: Rc<Self>,
        session_id: acp::SessionId,
        project: Entity<Project>,
        work_dirs: PathList,
        title: Option<SharedString>,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        if !self.agent_capabilities.load_session {
            return Task::ready(Err(anyhow!(LoadError::Other(
                "Loading sessions is not supported by this agent.".into()
            ))));
        }
        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };

        let mcp_servers = mcp_servers_for_project(&project, cx);
        let action_log = cx.new(|_| ActionLog::new(project.clone()));
        // Fall back to the connection's display name when no title is given.
        let title = title.unwrap_or_else(|| self.display_name.clone());
        let thread: Entity<AcpThread> = cx.new(|cx| {
            AcpThread::new(
                None,
                title,
                Some(work_dirs.clone()),
                self.clone(),
                project,
                action_log,
                session_id.clone(),
                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                cx,
            )
        });

        // Register the session up front, with no mode/model/config state yet.
        self.sessions.borrow_mut().insert(
            session_id.clone(),
            AcpSession {
                thread: thread.downgrade(),
                suppress_abort_err: false,
                session_modes: None,
                models: None,
                config_options: None,
            },
        );

        cx.spawn(async move |cx| {
            let response = match self
                .connection
                .load_session(
                    acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
                )
                .await
            {
                Ok(response) => response,
                Err(err) => {
                    // Undo the registration made above.
                    self.sessions.borrow_mut().remove(&session_id);
                    return Err(map_acp_error(err));
                }
            };

            let (modes, models, config_options) =
                config_state(response.modes, response.models, response.config_options);

            if let Some(config_opts) = config_options.as_ref() {
                self.apply_default_config_options(&session_id, config_opts, cx);
            }

            // Fill in the state that was unknown at registration time.
            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
                session.session_modes = modes;
                session.models = models;
                session.config_options = config_options.map(ConfigOptions::new);
            }

            Ok(thread)
        })
    }
778
    /// Resumes an existing session on the agent and returns a thread for it.
    ///
    /// The thread is created and the session registered *before* the resume
    /// request is sent; the registration is removed again if the request
    /// fails. Once the agent responds, the session's modes, models and config
    /// options are filled in and default config options are applied.
    ///
    /// # Errors
    ///
    /// Fails immediately when the agent does not support resuming sessions or
    /// when `work_dirs` is empty, and otherwise with the (mapped) error of the
    /// `resume_session` request.
    fn resume_session(
        self: Rc<Self>,
        session_id: acp::SessionId,
        project: Entity<Project>,
        work_dirs: PathList,
        title: Option<SharedString>,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        if self
            .agent_capabilities
            .session_capabilities
            .resume
            .is_none()
        {
            return Task::ready(Err(anyhow!(LoadError::Other(
                "Resuming sessions is not supported by this agent.".into()
            ))));
        }
        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };

        let mcp_servers = mcp_servers_for_project(&project, cx);
        let action_log = cx.new(|_| ActionLog::new(project.clone()));
        // Fall back to the connection's display name when no title is given.
        let title = title.unwrap_or_else(|| self.display_name.clone());
        let thread: Entity<AcpThread> = cx.new(|cx| {
            AcpThread::new(
                None,
                title,
                Some(work_dirs),
                self.clone(),
                project,
                action_log,
                session_id.clone(),
                watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                cx,
            )
        });

        // Register the session up front, with no mode/model/config state yet.
        self.sessions.borrow_mut().insert(
            session_id.clone(),
            AcpSession {
                thread: thread.downgrade(),
                suppress_abort_err: false,
                session_modes: None,
                models: None,
                config_options: None,
            },
        );

        cx.spawn(async move |cx| {
            let response = match self
                .connection
                .resume_session(
                    acp::ResumeSessionRequest::new(session_id.clone(), cwd)
                        .mcp_servers(mcp_servers),
                )
                .await
            {
                Ok(response) => response,
                Err(err) => {
                    // Undo the registration made above.
                    self.sessions.borrow_mut().remove(&session_id);
                    return Err(map_acp_error(err));
                }
            };

            let (modes, models, config_options) =
                config_state(response.modes, response.models, response.config_options);

            if let Some(config_opts) = config_options.as_ref() {
                self.apply_default_config_options(&session_id, config_opts, cx);
            }

            // Fill in the state that was unknown at registration time.
            if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
                session.session_modes = modes;
                session.models = models;
                session.config_options = config_options.map(ConfigOptions::new);
            }

            Ok(thread)
        })
    }
862
863 fn supports_close_session(&self) -> bool {
864 self.agent_capabilities.session_capabilities.close.is_some()
865 }
866
867 fn close_session(
868 self: Rc<Self>,
869 session_id: &acp::SessionId,
870 cx: &mut App,
871 ) -> Task<Result<()>> {
872 if !self.supports_close_session() {
873 return Task::ready(Err(anyhow!(LoadError::Other(
874 "Closing sessions is not supported by this agent.".into()
875 ))));
876 }
877
878 let conn = self.connection.clone();
879 let session_id = session_id.clone();
880 cx.foreground_executor().spawn(async move {
881 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
882 .await?;
883 self.sessions.borrow_mut().remove(&session_id);
884 Ok(())
885 })
886 }
887
888 fn auth_methods(&self) -> &[acp::AuthMethod] {
889 &self.auth_methods
890 }
891
892 fn terminal_auth_task(
893 &self,
894 method_id: &acp::AuthMethodId,
895 cx: &App,
896 ) -> Option<SpawnInTerminal> {
897 let method = self
898 .auth_methods
899 .iter()
900 .find(|method| method.id() == method_id)?;
901
902 match method {
903 acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
904 Some(terminal_auth_task(&self.command, &self.id, terminal))
905 }
906 _ => meta_terminal_auth_task(&self.id, method_id, method),
907 }
908 }
909
910 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
911 let conn = self.connection.clone();
912 cx.foreground_executor().spawn(async move {
913 conn.authenticate(acp::AuthenticateRequest::new(method_id))
914 .await?;
915 Ok(())
916 })
917 }
918
    /// Sends a prompt to the agent and post-processes failures.
    ///
    /// After the request completes, the session's `suppress_abort_err` flag
    /// is read and cleared. Errors are then mapped as follows:
    /// - `AuthRequired` becomes a fresh `acp::Error::auth_required()`;
    /// - any code other than `InternalError`, or an error without `data`, is
    ///   returned unchanged;
    /// - an `InternalError` whose `data` is exactly `{ "details": ... }` is
    ///   turned into a `Cancelled` response when a cancel was requested and
    ///   the details mention an aborted operation, and into an error carrying
    ///   the details text otherwise;
    /// - any other `data` shape returns the original error.
    fn prompt(
        &self,
        _id: Option<acp_thread::UserMessageId>,
        params: acp::PromptRequest,
        cx: &mut App,
    ) -> Task<Result<acp::PromptResponse>> {
        let conn = self.connection.clone();
        let sessions = self.sessions.clone();
        let session_id = params.session_id.clone();
        cx.foreground_executor().spawn(async move {
            let result = conn.prompt(params).await;

            let mut suppress_abort_err = false;

            // Consume the flag set by `cancel` so it only affects this prompt.
            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
                suppress_abort_err = session.suppress_abort_err;
                session.suppress_abort_err = false;
            }

            match result {
                Ok(response) => Ok(response),
                Err(err) => {
                    if err.code == acp::ErrorCode::AuthRequired {
                        return Err(anyhow!(acp::Error::auth_required()));
                    }

                    if err.code != ErrorCode::InternalError {
                        anyhow::bail!(err)
                    }

                    let Some(data) = &err.data else {
                        anyhow::bail!(err)
                    };

                    // Temporary workaround until the following PR is generally available:
                    // https://github.com/google-gemini/gemini-cli/pull/6656

                    #[derive(Deserialize)]
                    #[serde(deny_unknown_fields)]
                    struct ErrorDetails {
                        details: Box<str>,
                    }

                    match serde_json::from_value(data.clone()) {
                        Ok(ErrorDetails { details }) => {
                            if suppress_abort_err
                                && (details.contains("This operation was aborted")
                                    || details.contains("The user aborted a request"))
                            {
                                Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
                            } else {
                                Err(anyhow!(details))
                            }
                        }
                        Err(_) => Err(anyhow!(err)),
                    }
                }
            }
        })
    }
979
980 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
981 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
982 session.suppress_abort_err = true;
983 }
984 let conn = self.connection.clone();
985 let params = acp::CancelNotification::new(session_id.clone());
986 cx.foreground_executor()
987 .spawn(async move { conn.cancel(params).await })
988 .detach();
989 }
990
991 fn session_modes(
992 &self,
993 session_id: &acp::SessionId,
994 _cx: &App,
995 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
996 let sessions = self.sessions.clone();
997 let sessions_ref = sessions.borrow();
998 let Some(session) = sessions_ref.get(session_id) else {
999 return None;
1000 };
1001
1002 if let Some(modes) = session.session_modes.as_ref() {
1003 Some(Rc::new(AcpSessionModes {
1004 connection: self.connection.clone(),
1005 session_id: session_id.clone(),
1006 state: modes.clone(),
1007 }) as _)
1008 } else {
1009 None
1010 }
1011 }
1012
1013 fn model_selector(
1014 &self,
1015 session_id: &acp::SessionId,
1016 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1017 let sessions = self.sessions.clone();
1018 let sessions_ref = sessions.borrow();
1019 let Some(session) = sessions_ref.get(session_id) else {
1020 return None;
1021 };
1022
1023 if let Some(models) = session.models.as_ref() {
1024 Some(Rc::new(AcpModelSelector::new(
1025 session_id.clone(),
1026 self.connection.clone(),
1027 models.clone(),
1028 )) as _)
1029 } else {
1030 None
1031 }
1032 }
1033
1034 fn session_config_options(
1035 &self,
1036 session_id: &acp::SessionId,
1037 _cx: &App,
1038 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1039 let sessions = self.sessions.borrow();
1040 let session = sessions.get(session_id)?;
1041
1042 let config_opts = session.config_options.as_ref()?;
1043
1044 Some(Rc::new(AcpSessionConfigOptions {
1045 session_id: session_id.clone(),
1046 connection: self.connection.clone(),
1047 state: config_opts.config_options.clone(),
1048 watch_tx: config_opts.tx.clone(),
1049 watch_rx: config_opts.rx.clone(),
1050 }) as _)
1051 }
1052
1053 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
1054 self.session_list.clone().map(|s| s as _)
1055 }
1056
    /// Upcasts to `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
1060}
1061
1062fn map_acp_error(err: acp::Error) -> anyhow::Error {
1063 if err.code == acp::ErrorCode::AuthRequired {
1064 let mut error = AuthRequired::new();
1065
1066 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1067 error = error.with_description(err.message);
1068 }
1069
1070 anyhow!(error)
1071 } else {
1072 anyhow!(err)
1073 }
1074}
1075
#[cfg(test)]
mod tests {
    use super::*;

    /// Agent id used by every test in this module.
    fn test_agent_id() -> AgentId {
        AgentId::new("test-agent")
    }

    /// A terminal auth method runs the agent's own command, appends its args to the
    /// command's args, and layers its env on top of the command's env.
    #[test]
    fn terminal_auth_task_reuses_command_and_merges_args_and_env() {
        let method = acp::AuthMethodTerminal::new("login", "Login")
            .args(vec!["/auth".into()])
            .env(std::collections::HashMap::from_iter([
                ("EXTRA".into(), "2".into()),
                ("SHARED".into(), "override".into()),
            ]));
        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "base".into()),
            ])),
        };

        let task = terminal_auth_task(&command, &test_agent_id(), &method);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
        // `SHARED` is defined on both sides; the auth method's value is expected to win.
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(task.label, "Login");
        assert_eq!(task.command_label, "Login");
    }

    /// A complete `terminal-auth` meta entry on an agent auth method yields a task built
    /// from that entry.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let legacy_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
                "command": "legacy-agent",
                "args": ["auth", "--interactive"],
                "env": {
                    "AUTH_MODE": "interactive",
                },
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(legacy_meta),
        );

        let task = meta_terminal_auth_task(&test_agent_id(), &method_id, &method)
            .expect("expected legacy terminal auth task");

        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
        assert_eq!(task.args, vec!["auth", "--interactive"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(task.label, "legacy /auth");
    }

    /// A `terminal-auth` meta entry that only carries a label produces no task.
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let incomplete_meta = acp::Meta::from_iter([(
            "terminal-auth".to_string(),
            serde_json::json!({
                "label": "legacy /auth",
            }),
        )]);
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(incomplete_meta),
        );

        let task = meta_terminal_auth_task(&test_agent_id(), &method_id, &method);

        assert!(task.is_none());
    }

    /// When a terminal auth method also carries legacy meta, the task is built from the
    /// method's own fields rather than from the meta entry.
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into()],
            env: Some(HashMap::from_iter([("BASE".into(), "1".into())])),
        };

        let method_id = acp::AuthMethodId::new("login");
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(acp::Meta::from_iter([(
                    "terminal-auth".to_string(),
                    serde_json::json!({
                        "label": "legacy /auth",
                        "command": "legacy-agent",
                        "args": ["legacy-auth"],
                        "env": {
                            "AUTH_MODE": "legacy",
                        },
                    }),
                )])),
        );

        let acp::AuthMethod::Terminal(terminal) = &method else {
            unreachable!()
        };
        let task = terminal_auth_task(&command, &test_agent_id(), terminal);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(task.label, "Login");
    }
}
1218
1219fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1220 let context_server_store = project.read(cx).context_server_store().read(cx);
1221 let is_local = project.read(cx).is_local();
1222 context_server_store
1223 .configured_server_ids()
1224 .iter()
1225 .filter_map(|id| {
1226 let configuration = context_server_store.configuration_for_server(id)?;
1227 match &*configuration {
1228 project::context_server_store::ContextServerConfiguration::Custom {
1229 command,
1230 remote,
1231 ..
1232 }
1233 | project::context_server_store::ContextServerConfiguration::Extension {
1234 command,
1235 remote,
1236 ..
1237 } if is_local || *remote => Some(acp::McpServer::Stdio(
1238 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1239 .args(command.args.clone())
1240 .env(if let Some(env) = command.env.as_ref() {
1241 env.iter()
1242 .map(|(name, value)| acp::EnvVariable::new(name, value))
1243 .collect()
1244 } else {
1245 vec![]
1246 }),
1247 )),
1248 project::context_server_store::ContextServerConfiguration::Http {
1249 url,
1250 headers,
1251 timeout: _,
1252 } => Some(acp::McpServer::Http(
1253 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1254 headers
1255 .iter()
1256 .map(|(name, value)| acp::HttpHeader::new(name, value))
1257 .collect(),
1258 ),
1259 )),
1260 _ => None,
1261 }
1262 })
1263 .collect()
1264}
1265
1266fn config_state(
1267 modes: Option<acp::SessionModeState>,
1268 models: Option<acp::SessionModelState>,
1269 config_options: Option<Vec<acp::SessionConfigOption>>,
1270) -> (
1271 Option<Rc<RefCell<acp::SessionModeState>>>,
1272 Option<Rc<RefCell<acp::SessionModelState>>>,
1273 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1274) {
1275 if let Some(opts) = config_options {
1276 return (None, None, Some(Rc::new(RefCell::new(opts))));
1277 }
1278
1279 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1280 let models = models.map(|models| Rc::new(RefCell::new(models)));
1281 (modes, models, None)
1282}
1283
1284struct AcpSessionModes {
1285 session_id: acp::SessionId,
1286 connection: Rc<acp::ClientSideConnection>,
1287 state: Rc<RefCell<acp::SessionModeState>>,
1288}
1289
1290impl acp_thread::AgentSessionModes for AcpSessionModes {
1291 fn current_mode(&self) -> acp::SessionModeId {
1292 self.state.borrow().current_mode_id.clone()
1293 }
1294
1295 fn all_modes(&self) -> Vec<acp::SessionMode> {
1296 self.state.borrow().available_modes.clone()
1297 }
1298
1299 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1300 let connection = self.connection.clone();
1301 let session_id = self.session_id.clone();
1302 let old_mode_id;
1303 {
1304 let mut state = self.state.borrow_mut();
1305 old_mode_id = state.current_mode_id.clone();
1306 state.current_mode_id = mode_id.clone();
1307 };
1308 let state = self.state.clone();
1309 cx.foreground_executor().spawn(async move {
1310 let result = connection
1311 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1312 .await;
1313
1314 if result.is_err() {
1315 state.borrow_mut().current_mode_id = old_mode_id;
1316 }
1317
1318 result?;
1319
1320 Ok(())
1321 })
1322 }
1323}
1324
1325struct AcpModelSelector {
1326 session_id: acp::SessionId,
1327 connection: Rc<acp::ClientSideConnection>,
1328 state: Rc<RefCell<acp::SessionModelState>>,
1329}
1330
1331impl AcpModelSelector {
1332 fn new(
1333 session_id: acp::SessionId,
1334 connection: Rc<acp::ClientSideConnection>,
1335 state: Rc<RefCell<acp::SessionModelState>>,
1336 ) -> Self {
1337 Self {
1338 session_id,
1339 connection,
1340 state,
1341 }
1342 }
1343}
1344
1345impl acp_thread::AgentModelSelector for AcpModelSelector {
1346 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1347 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1348 self.state
1349 .borrow()
1350 .available_models
1351 .clone()
1352 .into_iter()
1353 .map(acp_thread::AgentModelInfo::from)
1354 .collect(),
1355 )))
1356 }
1357
1358 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1359 let connection = self.connection.clone();
1360 let session_id = self.session_id.clone();
1361 let old_model_id;
1362 {
1363 let mut state = self.state.borrow_mut();
1364 old_model_id = state.current_model_id.clone();
1365 state.current_model_id = model_id.clone();
1366 };
1367 let state = self.state.clone();
1368 cx.foreground_executor().spawn(async move {
1369 let result = connection
1370 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1371 .await;
1372
1373 if result.is_err() {
1374 state.borrow_mut().current_model_id = old_model_id;
1375 }
1376
1377 result?;
1378
1379 Ok(())
1380 })
1381 }
1382
1383 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1384 let state = self.state.borrow();
1385 Task::ready(
1386 state
1387 .available_models
1388 .iter()
1389 .find(|m| m.model_id == state.current_model_id)
1390 .cloned()
1391 .map(acp_thread::AgentModelInfo::from)
1392 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1393 )
1394 }
1395}
1396
1397struct AcpSessionConfigOptions {
1398 session_id: acp::SessionId,
1399 connection: Rc<acp::ClientSideConnection>,
1400 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1401 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1402 watch_rx: watch::Receiver<()>,
1403}
1404
1405impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1406 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1407 self.state.borrow().clone()
1408 }
1409
1410 fn set_config_option(
1411 &self,
1412 config_id: acp::SessionConfigId,
1413 value: acp::SessionConfigValueId,
1414 cx: &mut App,
1415 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1416 let connection = self.connection.clone();
1417 let session_id = self.session_id.clone();
1418 let state = self.state.clone();
1419
1420 let watch_tx = self.watch_tx.clone();
1421
1422 cx.foreground_executor().spawn(async move {
1423 let response = connection
1424 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1425 session_id, config_id, value,
1426 ))
1427 .await?;
1428
1429 *state.borrow_mut() = response.config_options.clone();
1430 watch_tx.borrow_mut().send(()).ok();
1431 Ok(response.config_options)
1432 })
1433 }
1434
1435 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1436 Some(self.watch_rx.clone())
1437 }
1438}
1439
1440struct ClientDelegate {
1441 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1442 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1443 cx: AsyncApp,
1444}
1445
1446#[async_trait::async_trait(?Send)]
1447impl acp::Client for ClientDelegate {
1448 async fn request_permission(
1449 &self,
1450 arguments: acp::RequestPermissionRequest,
1451 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1452 let thread;
1453 {
1454 let sessions_ref = self.sessions.borrow();
1455 let session = sessions_ref
1456 .get(&arguments.session_id)
1457 .context("Failed to get session")?;
1458 thread = session.thread.clone();
1459 }
1460
1461 let cx = &mut self.cx.clone();
1462
1463 let task = thread.update(cx, |thread, cx| {
1464 thread.request_tool_call_authorization(
1465 arguments.tool_call,
1466 acp_thread::PermissionOptions::Flat(arguments.options),
1467 cx,
1468 )
1469 })??;
1470
1471 let outcome = task.await;
1472
1473 Ok(acp::RequestPermissionResponse::new(outcome.into()))
1474 }
1475
1476 async fn write_text_file(
1477 &self,
1478 arguments: acp::WriteTextFileRequest,
1479 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1480 let cx = &mut self.cx.clone();
1481 let task = self
1482 .session_thread(&arguments.session_id)?
1483 .update(cx, |thread, cx| {
1484 thread.write_text_file(arguments.path, arguments.content, cx)
1485 })?;
1486
1487 task.await?;
1488
1489 Ok(Default::default())
1490 }
1491
1492 async fn read_text_file(
1493 &self,
1494 arguments: acp::ReadTextFileRequest,
1495 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1496 let task = self.session_thread(&arguments.session_id)?.update(
1497 &mut self.cx.clone(),
1498 |thread, cx| {
1499 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1500 },
1501 )?;
1502
1503 let content = task.await?;
1504
1505 Ok(acp::ReadTextFileResponse::new(content))
1506 }
1507
1508 async fn session_notification(
1509 &self,
1510 notification: acp::SessionNotification,
1511 ) -> Result<(), acp::Error> {
1512 let sessions = self.sessions.borrow();
1513 let session = sessions
1514 .get(¬ification.session_id)
1515 .context("Failed to get session")?;
1516
1517 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1518 current_mode_id,
1519 ..
1520 }) = ¬ification.update
1521 {
1522 if let Some(session_modes) = &session.session_modes {
1523 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1524 }
1525 }
1526
1527 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1528 config_options,
1529 ..
1530 }) = ¬ification.update
1531 {
1532 if let Some(opts) = &session.config_options {
1533 *opts.config_options.borrow_mut() = config_options.clone();
1534 opts.tx.borrow_mut().send(()).ok();
1535 }
1536 }
1537
1538 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1539 && let Some(session_list) = self.session_list.borrow().as_ref()
1540 {
1541 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1542 }
1543
1544 // Clone so we can inspect meta both before and after handing off to the thread
1545 let update_clone = notification.update.clone();
1546
1547 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1548 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1549 if let Some(meta) = &tc.meta {
1550 if let Some(terminal_info) = meta.get("terminal_info") {
1551 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1552 {
1553 let terminal_id = acp::TerminalId::new(id_str);
1554 let cwd = terminal_info
1555 .get("cwd")
1556 .and_then(|v| v.as_str().map(PathBuf::from));
1557
1558 // Create a minimal display-only lower-level terminal and register it.
1559 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1560 let builder = TerminalBuilder::new_display_only(
1561 CursorShape::default(),
1562 AlternateScroll::On,
1563 None,
1564 0,
1565 cx.background_executor(),
1566 thread.project().read(cx).path_style(cx),
1567 )?;
1568 let lower = cx.new(|cx| builder.subscribe(cx));
1569 thread.on_terminal_provider_event(
1570 TerminalProviderEvent::Created {
1571 terminal_id,
1572 label: tc.title.clone(),
1573 cwd,
1574 output_byte_limit: None,
1575 terminal: lower,
1576 },
1577 cx,
1578 );
1579 anyhow::Ok(())
1580 });
1581 }
1582 }
1583 }
1584 }
1585
1586 // Forward the update to the acp_thread as usual.
1587 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1588 thread.handle_session_update(notification.update.clone(), cx)
1589 })??;
1590
1591 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1592 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1593 if let Some(meta) = &tcu.meta {
1594 if let Some(term_out) = meta.get("terminal_output") {
1595 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1596 let terminal_id = acp::TerminalId::new(id_str);
1597 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1598 let data = s.as_bytes().to_vec();
1599 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1600 thread.on_terminal_provider_event(
1601 TerminalProviderEvent::Output { terminal_id, data },
1602 cx,
1603 );
1604 });
1605 }
1606 }
1607 }
1608
1609 // terminal_exit
1610 if let Some(term_exit) = meta.get("terminal_exit") {
1611 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1612 let terminal_id = acp::TerminalId::new(id_str);
1613 let status = acp::TerminalExitStatus::new()
1614 .exit_code(
1615 term_exit
1616 .get("exit_code")
1617 .and_then(|v| v.as_u64())
1618 .map(|i| i as u32),
1619 )
1620 .signal(
1621 term_exit
1622 .get("signal")
1623 .and_then(|v| v.as_str().map(|s| s.to_string())),
1624 );
1625
1626 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1627 thread.on_terminal_provider_event(
1628 TerminalProviderEvent::Exit {
1629 terminal_id,
1630 status,
1631 },
1632 cx,
1633 );
1634 });
1635 }
1636 }
1637 }
1638 }
1639
1640 Ok(())
1641 }
1642
1643 async fn create_terminal(
1644 &self,
1645 args: acp::CreateTerminalRequest,
1646 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1647 let thread = self.session_thread(&args.session_id)?;
1648 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1649
1650 let terminal_entity = acp_thread::create_terminal_entity(
1651 args.command.clone(),
1652 &args.args,
1653 args.env
1654 .into_iter()
1655 .map(|env| (env.name, env.value))
1656 .collect(),
1657 args.cwd.clone(),
1658 &project,
1659 &mut self.cx.clone(),
1660 )
1661 .await?;
1662
1663 // Register with renderer
1664 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1665 thread.register_terminal_created(
1666 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1667 format!("{} {}", args.command, args.args.join(" ")),
1668 args.cwd.clone(),
1669 args.output_byte_limit,
1670 terminal_entity,
1671 cx,
1672 )
1673 })?;
1674 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1675 Ok(acp::CreateTerminalResponse::new(terminal_id))
1676 }
1677
1678 async fn kill_terminal(
1679 &self,
1680 args: acp::KillTerminalRequest,
1681 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1682 self.session_thread(&args.session_id)?
1683 .update(&mut self.cx.clone(), |thread, cx| {
1684 thread.kill_terminal(args.terminal_id, cx)
1685 })??;
1686
1687 Ok(Default::default())
1688 }
1689
1690 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1691 Err(acp::Error::method_not_found())
1692 }
1693
1694 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1695 Err(acp::Error::method_not_found())
1696 }
1697
1698 async fn release_terminal(
1699 &self,
1700 args: acp::ReleaseTerminalRequest,
1701 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1702 self.session_thread(&args.session_id)?
1703 .update(&mut self.cx.clone(), |thread, cx| {
1704 thread.release_terminal(args.terminal_id, cx)
1705 })??;
1706
1707 Ok(Default::default())
1708 }
1709
1710 async fn terminal_output(
1711 &self,
1712 args: acp::TerminalOutputRequest,
1713 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1714 self.session_thread(&args.session_id)?
1715 .read_with(&mut self.cx.clone(), |thread, cx| {
1716 let out = thread
1717 .terminal(args.terminal_id)?
1718 .read(cx)
1719 .current_output(cx);
1720
1721 Ok(out)
1722 })?
1723 }
1724
1725 async fn wait_for_terminal_exit(
1726 &self,
1727 args: acp::WaitForTerminalExitRequest,
1728 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1729 let exit_status = self
1730 .session_thread(&args.session_id)?
1731 .update(&mut self.cx.clone(), |thread, cx| {
1732 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1733 })??
1734 .await;
1735
1736 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1737 }
1738}
1739
1740impl ClientDelegate {
1741 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1742 let sessions = self.sessions.borrow();
1743 sessions
1744 .get(session_id)
1745 .context("Failed to get session")
1746 .map(|session| session.thread.clone())
1747 }
1748}