1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::agent_server_store::{AgentServerCommand, AgentServerStore};
14use project::{AgentId, Project};
15use remote::remote_client::Interactive;
16use serde::Deserialize;
17use settings::Settings as _;
18use std::path::PathBuf;
19use std::process::Stdio;
20use std::rc::Rc;
21use std::{any::Any, cell::RefCell};
22use task::{ShellBuilder, SpawnInTerminal};
23use thiserror::Error;
24use util::ResultExt as _;
25use util::path_list::PathList;
26use util::process::Child;
27
28use anyhow::{Context as _, Result};
29use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
30
31use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
32use terminal::TerminalBuilder;
33use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
34
35use crate::GEMINI_ID;
36
37pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
38
/// Error returned by `AcpConnection::stdio` when the protocol version in the
/// agent's initialize response is lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
42
/// A connection to an external agent process that is driven over the
/// process's stdin/stdout through an `acp::ClientSideConnection`.
pub struct AcpConnection {
    /// Identifier of the agent this connection was created for.
    id: AgentId,
    /// Name taken from the agent's `agent_info` in the initialize response,
    /// or the agent id when the agent did not provide one.
    telemetry_id: SharedString,
    /// Client side of the protocol connection, shared with spawned tasks.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions created, loaded, or resumed through this connection, keyed by
    /// session id. Shared with the client delegate and the process wait task.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Auth methods from the initialize response (replaced by a synthetic
    /// terminal login method for the Gemini agent).
    auth_methods: Vec<acp::AuthMethod>,
    /// Used to look up the agent's command when building a terminal auth task.
    agent_server_store: WeakEntity<AgentServerStore>,
    /// Capabilities reported by the agent in the initialize response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode selected on newly created sessions when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model selected on newly created sessions when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Default config option values, keyed by config option id.
    default_config_options: HashMap<String, String>,
    /// Handle to the spawned agent process; killed when the connection drops.
    child: Child,
    /// Present only when the agent advertises the session `list` capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Background task driving the connection's I/O future.
    _io_task: Task<Result<(), acp::Error>>,
    /// Waits for the agent process to exit and reports `LoadError::Exited`
    /// to the thread of every registered session.
    _wait_task: Task<Result<()>>,
    /// Forwards each line of the agent's stderr to the log at `warn` level.
    _stderr_task: Task<Result<()>>,
}
60
/// Per-session config option state bundled with a `watch` channel.
///
/// All three fields are handed to `AcpSessionConfigOptions` by
/// `session_config_options`.
struct ConfigOptions {
    /// Shared, mutable list of the session's config options.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender half of the `watch` channel, shared behind `Rc<RefCell<_>>`.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver half of the `watch` channel; cloned for each consumer.
    rx: watch::Receiver<()>,
}
66
67impl ConfigOptions {
68 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
69 let (tx, rx) = watch::channel(());
70 Self {
71 config_options,
72 tx: Rc::new(RefCell::new(tx)),
73 rx,
74 }
75 }
76}
77
/// State tracked by an `AcpConnection` for a single agent session.
pub struct AcpSession {
    /// Weak handle to the thread entity backing this session.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel`; read and reset by `prompt`, which uses it to turn an
    /// "aborted" internal error into a `Cancelled` prompt response.
    suppress_abort_err: bool,
    /// Model state from the agent's session response, if it returned any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state from the agent's session response, if it returned any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options from the agent's session response, if it returned any.
    config_options: Option<ConfigOptions>,
}
85
/// Session listing backed by an agent connection, plus an unbounded channel
/// on which list updates are published.
pub struct AcpSessionList {
    /// Connection used to issue `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sender used by `notify_update` and `send_info_update`.
    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
    /// Receiver cloned and handed out by `watch`.
    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
}
91
92impl AcpSessionList {
93 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
94 let (tx, rx) = smol::channel::unbounded();
95 Self {
96 connection,
97 updates_tx: tx,
98 updates_rx: rx,
99 }
100 }
101
102 fn notify_update(&self) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::Refresh)
105 .log_err();
106 }
107
108 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
109 self.updates_tx
110 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
111 .log_err();
112 }
113}
114
115impl AgentSessionList for AcpSessionList {
116 fn list_sessions(
117 &self,
118 request: AgentSessionListRequest,
119 cx: &mut App,
120 ) -> Task<Result<AgentSessionListResponse>> {
121 let conn = self.connection.clone();
122 cx.foreground_executor().spawn(async move {
123 let acp_request = acp::ListSessionsRequest::new()
124 .cwd(request.cwd)
125 .cursor(request.cursor);
126 let response = conn.list_sessions(acp_request).await?;
127 Ok(AgentSessionListResponse {
128 sessions: response
129 .sessions
130 .into_iter()
131 .map(|s| AgentSessionInfo {
132 session_id: s.session_id,
133 work_dirs: Some(PathList::new(&[s.cwd])),
134 title: s.title.map(Into::into),
135 updated_at: s.updated_at.and_then(|date_str| {
136 chrono::DateTime::parse_from_rfc3339(&date_str)
137 .ok()
138 .map(|dt| dt.with_timezone(&chrono::Utc))
139 }),
140 created_at: None,
141 meta: s.meta,
142 })
143 .collect(),
144 next_cursor: response.next_cursor,
145 meta: response.meta,
146 })
147 })
148 }
149
150 fn watch(
151 &self,
152 _cx: &mut App,
153 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
154 Some(self.updates_rx.clone())
155 }
156
157 fn notify_refresh(&self) {
158 self.notify_update();
159 }
160
161 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
162 self
163 }
164}
165
166pub async fn connect(
167 agent_id: AgentId,
168 project: Entity<Project>,
169 command: AgentServerCommand,
170 agent_server_store: WeakEntity<AgentServerStore>,
171 default_mode: Option<acp::SessionModeId>,
172 default_model: Option<acp::ModelId>,
173 default_config_options: HashMap<String, String>,
174 cx: &mut AsyncApp,
175) -> Result<Rc<dyn AgentConnection>> {
176 let conn = AcpConnection::stdio(
177 agent_id,
178 project,
179 command.clone(),
180 agent_server_store,
181 default_mode,
182 default_model,
183 default_config_options,
184 cx,
185 )
186 .await?;
187 Ok(Rc::new(conn) as _)
188}
189
/// Lowest protocol version accepted from an agent; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the agent reports a lower one.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
191
192impl AcpConnection {
    /// Returns the underlying client-side protocol connection.
    pub fn client_connection(&self) -> &Rc<acp::ClientSideConnection> {
        &self.connection
    }
196
    /// Spawns the agent process for `command` through the configured terminal
    /// shell, connects an ACP client to its stdin/stdout, and performs the
    /// `initialize` handshake.
    ///
    /// When the project has a remote client, the command is first rewritten
    /// with `build_command_with_options`; otherwise it is used as given. The
    /// working directory is set only for local projects, to the first path of
    /// the project's default path list.
    ///
    /// # Errors
    ///
    /// Fails when the process cannot be spawned, when any of its stdio handles
    /// cannot be taken, when the `initialize` request fails, or with
    /// [`UnsupportedVersion`] when the agent reports a protocol version below
    /// `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        agent_id: AgentId,
        project: Entity<Project>,
        command: AgentServerCommand,
        agent_server_store: WeakEntity<AgentServerStore>,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        // First path of the project's default path list, if there is one.
        let root_dir = project.read_with(cx, |project, cx| {
            project
                .default_path_list(cx)
                .ordered_paths()
                .next()
                .cloned()
        });
        // Kept because `command` is consumed below; the Gemini auth override
        // needs the command as originally given.
        let original_command = command.clone();
        // With a remote client, let it build the program/args/env; a failure
        // to build is logged and falls back to the command as given.
        let (path, args, env) = project
            .read_with(cx, |project, cx| {
                project.remote_client().and_then(|client| {
                    let template = client
                        .read(cx)
                        .build_command_with_options(
                            Some(command.path.display().to_string()),
                            &command.args,
                            &command.env.clone().into_iter().flatten().collect(),
                            root_dir.as_ref().map(|path| path.display().to_string()),
                            None,
                            Interactive::No,
                        )
                        .log_err()?;
                    Some((template.program, template.args, template.env))
                })
            })
            .unwrap_or_else(|| {
                (
                    command.path.display().to_string(),
                    command.args,
                    command.env.unwrap_or_default(),
                )
            });

        // Run the agent through the shell from the terminal settings.
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child = builder.build_std_command(Some(path.clone()), &args);
        child.envs(env.clone());
        // Only local projects get a working directory set on the process.
        if let Some(cwd) = project.read_with(cx, |project, _cx| {
            if project.is_local() {
                root_dir.as_ref()
            } else {
                None
            }
        }) {
            child.current_dir(cwd);
        }
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        log::debug!("Spawning external agent server: {:?}, {:?}", path, args);
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // The delegate is created before the handshake, so the session list
        // slot starts empty and is filled in once capabilities are known.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        let client = ClientDelegate {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
            cx: cx.clone(),
        };
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Forward the agent's stderr to the log, one line at a time, until
        // EOF or a read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // When the process exits, tell every registered session's thread.
        // NOTE(review): `sessions.borrow()` is held across `thread.update`;
        // confirm nothing triggered by `emit_load_error` mutably borrows
        // `sessions` while this loop runs.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapabilities::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            .auth(acp::AuthCapabilities::new().terminal(true))
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| agent_id.0.clone());

        // Only expose a session list when the agent advertises the capability;
        // the same list is shared with the client delegate.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        // TODO: Remove this override once Google team releases their official auth methods
        let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
            // The login command is the original command without the ACP flags.
            let mut gemini_args = original_command.args.clone();
            gemini_args.retain(|a| a != "--experimental-acp" && a != "--acp");
            let value = serde_json::json!({
                "label": "gemini /auth",
                "command": original_command.path.to_string_lossy(),
                "args": gemini_args,
                "env": original_command.env.unwrap_or_default(),
            });
            let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
            vec![acp::AuthMethod::Agent(
                acp::AuthMethodAgent::new(GEMINI_TERMINAL_AUTH_METHOD_ID, "Login")
                    .description("Login with your Google or Vertex AI account")
                    .meta(meta),
            )]
        } else {
            response.auth_methods
        };
        Ok(Self {
            id: agent_id,
            auth_methods,
            agent_server_store,
            connection,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }
405
    /// Returns the prompt capabilities the agent reported during initialization.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }
409
410 fn apply_default_config_options(
411 &self,
412 session_id: &acp::SessionId,
413 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
414 cx: &mut AsyncApp,
415 ) {
416 let id = self.id.clone();
417 let defaults_to_apply: Vec<_> = {
418 let config_opts_ref = config_options.borrow();
419 config_opts_ref
420 .iter()
421 .filter_map(|config_option| {
422 let default_value = self.default_config_options.get(&*config_option.id.0)?;
423
424 let is_valid = match &config_option.kind {
425 acp::SessionConfigKind::Select(select) => match &select.options {
426 acp::SessionConfigSelectOptions::Ungrouped(options) => options
427 .iter()
428 .any(|opt| &*opt.value.0 == default_value.as_str()),
429 acp::SessionConfigSelectOptions::Grouped(groups) => {
430 groups.iter().any(|g| {
431 g.options
432 .iter()
433 .any(|opt| &*opt.value.0 == default_value.as_str())
434 })
435 }
436 _ => false,
437 },
438 _ => false,
439 };
440
441 if is_valid {
442 let initial_value = match &config_option.kind {
443 acp::SessionConfigKind::Select(select) => {
444 Some(select.current_value.clone())
445 }
446 _ => None,
447 };
448 Some((
449 config_option.id.clone(),
450 default_value.clone(),
451 initial_value,
452 ))
453 } else {
454 log::warn!(
455 "`{}` is not a valid value for config option `{}` in {}",
456 default_value,
457 config_option.id.0,
458 id
459 );
460 None
461 }
462 })
463 .collect()
464 };
465
466 for (config_id, default_value, initial_value) in defaults_to_apply {
467 cx.spawn({
468 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
469 let session_id = session_id.clone();
470 let config_id_clone = config_id.clone();
471 let config_opts = config_options.clone();
472 let conn = self.connection.clone();
473 async move |_| {
474 let result = conn
475 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
476 session_id,
477 config_id_clone.clone(),
478 default_value_id,
479 ))
480 .await
481 .log_err();
482
483 if result.is_none() {
484 if let Some(initial) = initial_value {
485 let mut opts = config_opts.borrow_mut();
486 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
487 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
488 select.current_value = initial;
489 }
490 }
491 }
492 }
493 }
494 })
495 .detach();
496
497 let mut opts = config_options.borrow_mut();
498 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
499 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
500 select.current_value = acp::SessionConfigValueId::new(default_value);
501 }
502 }
503 }
504 }
505}
506
/// Kills the agent process when the connection is dropped.
impl Drop for AcpConnection {
    fn drop(&mut self) {
        // A failure to kill is logged rather than propagated.
        self.child.kill().log_err();
    }
}
512
513fn terminal_auth_task_id(agent_id: &AgentId, method_id: &acp::AuthMethodId) -> String {
514 format!("external-agent-{}-{}-login", agent_id.0, method_id.0)
515}
516
517fn terminal_auth_task(
518 command: &AgentServerCommand,
519 agent_id: &AgentId,
520 method: &acp::AuthMethodTerminal,
521) -> SpawnInTerminal {
522 acp_thread::build_terminal_auth_task(
523 terminal_auth_task_id(agent_id, &method.id),
524 method.name.clone(),
525 command.path.to_string_lossy().into_owned(),
526 command.args.clone(),
527 command.env.clone().unwrap_or_default(),
528 )
529}
530
531/// Used to support the _meta method prior to stabilization
532fn meta_terminal_auth_task(
533 agent_id: &AgentId,
534 method_id: &acp::AuthMethodId,
535 method: &acp::AuthMethod,
536) -> Option<SpawnInTerminal> {
537 #[derive(Deserialize)]
538 struct MetaTerminalAuth {
539 label: String,
540 command: String,
541 #[serde(default)]
542 args: Vec<String>,
543 #[serde(default)]
544 env: HashMap<String, String>,
545 }
546
547 let meta = match method {
548 acp::AuthMethod::EnvVar(env_var) => env_var.meta.as_ref(),
549 acp::AuthMethod::Terminal(terminal) => terminal.meta.as_ref(),
550 acp::AuthMethod::Agent(agent) => agent.meta.as_ref(),
551 _ => None,
552 }?;
553 let terminal_auth =
554 serde_json::from_value::<MetaTerminalAuth>(meta.get("terminal-auth")?.clone()).ok()?;
555
556 Some(acp_thread::build_terminal_auth_task(
557 terminal_auth_task_id(agent_id, method_id),
558 terminal_auth.label.clone(),
559 terminal_auth.command,
560 terminal_auth.args,
561 terminal_auth.env,
562 ))
563}
564
565impl AgentConnection for AcpConnection {
    /// Returns a clone of the id of the agent this connection belongs to.
    fn agent_id(&self) -> AgentId {
        self.id.clone()
    }
569
    /// Returns a clone of the telemetry id resolved during initialization.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
573
    /// Creates a new agent session and the `AcpThread` entity that backs it.
    ///
    /// The session's working directory is the first path in `work_dirs`; an
    /// empty `work_dirs` yields an error. After the agent responds, the
    /// configured default mode, model, and config options are applied when the
    /// agent offers them: each is written into the local state immediately,
    /// while a detached task sends the change to the agent and reverts the
    /// local value if that request fails. A default the agent does not offer
    /// only produces a warning.
    ///
    /// Errors from the `new_session` request are converted with `map_acp_error`.
    fn new_session(
        self: Rc<Self>,
        project: Entity<Project>,
        work_dirs: PathList,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        // TODO: remove this once ACP supports multiple working directories
        let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
            return Task::ready(Err(anyhow!("Working directory cannot be empty")));
        };
        // Used only in the warning messages below.
        let name = self.id.0.clone();
        let mcp_servers = mcp_servers_for_project(&project, cx);

        cx.spawn(async move |cx| {
            let response = self.connection
                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
                .await
                .map_err(map_acp_error)?;

            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);

            if let Some(default_mode) = self.default_mode.clone() {
                if let Some(modes) = modes.as_ref() {
                    let mut modes_ref = modes.borrow_mut();
                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);

                    if has_mode {
                        // Remembered so a failed request can restore it.
                        let initial_mode_id = modes_ref.current_mode_id.clone();

                        cx.spawn({
                            let default_mode = default_mode.clone();
                            let session_id = response.session_id.clone();
                            let modes = modes.clone();
                            let conn = self.connection.clone();
                            async move |_| {
                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
                                .await.log_err();

                                if result.is_none() {
                                    modes.borrow_mut().current_mode_id = initial_mode_id;
                                }
                            }
                        }).detach();

                        // Update the local state without waiting for the agent.
                        modes_ref.current_mode_id = default_mode;
                    } else {
                        let available_modes = modes_ref
                            .available_modes
                            .iter()
                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
                        );
                    }
                }
            }

            if let Some(default_model) = self.default_model.clone() {
                if let Some(models) = models.as_ref() {
                    let mut models_ref = models.borrow_mut();
                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);

                    if has_model {
                        // Remembered so a failed request can restore it.
                        let initial_model_id = models_ref.current_model_id.clone();

                        cx.spawn({
                            let default_model = default_model.clone();
                            let session_id = response.session_id.clone();
                            let models = models.clone();
                            let conn = self.connection.clone();
                            async move |_| {
                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
                                .await.log_err();

                                if result.is_none() {
                                    models.borrow_mut().current_model_id = initial_model_id;
                                }
                            }
                        }).detach();

                        // Update the local state without waiting for the agent.
                        models_ref.current_model_id = default_model;
                    } else {
                        let available_models = models_ref
                            .available_models
                            .iter()
                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
                        );
                    }
                }
            }

            if let Some(config_opts) = config_options.as_ref() {
                self.apply_default_config_options(&response.session_id, config_opts, cx);
            }

            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            let thread: Entity<AcpThread> = cx.new(|cx| {
                AcpThread::new(
                    None,
                    None,
                    Some(work_dirs),
                    self.clone(),
                    project,
                    action_log,
                    response.session_id.clone(),
                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                    cx,
                )
            });

            // Register the session, holding the thread only weakly.
            self.sessions.borrow_mut().insert(
                response.session_id,
                AcpSession {
                    thread: thread.downgrade(),
                    suppress_abort_err: false,
                    session_modes: modes,
                    models,
                    config_options: config_options.map(ConfigOptions::new),
                },
            );

            Ok(thread)
        })
    }
707
    /// Whether the agent reported the `load_session` capability.
    fn supports_load_session(&self) -> bool {
        self.agent_capabilities.load_session
    }
711
    /// Whether the agent reported the session `resume` capability.
    fn supports_resume_session(&self) -> bool {
        self.agent_capabilities
            .session_capabilities
            .resume
            .is_some()
    }
718
719 fn load_session(
720 self: Rc<Self>,
721 session_id: acp::SessionId,
722 project: Entity<Project>,
723 work_dirs: PathList,
724 title: Option<SharedString>,
725 cx: &mut App,
726 ) -> Task<Result<Entity<AcpThread>>> {
727 if !self.agent_capabilities.load_session {
728 return Task::ready(Err(anyhow!(LoadError::Other(
729 "Loading sessions is not supported by this agent.".into()
730 ))));
731 }
732 // TODO: remove this once ACP supports multiple working directories
733 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
734 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
735 };
736
737 let mcp_servers = mcp_servers_for_project(&project, cx);
738 let action_log = cx.new(|_| ActionLog::new(project.clone()));
739 let thread: Entity<AcpThread> = cx.new(|cx| {
740 AcpThread::new(
741 None,
742 title,
743 Some(work_dirs.clone()),
744 self.clone(),
745 project,
746 action_log,
747 session_id.clone(),
748 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
749 cx,
750 )
751 });
752
753 self.sessions.borrow_mut().insert(
754 session_id.clone(),
755 AcpSession {
756 thread: thread.downgrade(),
757 suppress_abort_err: false,
758 session_modes: None,
759 models: None,
760 config_options: None,
761 },
762 );
763
764 cx.spawn(async move |cx| {
765 let response = match self
766 .connection
767 .load_session(
768 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
769 )
770 .await
771 {
772 Ok(response) => response,
773 Err(err) => {
774 self.sessions.borrow_mut().remove(&session_id);
775 return Err(map_acp_error(err));
776 }
777 };
778
779 let (modes, models, config_options) =
780 config_state(response.modes, response.models, response.config_options);
781
782 if let Some(config_opts) = config_options.as_ref() {
783 self.apply_default_config_options(&session_id, config_opts, cx);
784 }
785
786 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
787 session.session_modes = modes;
788 session.models = models;
789 session.config_options = config_options.map(ConfigOptions::new);
790 }
791
792 Ok(thread)
793 })
794 }
795
796 fn resume_session(
797 self: Rc<Self>,
798 session_id: acp::SessionId,
799 project: Entity<Project>,
800 work_dirs: PathList,
801 title: Option<SharedString>,
802 cx: &mut App,
803 ) -> Task<Result<Entity<AcpThread>>> {
804 if self
805 .agent_capabilities
806 .session_capabilities
807 .resume
808 .is_none()
809 {
810 return Task::ready(Err(anyhow!(LoadError::Other(
811 "Resuming sessions is not supported by this agent.".into()
812 ))));
813 }
814 // TODO: remove this once ACP supports multiple working directories
815 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
816 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
817 };
818
819 let mcp_servers = mcp_servers_for_project(&project, cx);
820 let action_log = cx.new(|_| ActionLog::new(project.clone()));
821 let thread: Entity<AcpThread> = cx.new(|cx| {
822 AcpThread::new(
823 None,
824 title,
825 Some(work_dirs),
826 self.clone(),
827 project,
828 action_log,
829 session_id.clone(),
830 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
831 cx,
832 )
833 });
834
835 self.sessions.borrow_mut().insert(
836 session_id.clone(),
837 AcpSession {
838 thread: thread.downgrade(),
839 suppress_abort_err: false,
840 session_modes: None,
841 models: None,
842 config_options: None,
843 },
844 );
845
846 cx.spawn(async move |cx| {
847 let response = match self
848 .connection
849 .resume_session(
850 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
851 .mcp_servers(mcp_servers),
852 )
853 .await
854 {
855 Ok(response) => response,
856 Err(err) => {
857 self.sessions.borrow_mut().remove(&session_id);
858 return Err(map_acp_error(err));
859 }
860 };
861
862 let (modes, models, config_options) =
863 config_state(response.modes, response.models, response.config_options);
864
865 if let Some(config_opts) = config_options.as_ref() {
866 self.apply_default_config_options(&session_id, config_opts, cx);
867 }
868
869 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
870 session.session_modes = modes;
871 session.models = models;
872 session.config_options = config_options.map(ConfigOptions::new);
873 }
874
875 Ok(thread)
876 })
877 }
878
    /// Whether the agent reported the session `close` capability.
    fn supports_close_session(&self) -> bool {
        self.agent_capabilities.session_capabilities.close.is_some()
    }
882
883 fn close_session(
884 self: Rc<Self>,
885 session_id: &acp::SessionId,
886 cx: &mut App,
887 ) -> Task<Result<()>> {
888 if !self.supports_close_session() {
889 return Task::ready(Err(anyhow!(LoadError::Other(
890 "Closing sessions is not supported by this agent.".into()
891 ))));
892 }
893
894 let conn = self.connection.clone();
895 let session_id = session_id.clone();
896 cx.foreground_executor().spawn(async move {
897 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
898 .await?;
899 self.sessions.borrow_mut().remove(&session_id);
900 Ok(())
901 })
902 }
903
    /// Returns the auth methods resolved when the connection was established.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
907
    /// Builds the terminal task that performs the auth method `method_id`.
    ///
    /// Returns `None` when no auth method has that id, or when the method
    /// neither is a terminal method (with `AcpBetaFeatureFlag` enabled) nor
    /// carries usable `"terminal-auth"` meta.
    fn terminal_auth_task(
        &self,
        method_id: &acp::AuthMethodId,
        cx: &App,
    ) -> Option<Task<Result<SpawnInTerminal>>> {
        let method = self
            .auth_methods
            .iter()
            .find(|method| method.id() == method_id)?;

        match method {
            // Terminal auth methods are only honoured behind the beta flag.
            acp::AuthMethod::Terminal(terminal) if cx.has_flag::<AcpBetaFeatureFlag>() => {
                let agent_id = self.id.clone();
                let terminal = terminal.clone();
                let store = self.agent_server_store.clone();
                Some(cx.spawn(async move |cx| {
                    // Resolve the agent's command, passing the method's args
                    // and env to `get_command`.
                    let command = store
                        .update(cx, |store, cx| {
                            let agent = store
                                .get_external_agent(&agent_id)
                                .context("Agent server not found")?;
                            anyhow::Ok(agent.get_command(
                                terminal.args.clone(),
                                HashMap::from_iter(terminal.env.clone()),
                                &mut cx.to_async(),
                            ))
                        })?
                        .context("Failed to get agent command")?
                        .await?;
                    Ok(terminal_auth_task(&command, &agent_id, &terminal))
                }))
            }
            // Everything else falls back to the `"terminal-auth"` meta entry.
            _ => meta_terminal_auth_task(&self.id, method_id, method)
                .map(|task| Task::ready(Ok(task))),
        }
    }
944
945 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
946 let conn = self.connection.clone();
947 cx.foreground_executor().spawn(async move {
948 conn.authenticate(acp::AuthenticateRequest::new(method_id))
949 .await?;
950 Ok(())
951 })
952 }
953
954 fn prompt(
955 &self,
956 _id: Option<acp_thread::UserMessageId>,
957 params: acp::PromptRequest,
958 cx: &mut App,
959 ) -> Task<Result<acp::PromptResponse>> {
960 let conn = self.connection.clone();
961 let sessions = self.sessions.clone();
962 let session_id = params.session_id.clone();
963 cx.foreground_executor().spawn(async move {
964 let result = conn.prompt(params).await;
965
966 let mut suppress_abort_err = false;
967
968 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
969 suppress_abort_err = session.suppress_abort_err;
970 session.suppress_abort_err = false;
971 }
972
973 match result {
974 Ok(response) => Ok(response),
975 Err(err) => {
976 if err.code == acp::ErrorCode::AuthRequired {
977 return Err(anyhow!(acp::Error::auth_required()));
978 }
979
980 if err.code != ErrorCode::InternalError {
981 anyhow::bail!(err)
982 }
983
984 let Some(data) = &err.data else {
985 anyhow::bail!(err)
986 };
987
988 // Temporary workaround until the following PR is generally available:
989 // https://github.com/google-gemini/gemini-cli/pull/6656
990
991 #[derive(Deserialize)]
992 #[serde(deny_unknown_fields)]
993 struct ErrorDetails {
994 details: Box<str>,
995 }
996
997 match serde_json::from_value(data.clone()) {
998 Ok(ErrorDetails { details }) => {
999 if suppress_abort_err
1000 && (details.contains("This operation was aborted")
1001 || details.contains("The user aborted a request"))
1002 {
1003 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
1004 } else {
1005 Err(anyhow!(details))
1006 }
1007 }
1008 Err(_) => Err(anyhow!(err)),
1009 }
1010 }
1011 }
1012 })
1013 }
1014
1015 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
1016 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
1017 session.suppress_abort_err = true;
1018 }
1019 let conn = self.connection.clone();
1020 let params = acp::CancelNotification::new(session_id.clone());
1021 cx.foreground_executor()
1022 .spawn(async move { conn.cancel(params).await })
1023 .detach();
1024 }
1025
1026 fn session_modes(
1027 &self,
1028 session_id: &acp::SessionId,
1029 _cx: &App,
1030 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
1031 let sessions = self.sessions.clone();
1032 let sessions_ref = sessions.borrow();
1033 let Some(session) = sessions_ref.get(session_id) else {
1034 return None;
1035 };
1036
1037 if let Some(modes) = session.session_modes.as_ref() {
1038 Some(Rc::new(AcpSessionModes {
1039 connection: self.connection.clone(),
1040 session_id: session_id.clone(),
1041 state: modes.clone(),
1042 }) as _)
1043 } else {
1044 None
1045 }
1046 }
1047
1048 fn model_selector(
1049 &self,
1050 session_id: &acp::SessionId,
1051 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
1052 let sessions = self.sessions.clone();
1053 let sessions_ref = sessions.borrow();
1054 let Some(session) = sessions_ref.get(session_id) else {
1055 return None;
1056 };
1057
1058 if let Some(models) = session.models.as_ref() {
1059 Some(Rc::new(AcpModelSelector::new(
1060 session_id.clone(),
1061 self.connection.clone(),
1062 models.clone(),
1063 )) as _)
1064 } else {
1065 None
1066 }
1067 }
1068
1069 fn session_config_options(
1070 &self,
1071 session_id: &acp::SessionId,
1072 _cx: &App,
1073 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
1074 let sessions = self.sessions.borrow();
1075 let session = sessions.get(session_id)?;
1076
1077 let config_opts = session.config_options.as_ref()?;
1078
1079 Some(Rc::new(AcpSessionConfigOptions {
1080 session_id: session_id.clone(),
1081 connection: self.connection.clone(),
1082 state: config_opts.config_options.clone(),
1083 watch_tx: config_opts.tx.clone(),
1084 watch_rx: config_opts.rx.clone(),
1085 }) as _)
1086 }
1087
    /// Returns the session list, present only when the agent advertised the
    /// session `list` capability during initialization.
    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
        self.session_list.clone().map(|s| s as _)
    }
1091
    /// Upcasts to `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
1095}
1096
1097fn map_acp_error(err: acp::Error) -> anyhow::Error {
1098 if err.code == acp::ErrorCode::AuthRequired {
1099 let mut error = AuthRequired::new();
1100
1101 if err.message != acp::ErrorCode::AuthRequired.to_string() {
1102 error = error.with_description(err.message);
1103 }
1104
1105 anyhow!(error)
1106 } else {
1107 anyhow!(err)
1108 }
1109}
1110
#[cfg(test)]
mod tests {
    use super::*;

    /// `terminal_auth_task` should carry the prebuilt command's path, args and
    /// env over to the task unchanged, and use `Login` for both labels.
    #[test]
    fn terminal_auth_task_builds_spawn_from_prebuilt_command() {
        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "--verbose".into(), "/auth".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])),
        };
        let method = acp::AuthMethodTerminal::new("login", "Login");

        let task = terminal_auth_task(&command, &AgentId::new("test-agent"), &method);

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "--verbose", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("SHARED".into(), "override".into()),
                ("EXTRA".into(), "2".into()),
            ])
        );
        assert_eq!(task.label, "Login");
        assert_eq!(task.command_label, "Login");
    }

    /// A `terminal-auth` entry in an agent auth method's meta should produce a
    /// task whose command, args, env and label come from that entry.
    // NOTE(review): the name mentions retrying the session, but only the
    // parsed task is asserted here.
    #[test]
    fn legacy_terminal_auth_task_parses_meta_and_retries_session() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
                "terminal-auth".to_string(),
                serde_json::json!({
                    "label": "legacy /auth",
                    "command": "legacy-agent",
                    "args": ["auth", "--interactive"],
                    "env": {
                        "AUTH_MODE": "interactive",
                    },
                }),
            )])),
        );

        let task = meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method)
            .expect("expected legacy terminal auth task");

        assert_eq!(task.id.0, "external-agent-test-agent-legacy-login-login");
        assert_eq!(task.command.as_deref(), Some("legacy-agent"));
        assert_eq!(task.args, vec!["auth", "--interactive"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([("AUTH_MODE".into(), "interactive".into())])
        );
        assert_eq!(task.label, "legacy /auth");
    }

    /// A `terminal-auth` meta entry that only carries a label (no command)
    /// should not yield a task.
    #[test]
    fn legacy_terminal_auth_task_returns_none_for_invalid_meta() {
        let method_id = acp::AuthMethodId::new("legacy-login");
        let method = acp::AuthMethod::Agent(
            acp::AuthMethodAgent::new(method_id.clone(), "Login").meta(acp::Meta::from_iter([(
                "terminal-auth".to_string(),
                serde_json::json!({
                    "label": "legacy /auth",
                }),
            )])),
        );

        assert!(
            meta_terminal_auth_task(&AgentId::new("test-agent"), &method_id, &method).is_none()
        );
    }

    /// For a terminal auth method that also carries legacy `terminal-auth`
    /// meta, `terminal_auth_task` should reflect the prebuilt command and the
    /// method's `Login` label rather than any of the legacy meta values.
    #[test]
    fn first_class_terminal_auth_takes_precedence_over_legacy_meta() {
        let method_id = acp::AuthMethodId::new("login");
        let method = acp::AuthMethod::Terminal(
            acp::AuthMethodTerminal::new(method_id, "Login")
                .args(vec!["/auth".into()])
                .env(std::collections::HashMap::from_iter([(
                    "AUTH_MODE".into(),
                    "first-class".into(),
                )]))
                .meta(acp::Meta::from_iter([(
                    "terminal-auth".to_string(),
                    serde_json::json!({
                        "label": "legacy /auth",
                        "command": "legacy-agent",
                        "args": ["legacy-auth"],
                        "env": {
                            "AUTH_MODE": "legacy",
                        },
                    }),
                )])),
        );

        let command = AgentServerCommand {
            path: "/path/to/agent".into(),
            args: vec!["--acp".into(), "/auth".into()],
            env: Some(HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])),
        };

        let task = match &method {
            acp::AuthMethod::Terminal(terminal) => {
                terminal_auth_task(&command, &AgentId::new("test-agent"), terminal)
            }
            _ => unreachable!(),
        };

        assert_eq!(task.command.as_deref(), Some("/path/to/agent"));
        assert_eq!(task.args, vec!["--acp", "/auth"]);
        assert_eq!(
            task.env,
            HashMap::from_iter([
                ("BASE".into(), "1".into()),
                ("AUTH_MODE".into(), "first-class".into()),
            ])
        );
        assert_eq!(task.label, "Login");
    }
}
1242
1243fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
1244 let context_server_store = project.read(cx).context_server_store().read(cx);
1245 let is_local = project.read(cx).is_local();
1246 context_server_store
1247 .configured_server_ids()
1248 .iter()
1249 .filter_map(|id| {
1250 let configuration = context_server_store.configuration_for_server(id)?;
1251 match &*configuration {
1252 project::context_server_store::ContextServerConfiguration::Custom {
1253 command,
1254 remote,
1255 ..
1256 }
1257 | project::context_server_store::ContextServerConfiguration::Extension {
1258 command,
1259 remote,
1260 ..
1261 } if is_local || *remote => Some(acp::McpServer::Stdio(
1262 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1263 .args(command.args.clone())
1264 .env(if let Some(env) = command.env.as_ref() {
1265 env.iter()
1266 .map(|(name, value)| acp::EnvVariable::new(name, value))
1267 .collect()
1268 } else {
1269 vec![]
1270 }),
1271 )),
1272 project::context_server_store::ContextServerConfiguration::Http {
1273 url,
1274 headers,
1275 timeout: _,
1276 } => Some(acp::McpServer::Http(
1277 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1278 headers
1279 .iter()
1280 .map(|(name, value)| acp::HttpHeader::new(name, value))
1281 .collect(),
1282 ),
1283 )),
1284 _ => None,
1285 }
1286 })
1287 .collect()
1288}
1289
1290fn config_state(
1291 modes: Option<acp::SessionModeState>,
1292 models: Option<acp::SessionModelState>,
1293 config_options: Option<Vec<acp::SessionConfigOption>>,
1294) -> (
1295 Option<Rc<RefCell<acp::SessionModeState>>>,
1296 Option<Rc<RefCell<acp::SessionModelState>>>,
1297 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1298) {
1299 if let Some(opts) = config_options {
1300 return (None, None, Some(Rc::new(RefCell::new(opts))));
1301 }
1302
1303 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1304 let models = models.map(|models| Rc::new(RefCell::new(models)));
1305 (modes, models, None)
1306}
1307
/// Handle that reads and changes the mode of a single agent session.
struct AcpSessionModes {
    /// Session whose mode this handle reads and changes.
    session_id: acp::SessionId,
    /// Connection used to send mode-change requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Reference-counted mode state (current mode id and available modes).
    state: Rc<RefCell<acp::SessionModeState>>,
}
1313
1314impl acp_thread::AgentSessionModes for AcpSessionModes {
1315 fn current_mode(&self) -> acp::SessionModeId {
1316 self.state.borrow().current_mode_id.clone()
1317 }
1318
1319 fn all_modes(&self) -> Vec<acp::SessionMode> {
1320 self.state.borrow().available_modes.clone()
1321 }
1322
1323 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1324 let connection = self.connection.clone();
1325 let session_id = self.session_id.clone();
1326 let old_mode_id;
1327 {
1328 let mut state = self.state.borrow_mut();
1329 old_mode_id = state.current_mode_id.clone();
1330 state.current_mode_id = mode_id.clone();
1331 };
1332 let state = self.state.clone();
1333 cx.foreground_executor().spawn(async move {
1334 let result = connection
1335 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1336 .await;
1337
1338 if result.is_err() {
1339 state.borrow_mut().current_mode_id = old_mode_id;
1340 }
1341
1342 result?;
1343
1344 Ok(())
1345 })
1346 }
1347}
1348
/// Handle that lists and selects the model of a single agent session.
struct AcpModelSelector {
    /// Session whose model this handle reads and changes.
    session_id: acp::SessionId,
    /// Connection used to send model-change requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Reference-counted model state (current model id and available models).
    state: Rc<RefCell<acp::SessionModelState>>,
}
1354
impl AcpModelSelector {
    /// Creates a selector for `session_id` that talks to the agent over
    /// `connection` and reads and writes the given model `state`.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
1368
1369impl acp_thread::AgentModelSelector for AcpModelSelector {
1370 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1371 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1372 self.state
1373 .borrow()
1374 .available_models
1375 .clone()
1376 .into_iter()
1377 .map(acp_thread::AgentModelInfo::from)
1378 .collect(),
1379 )))
1380 }
1381
1382 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1383 let connection = self.connection.clone();
1384 let session_id = self.session_id.clone();
1385 let old_model_id;
1386 {
1387 let mut state = self.state.borrow_mut();
1388 old_model_id = state.current_model_id.clone();
1389 state.current_model_id = model_id.clone();
1390 };
1391 let state = self.state.clone();
1392 cx.foreground_executor().spawn(async move {
1393 let result = connection
1394 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1395 .await;
1396
1397 if result.is_err() {
1398 state.borrow_mut().current_model_id = old_model_id;
1399 }
1400
1401 result?;
1402
1403 Ok(())
1404 })
1405 }
1406
1407 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1408 let state = self.state.borrow();
1409 Task::ready(
1410 state
1411 .available_models
1412 .iter()
1413 .find(|m| m.model_id == state.current_model_id)
1414 .cloned()
1415 .map(acp_thread::AgentModelInfo::from)
1416 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1417 )
1418 }
1419}
1420
/// Handle that reads and changes the config options of a single agent session.
struct AcpSessionConfigOptions {
    /// Session whose config options this handle reads and changes.
    session_id: acp::SessionId,
    /// Connection used to send config-option requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Reference-counted list of the session's current config options.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender signalled after the stored options have been replaced.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver cloned out to callers of `watch`.
    watch_rx: watch::Receiver<()>,
}
1428
1429impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1430 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1431 self.state.borrow().clone()
1432 }
1433
1434 fn set_config_option(
1435 &self,
1436 config_id: acp::SessionConfigId,
1437 value: acp::SessionConfigValueId,
1438 cx: &mut App,
1439 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1440 let connection = self.connection.clone();
1441 let session_id = self.session_id.clone();
1442 let state = self.state.clone();
1443
1444 let watch_tx = self.watch_tx.clone();
1445
1446 cx.foreground_executor().spawn(async move {
1447 let response = connection
1448 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1449 session_id, config_id, value,
1450 ))
1451 .await?;
1452
1453 *state.borrow_mut() = response.config_options.clone();
1454 watch_tx.borrow_mut().send(()).ok();
1455 Ok(response.config_options)
1456 })
1457 }
1458
1459 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1460 Some(self.watch_rx.clone())
1461 }
1462}
1463
/// Client-side handler for requests and notifications sent by the agent.
struct ClientDelegate {
    /// Sessions keyed by id; used to find the thread a request refers to.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Optional session list that receives session-info updates.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// Async app context, cloned whenever a thread entity is read or updated.
    cx: AsyncApp,
}
1469
1470#[async_trait::async_trait(?Send)]
1471impl acp::Client for ClientDelegate {
1472 async fn request_permission(
1473 &self,
1474 arguments: acp::RequestPermissionRequest,
1475 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1476 let thread;
1477 {
1478 let sessions_ref = self.sessions.borrow();
1479 let session = sessions_ref
1480 .get(&arguments.session_id)
1481 .context("Failed to get session")?;
1482 thread = session.thread.clone();
1483 }
1484
1485 let cx = &mut self.cx.clone();
1486
1487 let task = thread.update(cx, |thread, cx| {
1488 thread.request_tool_call_authorization(
1489 arguments.tool_call,
1490 acp_thread::PermissionOptions::Flat(arguments.options),
1491 cx,
1492 )
1493 })??;
1494
1495 let outcome = task.await;
1496
1497 Ok(acp::RequestPermissionResponse::new(outcome.into()))
1498 }
1499
1500 async fn write_text_file(
1501 &self,
1502 arguments: acp::WriteTextFileRequest,
1503 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1504 let cx = &mut self.cx.clone();
1505 let task = self
1506 .session_thread(&arguments.session_id)?
1507 .update(cx, |thread, cx| {
1508 thread.write_text_file(arguments.path, arguments.content, cx)
1509 })?;
1510
1511 task.await?;
1512
1513 Ok(Default::default())
1514 }
1515
1516 async fn read_text_file(
1517 &self,
1518 arguments: acp::ReadTextFileRequest,
1519 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1520 let task = self.session_thread(&arguments.session_id)?.update(
1521 &mut self.cx.clone(),
1522 |thread, cx| {
1523 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1524 },
1525 )?;
1526
1527 let content = task.await?;
1528
1529 Ok(acp::ReadTextFileResponse::new(content))
1530 }
1531
1532 async fn session_notification(
1533 &self,
1534 notification: acp::SessionNotification,
1535 ) -> Result<(), acp::Error> {
1536 let sessions = self.sessions.borrow();
1537 let session = sessions
1538 .get(¬ification.session_id)
1539 .context("Failed to get session")?;
1540
1541 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1542 current_mode_id,
1543 ..
1544 }) = ¬ification.update
1545 {
1546 if let Some(session_modes) = &session.session_modes {
1547 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1548 }
1549 }
1550
1551 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1552 config_options,
1553 ..
1554 }) = ¬ification.update
1555 {
1556 if let Some(opts) = &session.config_options {
1557 *opts.config_options.borrow_mut() = config_options.clone();
1558 opts.tx.borrow_mut().send(()).ok();
1559 }
1560 }
1561
1562 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1563 && let Some(session_list) = self.session_list.borrow().as_ref()
1564 {
1565 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1566 }
1567
1568 // Clone so we can inspect meta both before and after handing off to the thread
1569 let update_clone = notification.update.clone();
1570
1571 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1572 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1573 if let Some(meta) = &tc.meta {
1574 if let Some(terminal_info) = meta.get("terminal_info") {
1575 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1576 {
1577 let terminal_id = acp::TerminalId::new(id_str);
1578 let cwd = terminal_info
1579 .get("cwd")
1580 .and_then(|v| v.as_str().map(PathBuf::from));
1581
1582 // Create a minimal display-only lower-level terminal and register it.
1583 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1584 let builder = TerminalBuilder::new_display_only(
1585 CursorShape::default(),
1586 AlternateScroll::On,
1587 None,
1588 0,
1589 cx.background_executor(),
1590 thread.project().read(cx).path_style(cx),
1591 )?;
1592 let lower = cx.new(|cx| builder.subscribe(cx));
1593 thread.on_terminal_provider_event(
1594 TerminalProviderEvent::Created {
1595 terminal_id,
1596 label: tc.title.clone(),
1597 cwd,
1598 output_byte_limit: None,
1599 terminal: lower,
1600 },
1601 cx,
1602 );
1603 anyhow::Ok(())
1604 });
1605 }
1606 }
1607 }
1608 }
1609
1610 // Forward the update to the acp_thread as usual.
1611 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1612 thread.handle_session_update(notification.update.clone(), cx)
1613 })??;
1614
1615 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1616 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1617 if let Some(meta) = &tcu.meta {
1618 if let Some(term_out) = meta.get("terminal_output") {
1619 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1620 let terminal_id = acp::TerminalId::new(id_str);
1621 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1622 let data = s.as_bytes().to_vec();
1623 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1624 thread.on_terminal_provider_event(
1625 TerminalProviderEvent::Output { terminal_id, data },
1626 cx,
1627 );
1628 });
1629 }
1630 }
1631 }
1632
1633 // terminal_exit
1634 if let Some(term_exit) = meta.get("terminal_exit") {
1635 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1636 let terminal_id = acp::TerminalId::new(id_str);
1637 let status = acp::TerminalExitStatus::new()
1638 .exit_code(
1639 term_exit
1640 .get("exit_code")
1641 .and_then(|v| v.as_u64())
1642 .map(|i| i as u32),
1643 )
1644 .signal(
1645 term_exit
1646 .get("signal")
1647 .and_then(|v| v.as_str().map(|s| s.to_string())),
1648 );
1649
1650 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1651 thread.on_terminal_provider_event(
1652 TerminalProviderEvent::Exit {
1653 terminal_id,
1654 status,
1655 },
1656 cx,
1657 );
1658 });
1659 }
1660 }
1661 }
1662 }
1663
1664 Ok(())
1665 }
1666
1667 async fn create_terminal(
1668 &self,
1669 args: acp::CreateTerminalRequest,
1670 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1671 let thread = self.session_thread(&args.session_id)?;
1672 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1673
1674 let terminal_entity = acp_thread::create_terminal_entity(
1675 args.command.clone(),
1676 &args.args,
1677 args.env
1678 .into_iter()
1679 .map(|env| (env.name, env.value))
1680 .collect(),
1681 args.cwd.clone(),
1682 &project,
1683 &mut self.cx.clone(),
1684 )
1685 .await?;
1686
1687 // Register with renderer
1688 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1689 thread.register_terminal_created(
1690 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1691 format!("{} {}", args.command, args.args.join(" ")),
1692 args.cwd.clone(),
1693 args.output_byte_limit,
1694 terminal_entity,
1695 cx,
1696 )
1697 })?;
1698 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1699 Ok(acp::CreateTerminalResponse::new(terminal_id))
1700 }
1701
1702 async fn kill_terminal(
1703 &self,
1704 args: acp::KillTerminalRequest,
1705 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1706 self.session_thread(&args.session_id)?
1707 .update(&mut self.cx.clone(), |thread, cx| {
1708 thread.kill_terminal(args.terminal_id, cx)
1709 })??;
1710
1711 Ok(Default::default())
1712 }
1713
    /// Extension methods are not handled here; every call is answered with a
    /// method-not-found error.
    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
        Err(acp::Error::method_not_found())
    }
1717
    /// Extension notifications are not handled here; every one is answered
    /// with a method-not-found error.
    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
        Err(acp::Error::method_not_found())
    }
1721
1722 async fn release_terminal(
1723 &self,
1724 args: acp::ReleaseTerminalRequest,
1725 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1726 self.session_thread(&args.session_id)?
1727 .update(&mut self.cx.clone(), |thread, cx| {
1728 thread.release_terminal(args.terminal_id, cx)
1729 })??;
1730
1731 Ok(Default::default())
1732 }
1733
1734 async fn terminal_output(
1735 &self,
1736 args: acp::TerminalOutputRequest,
1737 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1738 self.session_thread(&args.session_id)?
1739 .read_with(&mut self.cx.clone(), |thread, cx| {
1740 let out = thread
1741 .terminal(args.terminal_id)?
1742 .read(cx)
1743 .current_output(cx);
1744
1745 Ok(out)
1746 })?
1747 }
1748
1749 async fn wait_for_terminal_exit(
1750 &self,
1751 args: acp::WaitForTerminalExitRequest,
1752 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1753 let exit_status = self
1754 .session_thread(&args.session_id)?
1755 .update(&mut self.cx.clone(), |thread, cx| {
1756 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1757 })??
1758 .await;
1759
1760 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1761 }
1762}
1763
1764impl ClientDelegate {
1765 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1766 let sessions = self.sessions.borrow();
1767 sessions
1768 .get(session_id)
1769 .context("Failed to get session")
1770 .map(|session| session.thread.clone())
1771 }
1772}