1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use futures::AsyncBufReadExt as _;
11use futures::io::BufReader;
12use project::Project;
13use project::agent_server_store::{AgentServerCommand, GEMINI_NAME};
14use serde::Deserialize;
15use settings::Settings as _;
16use task::ShellBuilder;
17use util::ResultExt as _;
18use util::process::Child;
19
20use std::path::PathBuf;
21use std::process::Stdio;
22use std::{any::Any, cell::RefCell};
23use std::{path::Path, rc::Rc};
24use thiserror::Error;
25
26use anyhow::{Context as _, Result};
27use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
28
29use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
30use terminal::TerminalBuilder;
31use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
32
33#[derive(Debug, Error)]
34#[error("Unsupported version")]
35pub struct UnsupportedVersion;
36
37pub struct AcpConnection {
38 server_name: SharedString,
39 display_name: SharedString,
40 telemetry_id: SharedString,
41 connection: Rc<acp::ClientSideConnection>,
42 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
43 auth_methods: Vec<acp::AuthMethod>,
44 agent_capabilities: acp::AgentCapabilities,
45 default_mode: Option<acp::SessionModeId>,
46 default_model: Option<acp::ModelId>,
47 default_config_options: HashMap<String, String>,
48 child: Child,
49 session_list: Option<Rc<AcpSessionList>>,
50 _io_task: Task<Result<(), acp::Error>>,
51 _wait_task: Task<Result<()>>,
52 _stderr_task: Task<Result<()>>,
53}
54
55struct ConfigOptions {
56 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
57 tx: Rc<RefCell<watch::Sender<()>>>,
58 rx: watch::Receiver<()>,
59}
60
61impl ConfigOptions {
62 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
63 let (tx, rx) = watch::channel(());
64 Self {
65 config_options,
66 tx: Rc::new(RefCell::new(tx)),
67 rx,
68 }
69 }
70}
71
72pub struct AcpSession {
73 thread: WeakEntity<AcpThread>,
74 suppress_abort_err: bool,
75 models: Option<Rc<RefCell<acp::SessionModelState>>>,
76 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
77 config_options: Option<ConfigOptions>,
78}
79
80pub struct AcpSessionList {
81 connection: Rc<acp::ClientSideConnection>,
82 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
83 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
84}
85
86impl AcpSessionList {
87 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
88 let (tx, rx) = smol::channel::unbounded();
89 Self {
90 connection,
91 updates_tx: tx,
92 updates_rx: rx,
93 }
94 }
95
96 fn notify_update(&self) {
97 self.updates_tx
98 .try_send(acp_thread::SessionListUpdate::Refresh)
99 .log_err();
100 }
101
102 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
105 .log_err();
106 }
107}
108
109impl AgentSessionList for AcpSessionList {
110 fn list_sessions(
111 &self,
112 request: AgentSessionListRequest,
113 cx: &mut App,
114 ) -> Task<Result<AgentSessionListResponse>> {
115 let conn = self.connection.clone();
116 cx.foreground_executor().spawn(async move {
117 let acp_request = acp::ListSessionsRequest::new()
118 .cwd(request.cwd)
119 .cursor(request.cursor);
120 let response = conn.list_sessions(acp_request).await?;
121 Ok(AgentSessionListResponse {
122 sessions: response
123 .sessions
124 .into_iter()
125 .map(|s| AgentSessionInfo {
126 session_id: s.session_id,
127 cwd: Some(s.cwd),
128 title: s.title.map(Into::into),
129 updated_at: s.updated_at.and_then(|date_str| {
130 chrono::DateTime::parse_from_rfc3339(&date_str)
131 .ok()
132 .map(|dt| dt.with_timezone(&chrono::Utc))
133 }),
134 meta: s.meta,
135 })
136 .collect(),
137 next_cursor: response.next_cursor,
138 meta: response.meta,
139 })
140 })
141 }
142
143 fn watch(
144 &self,
145 _cx: &mut App,
146 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
147 Some(self.updates_rx.clone())
148 }
149
150 fn notify_refresh(&self) {
151 self.notify_update();
152 }
153
154 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
155 self
156 }
157}
158
159pub async fn connect(
160 server_name: SharedString,
161 display_name: SharedString,
162 command: AgentServerCommand,
163 default_mode: Option<acp::SessionModeId>,
164 default_model: Option<acp::ModelId>,
165 default_config_options: HashMap<String, String>,
166 cx: &mut AsyncApp,
167) -> Result<Rc<dyn AgentConnection>> {
168 let conn = AcpConnection::stdio(
169 server_name,
170 display_name,
171 command.clone(),
172 default_mode,
173 default_model,
174 default_config_options,
175 cx,
176 )
177 .await?;
178 Ok(Rc::new(conn) as _)
179}
180
181const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
182
183impl AcpConnection {
184 pub async fn stdio(
185 server_name: SharedString,
186 display_name: SharedString,
187 command: AgentServerCommand,
188 default_mode: Option<acp::SessionModeId>,
189 default_model: Option<acp::ModelId>,
190 default_config_options: HashMap<String, String>,
191 cx: &mut AsyncApp,
192 ) -> Result<Self> {
193 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
194 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
195 let mut child =
196 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
197 child.envs(command.env.iter().flatten());
198 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
199
200 let stdout = child.stdout.take().context("Failed to take stdout")?;
201 let stdin = child.stdin.take().context("Failed to take stdin")?;
202 let stderr = child.stderr.take().context("Failed to take stderr")?;
203 log::debug!(
204 "Spawning external agent server: {:?}, {:?}",
205 command.path,
206 command.args
207 );
208 log::trace!("Spawned (pid: {})", child.id());
209
210 let sessions = Rc::new(RefCell::new(HashMap::default()));
211
212 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
213 (
214 release_channel::ReleaseChannel::try_global(cx)
215 .map(|release_channel| release_channel.display_name()),
216 release_channel::AppVersion::global(cx).to_string(),
217 )
218 });
219
220 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
221 Rc::new(RefCell::new(None));
222
223 let client = ClientDelegate {
224 sessions: sessions.clone(),
225 session_list: client_session_list.clone(),
226 cx: cx.clone(),
227 };
228 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
229 let foreground_executor = cx.foreground_executor().clone();
230 move |fut| {
231 foreground_executor.spawn(fut).detach();
232 }
233 });
234
235 let io_task = cx.background_spawn(io_task);
236
237 let stderr_task = cx.background_spawn(async move {
238 let mut stderr = BufReader::new(stderr);
239 let mut line = String::new();
240 while let Ok(n) = stderr.read_line(&mut line).await
241 && n > 0
242 {
243 log::warn!("agent stderr: {}", line.trim());
244 line.clear();
245 }
246 Ok(())
247 });
248
249 let wait_task = cx.spawn({
250 let sessions = sessions.clone();
251 let status_fut = child.status();
252 async move |cx| {
253 let status = status_fut.await?;
254
255 for session in sessions.borrow().values() {
256 session
257 .thread
258 .update(cx, |thread, cx| {
259 thread.emit_load_error(LoadError::Exited { status }, cx)
260 })
261 .ok();
262 }
263
264 anyhow::Ok(())
265 }
266 });
267
268 let connection = Rc::new(connection);
269
270 cx.update(|cx| {
271 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
272 registry.set_active_connection(server_name.clone(), &connection, cx)
273 });
274 });
275
276 let response = connection
277 .initialize(
278 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
279 .client_capabilities(
280 acp::ClientCapabilities::new()
281 .fs(acp::FileSystemCapability::new()
282 .read_text_file(true)
283 .write_text_file(true))
284 .terminal(true)
285 // Experimental: Allow for rendering terminal output from the agents
286 .meta(acp::Meta::from_iter([
287 ("terminal_output".into(), true.into()),
288 ("terminal-auth".into(), true.into()),
289 ])),
290 )
291 .client_info(
292 acp::Implementation::new("zed", version)
293 .title(release_channel.map(ToOwned::to_owned)),
294 ),
295 )
296 .await?;
297
298 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
299 return Err(UnsupportedVersion.into());
300 }
301
302 let telemetry_id = response
303 .agent_info
304 // Use the one the agent provides if we have one
305 .map(|info| info.name.into())
306 // Otherwise, just use the name
307 .unwrap_or_else(|| server_name.clone());
308
309 let session_list = if response
310 .agent_capabilities
311 .session_capabilities
312 .list
313 .is_some()
314 {
315 let list = Rc::new(AcpSessionList::new(connection.clone()));
316 *client_session_list.borrow_mut() = Some(list.clone());
317 Some(list)
318 } else {
319 None
320 };
321
322 // TODO: Remove this override once Google team releases their official auth methods
323 let auth_methods = if server_name == GEMINI_NAME {
324 let mut args = command.args.clone();
325 args.retain(|a| a != "--experimental-acp");
326 let value = serde_json::json!({
327 "label": "gemini /auth",
328 "command": command.path.to_string_lossy().into_owned(),
329 "args": args,
330 "env": command.env.clone().unwrap_or_default(),
331 });
332 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
333 vec![
334 acp::AuthMethod::new("spawn-gemini-cli", "Login")
335 .description("Login with your Google or Vertex AI account")
336 .meta(meta),
337 ]
338 } else {
339 response.auth_methods
340 };
341 Ok(Self {
342 auth_methods,
343 connection,
344 server_name,
345 display_name,
346 telemetry_id,
347 sessions,
348 agent_capabilities: response.agent_capabilities,
349 default_mode,
350 default_model,
351 default_config_options,
352 session_list,
353 _io_task: io_task,
354 _wait_task: wait_task,
355 _stderr_task: stderr_task,
356 child,
357 })
358 }
359
360 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
361 &self.agent_capabilities.prompt_capabilities
362 }
363}
364
365impl Drop for AcpConnection {
366 fn drop(&mut self) {
367 self.child.kill().log_err();
368 }
369}
370
371impl AgentConnection for AcpConnection {
372 fn telemetry_id(&self) -> SharedString {
373 self.telemetry_id.clone()
374 }
375
376 fn new_session(
377 self: Rc<Self>,
378 project: Entity<Project>,
379 cwd: &Path,
380 cx: &mut App,
381 ) -> Task<Result<Entity<AcpThread>>> {
382 let name = self.server_name.clone();
383 let cwd = cwd.to_path_buf();
384 let mcp_servers = mcp_servers_for_project(&project, cx);
385
386 cx.spawn(async move |cx| {
387 let response = self.connection
388 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
389 .await
390 .map_err(map_acp_error)?;
391
392 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
393
394 if let Some(default_mode) = self.default_mode.clone() {
395 if let Some(modes) = modes.as_ref() {
396 let mut modes_ref = modes.borrow_mut();
397 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
398
399 if has_mode {
400 let initial_mode_id = modes_ref.current_mode_id.clone();
401
402 cx.spawn({
403 let default_mode = default_mode.clone();
404 let session_id = response.session_id.clone();
405 let modes = modes.clone();
406 let conn = self.connection.clone();
407 async move |_| {
408 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
409 .await.log_err();
410
411 if result.is_none() {
412 modes.borrow_mut().current_mode_id = initial_mode_id;
413 }
414 }
415 }).detach();
416
417 modes_ref.current_mode_id = default_mode;
418 } else {
419 let available_modes = modes_ref
420 .available_modes
421 .iter()
422 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
423 .collect::<Vec<_>>()
424 .join("\n");
425
426 log::warn!(
427 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
428 );
429 }
430 }
431 }
432
433 if let Some(default_model) = self.default_model.clone() {
434 if let Some(models) = models.as_ref() {
435 let mut models_ref = models.borrow_mut();
436 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
437
438 if has_model {
439 let initial_model_id = models_ref.current_model_id.clone();
440
441 cx.spawn({
442 let default_model = default_model.clone();
443 let session_id = response.session_id.clone();
444 let models = models.clone();
445 let conn = self.connection.clone();
446 async move |_| {
447 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
448 .await.log_err();
449
450 if result.is_none() {
451 models.borrow_mut().current_model_id = initial_model_id;
452 }
453 }
454 }).detach();
455
456 models_ref.current_model_id = default_model;
457 } else {
458 let available_models = models_ref
459 .available_models
460 .iter()
461 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
462 .collect::<Vec<_>>()
463 .join("\n");
464
465 log::warn!(
466 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
467 );
468 }
469 }
470 }
471
472 if let Some(config_opts) = config_options.as_ref() {
473 let defaults_to_apply: Vec<_> = {
474 let config_opts_ref = config_opts.borrow();
475 config_opts_ref
476 .iter()
477 .filter_map(|config_option| {
478 let default_value = self.default_config_options.get(&*config_option.id.0)?;
479
480 let is_valid = match &config_option.kind {
481 acp::SessionConfigKind::Select(select) => match &select.options {
482 acp::SessionConfigSelectOptions::Ungrouped(options) => {
483 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
484 }
485 acp::SessionConfigSelectOptions::Grouped(groups) => groups
486 .iter()
487 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
488 _ => false,
489 },
490 _ => false,
491 };
492
493 if is_valid {
494 let initial_value = match &config_option.kind {
495 acp::SessionConfigKind::Select(select) => {
496 Some(select.current_value.clone())
497 }
498 _ => None,
499 };
500 Some((config_option.id.clone(), default_value.clone(), initial_value))
501 } else {
502 log::warn!(
503 "`{}` is not a valid value for config option `{}` in {}",
504 default_value,
505 config_option.id.0,
506 name
507 );
508 None
509 }
510 })
511 .collect()
512 };
513
514 for (config_id, default_value, initial_value) in defaults_to_apply {
515 cx.spawn({
516 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
517 let session_id = response.session_id.clone();
518 let config_id_clone = config_id.clone();
519 let config_opts = config_opts.clone();
520 let conn = self.connection.clone();
521 async move |_| {
522 let result = conn
523 .set_session_config_option(
524 acp::SetSessionConfigOptionRequest::new(
525 session_id,
526 config_id_clone.clone(),
527 default_value_id,
528 ),
529 )
530 .await
531 .log_err();
532
533 if result.is_none() {
534 if let Some(initial) = initial_value {
535 let mut opts = config_opts.borrow_mut();
536 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
537 if let acp::SessionConfigKind::Select(select) =
538 &mut opt.kind
539 {
540 select.current_value = initial;
541 }
542 }
543 }
544 }
545 }
546 })
547 .detach();
548
549 let mut opts = config_opts.borrow_mut();
550 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
551 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
552 select.current_value = acp::SessionConfigValueId::new(default_value);
553 }
554 }
555 }
556 }
557
558 let action_log = cx.new(|_| ActionLog::new(project.clone()));
559 let thread: Entity<AcpThread> = cx.new(|cx| {
560 AcpThread::new(
561 None,
562 self.display_name.clone(),
563 self.clone(),
564 project,
565 action_log,
566 response.session_id.clone(),
567 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
568 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
569 cx,
570 )
571 });
572
573 self.sessions.borrow_mut().insert(
574 response.session_id,
575 AcpSession {
576 thread: thread.downgrade(),
577 suppress_abort_err: false,
578 session_modes: modes,
579 models,
580 config_options: config_options.map(ConfigOptions::new),
581 },
582 );
583
584 Ok(thread)
585 })
586 }
587
588 fn supports_load_session(&self) -> bool {
589 self.agent_capabilities.load_session
590 }
591
592 fn supports_resume_session(&self) -> bool {
593 self.agent_capabilities
594 .session_capabilities
595 .resume
596 .is_some()
597 }
598
599 fn load_session(
600 self: Rc<Self>,
601 session: AgentSessionInfo,
602 project: Entity<Project>,
603 cwd: &Path,
604 cx: &mut App,
605 ) -> Task<Result<Entity<AcpThread>>> {
606 if !self.agent_capabilities.load_session {
607 return Task::ready(Err(anyhow!(LoadError::Other(
608 "Loading sessions is not supported by this agent.".into()
609 ))));
610 }
611
612 let cwd = cwd.to_path_buf();
613 let mcp_servers = mcp_servers_for_project(&project, cx);
614 let action_log = cx.new(|_| ActionLog::new(project.clone()));
615 let title = session
616 .title
617 .clone()
618 .unwrap_or_else(|| self.display_name.clone());
619 let thread: Entity<AcpThread> = cx.new(|cx| {
620 AcpThread::new(
621 None,
622 title,
623 self.clone(),
624 project,
625 action_log,
626 session.session_id.clone(),
627 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
628 cx,
629 )
630 });
631
632 self.sessions.borrow_mut().insert(
633 session.session_id.clone(),
634 AcpSession {
635 thread: thread.downgrade(),
636 suppress_abort_err: false,
637 session_modes: None,
638 models: None,
639 config_options: None,
640 },
641 );
642
643 cx.spawn(async move |_| {
644 let response = match self
645 .connection
646 .load_session(
647 acp::LoadSessionRequest::new(session.session_id.clone(), cwd)
648 .mcp_servers(mcp_servers),
649 )
650 .await
651 {
652 Ok(response) => response,
653 Err(err) => {
654 self.sessions.borrow_mut().remove(&session.session_id);
655 return Err(map_acp_error(err));
656 }
657 };
658
659 let (modes, models, config_options) =
660 config_state(response.modes, response.models, response.config_options);
661 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
662 session.session_modes = modes;
663 session.models = models;
664 session.config_options = config_options.map(ConfigOptions::new);
665 }
666
667 Ok(thread)
668 })
669 }
670
671 fn resume_session(
672 self: Rc<Self>,
673 session: AgentSessionInfo,
674 project: Entity<Project>,
675 cwd: &Path,
676 cx: &mut App,
677 ) -> Task<Result<Entity<AcpThread>>> {
678 if self
679 .agent_capabilities
680 .session_capabilities
681 .resume
682 .is_none()
683 {
684 return Task::ready(Err(anyhow!(LoadError::Other(
685 "Resuming sessions is not supported by this agent.".into()
686 ))));
687 }
688
689 let cwd = cwd.to_path_buf();
690 let mcp_servers = mcp_servers_for_project(&project, cx);
691 let action_log = cx.new(|_| ActionLog::new(project.clone()));
692 let title = session
693 .title
694 .clone()
695 .unwrap_or_else(|| self.display_name.clone());
696 let thread: Entity<AcpThread> = cx.new(|cx| {
697 AcpThread::new(
698 None,
699 title,
700 self.clone(),
701 project,
702 action_log,
703 session.session_id.clone(),
704 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
705 cx,
706 )
707 });
708
709 self.sessions.borrow_mut().insert(
710 session.session_id.clone(),
711 AcpSession {
712 thread: thread.downgrade(),
713 suppress_abort_err: false,
714 session_modes: None,
715 models: None,
716 config_options: None,
717 },
718 );
719
720 cx.spawn(async move |_| {
721 let response = match self
722 .connection
723 .resume_session(
724 acp::ResumeSessionRequest::new(session.session_id.clone(), cwd)
725 .mcp_servers(mcp_servers),
726 )
727 .await
728 {
729 Ok(response) => response,
730 Err(err) => {
731 self.sessions.borrow_mut().remove(&session.session_id);
732 return Err(map_acp_error(err));
733 }
734 };
735
736 let (modes, models, config_options) =
737 config_state(response.modes, response.models, response.config_options);
738 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
739 session.session_modes = modes;
740 session.models = models;
741 session.config_options = config_options.map(ConfigOptions::new);
742 }
743
744 Ok(thread)
745 })
746 }
747
748 fn auth_methods(&self) -> &[acp::AuthMethod] {
749 &self.auth_methods
750 }
751
752 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
753 let conn = self.connection.clone();
754 cx.foreground_executor().spawn(async move {
755 conn.authenticate(acp::AuthenticateRequest::new(method_id))
756 .await?;
757 Ok(())
758 })
759 }
760
761 fn prompt(
762 &self,
763 _id: Option<acp_thread::UserMessageId>,
764 params: acp::PromptRequest,
765 cx: &mut App,
766 ) -> Task<Result<acp::PromptResponse>> {
767 let conn = self.connection.clone();
768 let sessions = self.sessions.clone();
769 let session_id = params.session_id.clone();
770 cx.foreground_executor().spawn(async move {
771 let result = conn.prompt(params).await;
772
773 let mut suppress_abort_err = false;
774
775 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
776 suppress_abort_err = session.suppress_abort_err;
777 session.suppress_abort_err = false;
778 }
779
780 match result {
781 Ok(response) => Ok(response),
782 Err(err) => {
783 if err.code == acp::ErrorCode::AuthRequired {
784 return Err(anyhow!(acp::Error::auth_required()));
785 }
786
787 if err.code != ErrorCode::InternalError {
788 anyhow::bail!(err)
789 }
790
791 let Some(data) = &err.data else {
792 anyhow::bail!(err)
793 };
794
795 // Temporary workaround until the following PR is generally available:
796 // https://github.com/google-gemini/gemini-cli/pull/6656
797
798 #[derive(Deserialize)]
799 #[serde(deny_unknown_fields)]
800 struct ErrorDetails {
801 details: Box<str>,
802 }
803
804 match serde_json::from_value(data.clone()) {
805 Ok(ErrorDetails { details }) => {
806 if suppress_abort_err
807 && (details.contains("This operation was aborted")
808 || details.contains("The user aborted a request"))
809 {
810 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
811 } else {
812 Err(anyhow!(details))
813 }
814 }
815 Err(_) => Err(anyhow!(err)),
816 }
817 }
818 }
819 })
820 }
821
822 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
823 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
824 session.suppress_abort_err = true;
825 }
826 let conn = self.connection.clone();
827 let params = acp::CancelNotification::new(session_id.clone());
828 cx.foreground_executor()
829 .spawn(async move { conn.cancel(params).await })
830 .detach();
831 }
832
833 fn session_modes(
834 &self,
835 session_id: &acp::SessionId,
836 _cx: &App,
837 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
838 let sessions = self.sessions.clone();
839 let sessions_ref = sessions.borrow();
840 let Some(session) = sessions_ref.get(session_id) else {
841 return None;
842 };
843
844 if let Some(modes) = session.session_modes.as_ref() {
845 Some(Rc::new(AcpSessionModes {
846 connection: self.connection.clone(),
847 session_id: session_id.clone(),
848 state: modes.clone(),
849 }) as _)
850 } else {
851 None
852 }
853 }
854
855 fn model_selector(
856 &self,
857 session_id: &acp::SessionId,
858 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
859 let sessions = self.sessions.clone();
860 let sessions_ref = sessions.borrow();
861 let Some(session) = sessions_ref.get(session_id) else {
862 return None;
863 };
864
865 if let Some(models) = session.models.as_ref() {
866 Some(Rc::new(AcpModelSelector::new(
867 session_id.clone(),
868 self.connection.clone(),
869 models.clone(),
870 )) as _)
871 } else {
872 None
873 }
874 }
875
876 fn session_config_options(
877 &self,
878 session_id: &acp::SessionId,
879 _cx: &App,
880 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
881 let sessions = self.sessions.borrow();
882 let session = sessions.get(session_id)?;
883
884 let config_opts = session.config_options.as_ref()?;
885
886 Some(Rc::new(AcpSessionConfigOptions {
887 session_id: session_id.clone(),
888 connection: self.connection.clone(),
889 state: config_opts.config_options.clone(),
890 watch_tx: config_opts.tx.clone(),
891 watch_rx: config_opts.rx.clone(),
892 }) as _)
893 }
894
895 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
896 self.session_list.clone().map(|s| s as _)
897 }
898
899 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
900 self
901 }
902}
903
904fn map_acp_error(err: acp::Error) -> anyhow::Error {
905 if err.code == acp::ErrorCode::AuthRequired {
906 let mut error = AuthRequired::new();
907
908 if err.message != acp::ErrorCode::AuthRequired.to_string() {
909 error = error.with_description(err.message);
910 }
911
912 anyhow!(error)
913 } else {
914 anyhow!(err)
915 }
916}
917
918fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
919 let context_server_store = project.read(cx).context_server_store().read(cx);
920 let is_local = project.read(cx).is_local();
921 context_server_store
922 .configured_server_ids()
923 .iter()
924 .filter_map(|id| {
925 let configuration = context_server_store.configuration_for_server(id)?;
926 match &*configuration {
927 project::context_server_store::ContextServerConfiguration::Custom {
928 command,
929 remote,
930 ..
931 }
932 | project::context_server_store::ContextServerConfiguration::Extension {
933 command,
934 remote,
935 ..
936 } if is_local || *remote => Some(acp::McpServer::Stdio(
937 acp::McpServerStdio::new(id.0.to_string(), &command.path)
938 .args(command.args.clone())
939 .env(if let Some(env) = command.env.as_ref() {
940 env.iter()
941 .map(|(name, value)| acp::EnvVariable::new(name, value))
942 .collect()
943 } else {
944 vec![]
945 }),
946 )),
947 project::context_server_store::ContextServerConfiguration::Http {
948 url,
949 headers,
950 timeout: _,
951 } => Some(acp::McpServer::Http(
952 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
953 headers
954 .iter()
955 .map(|(name, value)| acp::HttpHeader::new(name, value))
956 .collect(),
957 ),
958 )),
959 _ => None,
960 }
961 })
962 .collect()
963}
964
965fn config_state(
966 modes: Option<acp::SessionModeState>,
967 models: Option<acp::SessionModelState>,
968 config_options: Option<Vec<acp::SessionConfigOption>>,
969) -> (
970 Option<Rc<RefCell<acp::SessionModeState>>>,
971 Option<Rc<RefCell<acp::SessionModelState>>>,
972 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
973) {
974 if let Some(opts) = config_options {
975 return (None, None, Some(Rc::new(RefCell::new(opts))));
976 }
977
978 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
979 let models = models.map(|models| Rc::new(RefCell::new(models)));
980 (modes, models, None)
981}
982
983struct AcpSessionModes {
984 session_id: acp::SessionId,
985 connection: Rc<acp::ClientSideConnection>,
986 state: Rc<RefCell<acp::SessionModeState>>,
987}
988
989impl acp_thread::AgentSessionModes for AcpSessionModes {
990 fn current_mode(&self) -> acp::SessionModeId {
991 self.state.borrow().current_mode_id.clone()
992 }
993
994 fn all_modes(&self) -> Vec<acp::SessionMode> {
995 self.state.borrow().available_modes.clone()
996 }
997
998 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
999 let connection = self.connection.clone();
1000 let session_id = self.session_id.clone();
1001 let old_mode_id;
1002 {
1003 let mut state = self.state.borrow_mut();
1004 old_mode_id = state.current_mode_id.clone();
1005 state.current_mode_id = mode_id.clone();
1006 };
1007 let state = self.state.clone();
1008 cx.foreground_executor().spawn(async move {
1009 let result = connection
1010 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1011 .await;
1012
1013 if result.is_err() {
1014 state.borrow_mut().current_mode_id = old_mode_id;
1015 }
1016
1017 result?;
1018
1019 Ok(())
1020 })
1021 }
1022}
1023
1024struct AcpModelSelector {
1025 session_id: acp::SessionId,
1026 connection: Rc<acp::ClientSideConnection>,
1027 state: Rc<RefCell<acp::SessionModelState>>,
1028}
1029
1030impl AcpModelSelector {
1031 fn new(
1032 session_id: acp::SessionId,
1033 connection: Rc<acp::ClientSideConnection>,
1034 state: Rc<RefCell<acp::SessionModelState>>,
1035 ) -> Self {
1036 Self {
1037 session_id,
1038 connection,
1039 state,
1040 }
1041 }
1042}
1043
1044impl acp_thread::AgentModelSelector for AcpModelSelector {
1045 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1046 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1047 self.state
1048 .borrow()
1049 .available_models
1050 .clone()
1051 .into_iter()
1052 .map(acp_thread::AgentModelInfo::from)
1053 .collect(),
1054 )))
1055 }
1056
1057 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1058 let connection = self.connection.clone();
1059 let session_id = self.session_id.clone();
1060 let old_model_id;
1061 {
1062 let mut state = self.state.borrow_mut();
1063 old_model_id = state.current_model_id.clone();
1064 state.current_model_id = model_id.clone();
1065 };
1066 let state = self.state.clone();
1067 cx.foreground_executor().spawn(async move {
1068 let result = connection
1069 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1070 .await;
1071
1072 if result.is_err() {
1073 state.borrow_mut().current_model_id = old_model_id;
1074 }
1075
1076 result?;
1077
1078 Ok(())
1079 })
1080 }
1081
1082 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1083 let state = self.state.borrow();
1084 Task::ready(
1085 state
1086 .available_models
1087 .iter()
1088 .find(|m| m.model_id == state.current_model_id)
1089 .cloned()
1090 .map(acp_thread::AgentModelInfo::from)
1091 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1092 )
1093 }
1094}
1095
1096struct AcpSessionConfigOptions {
1097 session_id: acp::SessionId,
1098 connection: Rc<acp::ClientSideConnection>,
1099 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1100 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1101 watch_rx: watch::Receiver<()>,
1102}
1103
1104impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1105 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1106 self.state.borrow().clone()
1107 }
1108
1109 fn set_config_option(
1110 &self,
1111 config_id: acp::SessionConfigId,
1112 value: acp::SessionConfigValueId,
1113 cx: &mut App,
1114 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1115 let connection = self.connection.clone();
1116 let session_id = self.session_id.clone();
1117 let state = self.state.clone();
1118
1119 let watch_tx = self.watch_tx.clone();
1120
1121 cx.foreground_executor().spawn(async move {
1122 let response = connection
1123 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1124 session_id, config_id, value,
1125 ))
1126 .await?;
1127
1128 *state.borrow_mut() = response.config_options.clone();
1129 watch_tx.borrow_mut().send(()).ok();
1130 Ok(response.config_options)
1131 })
1132 }
1133
1134 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1135 Some(self.watch_rx.clone())
1136 }
1137}
1138
1139struct ClientDelegate {
1140 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1141 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1142 cx: AsyncApp,
1143}
1144
1145#[async_trait::async_trait(?Send)]
1146impl acp::Client for ClientDelegate {
1147 async fn request_permission(
1148 &self,
1149 arguments: acp::RequestPermissionRequest,
1150 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1151 let thread;
1152 {
1153 let sessions_ref = self.sessions.borrow();
1154 let session = sessions_ref
1155 .get(&arguments.session_id)
1156 .context("Failed to get session")?;
1157 thread = session.thread.clone();
1158 }
1159
1160 let cx = &mut self.cx.clone();
1161
1162 let task = thread.update(cx, |thread, cx| {
1163 thread.request_tool_call_authorization(
1164 arguments.tool_call,
1165 acp_thread::PermissionOptions::Flat(arguments.options),
1166 cx,
1167 )
1168 })??;
1169
1170 let outcome = task.await;
1171
1172 Ok(acp::RequestPermissionResponse::new(outcome))
1173 }
1174
1175 async fn write_text_file(
1176 &self,
1177 arguments: acp::WriteTextFileRequest,
1178 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1179 let cx = &mut self.cx.clone();
1180 let task = self
1181 .session_thread(&arguments.session_id)?
1182 .update(cx, |thread, cx| {
1183 thread.write_text_file(arguments.path, arguments.content, cx)
1184 })?;
1185
1186 task.await?;
1187
1188 Ok(Default::default())
1189 }
1190
1191 async fn read_text_file(
1192 &self,
1193 arguments: acp::ReadTextFileRequest,
1194 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1195 let task = self.session_thread(&arguments.session_id)?.update(
1196 &mut self.cx.clone(),
1197 |thread, cx| {
1198 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1199 },
1200 )?;
1201
1202 let content = task.await?;
1203
1204 Ok(acp::ReadTextFileResponse::new(content))
1205 }
1206
1207 async fn session_notification(
1208 &self,
1209 notification: acp::SessionNotification,
1210 ) -> Result<(), acp::Error> {
1211 let sessions = self.sessions.borrow();
1212 let session = sessions
1213 .get(¬ification.session_id)
1214 .context("Failed to get session")?;
1215
1216 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1217 current_mode_id,
1218 ..
1219 }) = ¬ification.update
1220 {
1221 if let Some(session_modes) = &session.session_modes {
1222 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1223 }
1224 }
1225
1226 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1227 config_options,
1228 ..
1229 }) = ¬ification.update
1230 {
1231 if let Some(opts) = &session.config_options {
1232 *opts.config_options.borrow_mut() = config_options.clone();
1233 opts.tx.borrow_mut().send(()).ok();
1234 }
1235 }
1236
1237 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1238 && let Some(session_list) = self.session_list.borrow().as_ref()
1239 {
1240 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1241 }
1242
1243 // Clone so we can inspect meta both before and after handing off to the thread
1244 let update_clone = notification.update.clone();
1245
1246 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1247 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1248 if let Some(meta) = &tc.meta {
1249 if let Some(terminal_info) = meta.get("terminal_info") {
1250 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1251 {
1252 let terminal_id = acp::TerminalId::new(id_str);
1253 let cwd = terminal_info
1254 .get("cwd")
1255 .and_then(|v| v.as_str().map(PathBuf::from));
1256
1257 // Create a minimal display-only lower-level terminal and register it.
1258 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1259 let builder = TerminalBuilder::new_display_only(
1260 CursorShape::default(),
1261 AlternateScroll::On,
1262 None,
1263 0,
1264 cx.background_executor(),
1265 thread.project().read(cx).path_style(cx),
1266 )?;
1267 let lower = cx.new(|cx| builder.subscribe(cx));
1268 thread.on_terminal_provider_event(
1269 TerminalProviderEvent::Created {
1270 terminal_id,
1271 label: tc.title.clone(),
1272 cwd,
1273 output_byte_limit: None,
1274 terminal: lower,
1275 },
1276 cx,
1277 );
1278 anyhow::Ok(())
1279 });
1280 }
1281 }
1282 }
1283 }
1284
1285 // Forward the update to the acp_thread as usual.
1286 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1287 thread.handle_session_update(notification.update.clone(), cx)
1288 })??;
1289
1290 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1291 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1292 if let Some(meta) = &tcu.meta {
1293 if let Some(term_out) = meta.get("terminal_output") {
1294 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1295 let terminal_id = acp::TerminalId::new(id_str);
1296 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1297 let data = s.as_bytes().to_vec();
1298 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1299 thread.on_terminal_provider_event(
1300 TerminalProviderEvent::Output { terminal_id, data },
1301 cx,
1302 );
1303 });
1304 }
1305 }
1306 }
1307
1308 // terminal_exit
1309 if let Some(term_exit) = meta.get("terminal_exit") {
1310 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1311 let terminal_id = acp::TerminalId::new(id_str);
1312 let status = acp::TerminalExitStatus::new()
1313 .exit_code(
1314 term_exit
1315 .get("exit_code")
1316 .and_then(|v| v.as_u64())
1317 .map(|i| i as u32),
1318 )
1319 .signal(
1320 term_exit
1321 .get("signal")
1322 .and_then(|v| v.as_str().map(|s| s.to_string())),
1323 );
1324
1325 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1326 thread.on_terminal_provider_event(
1327 TerminalProviderEvent::Exit {
1328 terminal_id,
1329 status,
1330 },
1331 cx,
1332 );
1333 });
1334 }
1335 }
1336 }
1337 }
1338
1339 Ok(())
1340 }
1341
1342 async fn create_terminal(
1343 &self,
1344 args: acp::CreateTerminalRequest,
1345 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1346 let thread = self.session_thread(&args.session_id)?;
1347 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1348
1349 let terminal_entity = acp_thread::create_terminal_entity(
1350 args.command.clone(),
1351 &args.args,
1352 args.env
1353 .into_iter()
1354 .map(|env| (env.name, env.value))
1355 .collect(),
1356 args.cwd.clone(),
1357 &project,
1358 &mut self.cx.clone(),
1359 )
1360 .await?;
1361
1362 // Register with renderer
1363 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1364 thread.register_terminal_created(
1365 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1366 format!("{} {}", args.command, args.args.join(" ")),
1367 args.cwd.clone(),
1368 args.output_byte_limit,
1369 terminal_entity,
1370 cx,
1371 )
1372 })?;
1373 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1374 Ok(acp::CreateTerminalResponse::new(terminal_id))
1375 }
1376
1377 async fn kill_terminal_command(
1378 &self,
1379 args: acp::KillTerminalCommandRequest,
1380 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1381 self.session_thread(&args.session_id)?
1382 .update(&mut self.cx.clone(), |thread, cx| {
1383 thread.kill_terminal(args.terminal_id, cx)
1384 })??;
1385
1386 Ok(Default::default())
1387 }
1388
1389 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1390 Err(acp::Error::method_not_found())
1391 }
1392
1393 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1394 Err(acp::Error::method_not_found())
1395 }
1396
1397 async fn release_terminal(
1398 &self,
1399 args: acp::ReleaseTerminalRequest,
1400 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1401 self.session_thread(&args.session_id)?
1402 .update(&mut self.cx.clone(), |thread, cx| {
1403 thread.release_terminal(args.terminal_id, cx)
1404 })??;
1405
1406 Ok(Default::default())
1407 }
1408
1409 async fn terminal_output(
1410 &self,
1411 args: acp::TerminalOutputRequest,
1412 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1413 self.session_thread(&args.session_id)?
1414 .read_with(&mut self.cx.clone(), |thread, cx| {
1415 let out = thread
1416 .terminal(args.terminal_id)?
1417 .read(cx)
1418 .current_output(cx);
1419
1420 Ok(out)
1421 })?
1422 }
1423
1424 async fn wait_for_terminal_exit(
1425 &self,
1426 args: acp::WaitForTerminalExitRequest,
1427 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1428 let exit_status = self
1429 .session_thread(&args.session_id)?
1430 .update(&mut self.cx.clone(), |thread, cx| {
1431 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1432 })??
1433 .await;
1434
1435 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1436 }
1437}
1438
1439impl ClientDelegate {
1440 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1441 let sessions = self.sessions.borrow();
1442 sessions
1443 .get(session_id)
1444 .context("Failed to get session")
1445 .map(|session| session.thread.clone())
1446 }
1447}