1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::Project;
14use project::agent_server_store::AgentServerCommand;
15use serde::Deserialize;
16use settings::Settings as _;
17use task::ShellBuilder;
18use util::ResultExt as _;
19use util::process::Child;
20
21use std::path::PathBuf;
22use std::process::Stdio;
23use std::{any::Any, cell::RefCell};
24use std::{path::Path, rc::Rc};
25use thiserror::Error;
26
27use anyhow::{Context as _, Result};
28use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
29
30use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
31use terminal::TerminalBuilder;
32use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
33
34#[derive(Debug, Error)]
35#[error("Unsupported version")]
36pub struct UnsupportedVersion;
37
38pub struct AcpConnection {
39 server_name: SharedString,
40 telemetry_id: SharedString,
41 connection: Rc<acp::ClientSideConnection>,
42 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
43 auth_methods: Vec<acp::AuthMethod>,
44 agent_capabilities: acp::AgentCapabilities,
45 default_mode: Option<acp::SessionModeId>,
46 default_model: Option<acp::ModelId>,
47 default_config_options: HashMap<String, String>,
48 root_dir: PathBuf,
49 child: Child,
50 session_list: Option<Rc<AcpSessionList>>,
51 _io_task: Task<Result<(), acp::Error>>,
52 _wait_task: Task<Result<()>>,
53 _stderr_task: Task<Result<()>>,
54}
55
56struct ConfigOptions {
57 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
58 tx: Rc<RefCell<watch::Sender<()>>>,
59 rx: watch::Receiver<()>,
60}
61
62impl ConfigOptions {
63 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
64 let (tx, rx) = watch::channel(());
65 Self {
66 config_options,
67 tx: Rc::new(RefCell::new(tx)),
68 rx,
69 }
70 }
71}
72
73pub struct AcpSession {
74 thread: WeakEntity<AcpThread>,
75 suppress_abort_err: bool,
76 models: Option<Rc<RefCell<acp::SessionModelState>>>,
77 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
78 config_options: Option<ConfigOptions>,
79}
80
81pub struct AcpSessionList {
82 connection: Rc<acp::ClientSideConnection>,
83 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
84 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
85}
86
87impl AcpSessionList {
88 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
89 let (tx, rx) = smol::channel::unbounded();
90 Self {
91 connection,
92 updates_tx: tx,
93 updates_rx: rx,
94 }
95 }
96
97 fn notify_update(&self) {
98 self.updates_tx
99 .try_send(acp_thread::SessionListUpdate::Refresh)
100 .log_err();
101 }
102
103 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
104 self.updates_tx
105 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
106 .log_err();
107 }
108}
109
110impl AgentSessionList for AcpSessionList {
111 fn list_sessions(
112 &self,
113 request: AgentSessionListRequest,
114 cx: &mut App,
115 ) -> Task<Result<AgentSessionListResponse>> {
116 let conn = self.connection.clone();
117 cx.foreground_executor().spawn(async move {
118 let acp_request = acp::ListSessionsRequest::new()
119 .cwd(request.cwd)
120 .cursor(request.cursor);
121 let response = conn.list_sessions(acp_request).await?;
122 Ok(AgentSessionListResponse {
123 sessions: response
124 .sessions
125 .into_iter()
126 .map(|s| AgentSessionInfo {
127 session_id: s.session_id,
128 cwd: Some(s.cwd),
129 title: s.title.map(Into::into),
130 updated_at: s.updated_at.and_then(|date_str| {
131 chrono::DateTime::parse_from_rfc3339(&date_str)
132 .ok()
133 .map(|dt| dt.with_timezone(&chrono::Utc))
134 }),
135 meta: s.meta,
136 })
137 .collect(),
138 next_cursor: response.next_cursor,
139 meta: response.meta,
140 })
141 })
142 }
143
144 fn watch(
145 &self,
146 _cx: &mut App,
147 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
148 Some(self.updates_rx.clone())
149 }
150
151 fn notify_refresh(&self) {
152 self.notify_update();
153 }
154
155 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
156 self
157 }
158}
159
160pub async fn connect(
161 server_name: SharedString,
162 command: AgentServerCommand,
163 root_dir: &Path,
164 default_mode: Option<acp::SessionModeId>,
165 default_model: Option<acp::ModelId>,
166 default_config_options: HashMap<String, String>,
167 is_remote: bool,
168 cx: &mut AsyncApp,
169) -> Result<Rc<dyn AgentConnection>> {
170 let conn = AcpConnection::stdio(
171 server_name,
172 command.clone(),
173 root_dir,
174 default_mode,
175 default_model,
176 default_config_options,
177 is_remote,
178 cx,
179 )
180 .await?;
181 Ok(Rc::new(conn) as _)
182}
183
184const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
185
186impl AcpConnection {
187 pub async fn stdio(
188 server_name: SharedString,
189 command: AgentServerCommand,
190 root_dir: &Path,
191 default_mode: Option<acp::SessionModeId>,
192 default_model: Option<acp::ModelId>,
193 default_config_options: HashMap<String, String>,
194 is_remote: bool,
195 cx: &mut AsyncApp,
196 ) -> Result<Self> {
197 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
198 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
199 let mut child =
200 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
201 child.envs(command.env.iter().flatten());
202 if !is_remote {
203 child.current_dir(root_dir);
204 }
205 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
206
207 let stdout = child.stdout.take().context("Failed to take stdout")?;
208 let stdin = child.stdin.take().context("Failed to take stdin")?;
209 let stderr = child.stderr.take().context("Failed to take stderr")?;
210 log::debug!(
211 "Spawning external agent server: {:?}, {:?}",
212 command.path,
213 command.args
214 );
215 log::trace!("Spawned (pid: {})", child.id());
216
217 let sessions = Rc::new(RefCell::new(HashMap::default()));
218
219 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
220 (
221 release_channel::ReleaseChannel::try_global(cx)
222 .map(|release_channel| release_channel.display_name()),
223 release_channel::AppVersion::global(cx).to_string(),
224 )
225 });
226
227 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
228 Rc::new(RefCell::new(None));
229
230 let client = ClientDelegate {
231 sessions: sessions.clone(),
232 session_list: client_session_list.clone(),
233 cx: cx.clone(),
234 };
235 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
236 let foreground_executor = cx.foreground_executor().clone();
237 move |fut| {
238 foreground_executor.spawn(fut).detach();
239 }
240 });
241
242 let io_task = cx.background_spawn(io_task);
243
244 let stderr_task = cx.background_spawn(async move {
245 let mut stderr = BufReader::new(stderr);
246 let mut line = String::new();
247 while let Ok(n) = stderr.read_line(&mut line).await
248 && n > 0
249 {
250 log::warn!("agent stderr: {}", line.trim());
251 line.clear();
252 }
253 Ok(())
254 });
255
256 let wait_task = cx.spawn({
257 let sessions = sessions.clone();
258 let status_fut = child.status();
259 async move |cx| {
260 let status = status_fut.await?;
261
262 for session in sessions.borrow().values() {
263 session
264 .thread
265 .update(cx, |thread, cx| {
266 thread.emit_load_error(LoadError::Exited { status }, cx)
267 })
268 .ok();
269 }
270
271 anyhow::Ok(())
272 }
273 });
274
275 let connection = Rc::new(connection);
276
277 cx.update(|cx| {
278 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
279 registry.set_active_connection(server_name.clone(), &connection, cx)
280 });
281 });
282
283 let response = connection
284 .initialize(
285 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
286 .client_capabilities(
287 acp::ClientCapabilities::new()
288 .fs(acp::FileSystemCapability::new()
289 .read_text_file(true)
290 .write_text_file(true))
291 .terminal(true)
292 // Experimental: Allow for rendering terminal output from the agents
293 .meta(acp::Meta::from_iter([
294 ("terminal_output".into(), true.into()),
295 ("terminal-auth".into(), true.into()),
296 ])),
297 )
298 .client_info(
299 acp::Implementation::new("zed", version)
300 .title(release_channel.map(ToOwned::to_owned)),
301 ),
302 )
303 .await?;
304
305 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
306 return Err(UnsupportedVersion.into());
307 }
308
309 let telemetry_id = response
310 .agent_info
311 // Use the one the agent provides if we have one
312 .map(|info| info.name.into())
313 // Otherwise, just use the name
314 .unwrap_or_else(|| server_name.clone());
315
316 let session_list = if response
317 .agent_capabilities
318 .session_capabilities
319 .list
320 .is_some()
321 {
322 let list = Rc::new(AcpSessionList::new(connection.clone()));
323 *client_session_list.borrow_mut() = Some(list.clone());
324 Some(list)
325 } else {
326 None
327 };
328
329 Ok(Self {
330 auth_methods: response.auth_methods,
331 root_dir: root_dir.to_owned(),
332 connection,
333 server_name,
334 telemetry_id,
335 sessions,
336 agent_capabilities: response.agent_capabilities,
337 default_mode,
338 default_model,
339 default_config_options,
340 session_list,
341 _io_task: io_task,
342 _wait_task: wait_task,
343 _stderr_task: stderr_task,
344 child,
345 })
346 }
347
348 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
349 &self.agent_capabilities.prompt_capabilities
350 }
351
352 pub fn root_dir(&self) -> &Path {
353 &self.root_dir
354 }
355}
356
357impl Drop for AcpConnection {
358 fn drop(&mut self) {
359 self.child.kill().log_err();
360 }
361}
362
363impl AgentConnection for AcpConnection {
364 fn telemetry_id(&self) -> SharedString {
365 self.telemetry_id.clone()
366 }
367
368 fn new_thread(
369 self: Rc<Self>,
370 project: Entity<Project>,
371 cwd: &Path,
372 cx: &mut App,
373 ) -> Task<Result<Entity<AcpThread>>> {
374 let name = self.server_name.clone();
375 let cwd = cwd.to_path_buf();
376 let mcp_servers = mcp_servers_for_project(&project, cx);
377
378 cx.spawn(async move |cx| {
379 let response = self.connection
380 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
381 .await
382 .map_err(map_acp_error)?;
383
384 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
385
386 if let Some(default_mode) = self.default_mode.clone() {
387 if let Some(modes) = modes.as_ref() {
388 let mut modes_ref = modes.borrow_mut();
389 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
390
391 if has_mode {
392 let initial_mode_id = modes_ref.current_mode_id.clone();
393
394 cx.spawn({
395 let default_mode = default_mode.clone();
396 let session_id = response.session_id.clone();
397 let modes = modes.clone();
398 let conn = self.connection.clone();
399 async move |_| {
400 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
401 .await.log_err();
402
403 if result.is_none() {
404 modes.borrow_mut().current_mode_id = initial_mode_id;
405 }
406 }
407 }).detach();
408
409 modes_ref.current_mode_id = default_mode;
410 } else {
411 let available_modes = modes_ref
412 .available_modes
413 .iter()
414 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
415 .collect::<Vec<_>>()
416 .join("\n");
417
418 log::warn!(
419 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
420 );
421 }
422 } else {
423 log::warn!(
424 "`{name}` does not support modes, but `default_mode` was set in settings.",
425 );
426 }
427 }
428
429 if let Some(default_model) = self.default_model.clone() {
430 if let Some(models) = models.as_ref() {
431 let mut models_ref = models.borrow_mut();
432 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
433
434 if has_model {
435 let initial_model_id = models_ref.current_model_id.clone();
436
437 cx.spawn({
438 let default_model = default_model.clone();
439 let session_id = response.session_id.clone();
440 let models = models.clone();
441 let conn = self.connection.clone();
442 async move |_| {
443 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
444 .await.log_err();
445
446 if result.is_none() {
447 models.borrow_mut().current_model_id = initial_model_id;
448 }
449 }
450 }).detach();
451
452 models_ref.current_model_id = default_model;
453 } else {
454 let available_models = models_ref
455 .available_models
456 .iter()
457 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
458 .collect::<Vec<_>>()
459 .join("\n");
460
461 log::warn!(
462 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
463 );
464 }
465 } else {
466 log::warn!(
467 "`{name}` does not support model selection, but `default_model` was set in settings.",
468 );
469 }
470 }
471
472 if let Some(config_opts) = config_options.as_ref() {
473 let defaults_to_apply: Vec<_> = {
474 let config_opts_ref = config_opts.borrow();
475 config_opts_ref
476 .iter()
477 .filter_map(|config_option| {
478 let default_value = self.default_config_options.get(&*config_option.id.0)?;
479
480 let is_valid = match &config_option.kind {
481 acp::SessionConfigKind::Select(select) => match &select.options {
482 acp::SessionConfigSelectOptions::Ungrouped(options) => {
483 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
484 }
485 acp::SessionConfigSelectOptions::Grouped(groups) => groups
486 .iter()
487 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
488 _ => false,
489 },
490 _ => false,
491 };
492
493 if is_valid {
494 let initial_value = match &config_option.kind {
495 acp::SessionConfigKind::Select(select) => {
496 Some(select.current_value.clone())
497 }
498 _ => None,
499 };
500 Some((config_option.id.clone(), default_value.clone(), initial_value))
501 } else {
502 log::warn!(
503 "`{}` is not a valid value for config option `{}` in {}",
504 default_value,
505 config_option.id.0,
506 name
507 );
508 None
509 }
510 })
511 .collect()
512 };
513
514 for (config_id, default_value, initial_value) in defaults_to_apply {
515 cx.spawn({
516 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
517 let session_id = response.session_id.clone();
518 let config_id_clone = config_id.clone();
519 let config_opts = config_opts.clone();
520 let conn = self.connection.clone();
521 async move |_| {
522 let result = conn
523 .set_session_config_option(
524 acp::SetSessionConfigOptionRequest::new(
525 session_id,
526 config_id_clone.clone(),
527 default_value_id,
528 ),
529 )
530 .await
531 .log_err();
532
533 if result.is_none() {
534 if let Some(initial) = initial_value {
535 let mut opts = config_opts.borrow_mut();
536 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
537 if let acp::SessionConfigKind::Select(select) =
538 &mut opt.kind
539 {
540 select.current_value = initial;
541 }
542 }
543 }
544 }
545 }
546 })
547 .detach();
548
549 let mut opts = config_opts.borrow_mut();
550 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
551 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
552 select.current_value = acp::SessionConfigValueId::new(default_value);
553 }
554 }
555 }
556 }
557
558 let action_log = cx.new(|_| ActionLog::new(project.clone()));
559 let thread: Entity<AcpThread> = cx.new(|cx| {
560 AcpThread::new(
561 self.server_name.clone(),
562 self.clone(),
563 project,
564 action_log,
565 response.session_id.clone(),
566 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
567 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
568 cx,
569 )
570 });
571
572 self.sessions.borrow_mut().insert(
573 response.session_id,
574 AcpSession {
575 thread: thread.downgrade(),
576 suppress_abort_err: false,
577 session_modes: modes,
578 models,
579 config_options: config_options.map(ConfigOptions::new),
580 },
581 );
582
583 Ok(thread)
584 })
585 }
586
587 fn supports_load_session(&self, cx: &App) -> bool {
588 cx.has_flag::<AcpBetaFeatureFlag>() && self.agent_capabilities.load_session
589 }
590
591 fn supports_resume_session(&self, cx: &App) -> bool {
592 cx.has_flag::<AcpBetaFeatureFlag>()
593 && self
594 .agent_capabilities
595 .session_capabilities
596 .resume
597 .is_some()
598 }
599
600 fn load_session(
601 self: Rc<Self>,
602 session: AgentSessionInfo,
603 project: Entity<Project>,
604 cwd: &Path,
605 cx: &mut App,
606 ) -> Task<Result<Entity<AcpThread>>> {
607 if !cx.has_flag::<AcpBetaFeatureFlag>() || !self.agent_capabilities.load_session {
608 return Task::ready(Err(anyhow!(LoadError::Other(
609 "Loading sessions is not supported by this agent.".into()
610 ))));
611 }
612
613 let cwd = cwd.to_path_buf();
614 let mcp_servers = mcp_servers_for_project(&project, cx);
615 let action_log = cx.new(|_| ActionLog::new(project.clone()));
616 let thread: Entity<AcpThread> = cx.new(|cx| {
617 AcpThread::new(
618 self.server_name.clone(),
619 self.clone(),
620 project,
621 action_log,
622 session.session_id.clone(),
623 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
624 cx,
625 )
626 });
627
628 self.sessions.borrow_mut().insert(
629 session.session_id.clone(),
630 AcpSession {
631 thread: thread.downgrade(),
632 suppress_abort_err: false,
633 session_modes: None,
634 models: None,
635 config_options: None,
636 },
637 );
638
639 cx.spawn(async move |_| {
640 let response = match self
641 .connection
642 .load_session(
643 acp::LoadSessionRequest::new(session.session_id.clone(), cwd)
644 .mcp_servers(mcp_servers),
645 )
646 .await
647 {
648 Ok(response) => response,
649 Err(err) => {
650 self.sessions.borrow_mut().remove(&session.session_id);
651 return Err(map_acp_error(err));
652 }
653 };
654
655 let (modes, models, config_options) =
656 config_state(response.modes, response.models, response.config_options);
657 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
658 session.session_modes = modes;
659 session.models = models;
660 session.config_options = config_options.map(ConfigOptions::new);
661 }
662
663 Ok(thread)
664 })
665 }
666
667 fn resume_session(
668 self: Rc<Self>,
669 session: AgentSessionInfo,
670 project: Entity<Project>,
671 cwd: &Path,
672 cx: &mut App,
673 ) -> Task<Result<Entity<AcpThread>>> {
674 if !cx.has_flag::<AcpBetaFeatureFlag>()
675 || self
676 .agent_capabilities
677 .session_capabilities
678 .resume
679 .is_none()
680 {
681 return Task::ready(Err(anyhow!(LoadError::Other(
682 "Resuming sessions is not supported by this agent.".into()
683 ))));
684 }
685
686 let cwd = cwd.to_path_buf();
687 let mcp_servers = mcp_servers_for_project(&project, cx);
688 let action_log = cx.new(|_| ActionLog::new(project.clone()));
689 let thread: Entity<AcpThread> = cx.new(|cx| {
690 AcpThread::new(
691 self.server_name.clone(),
692 self.clone(),
693 project,
694 action_log,
695 session.session_id.clone(),
696 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
697 cx,
698 )
699 });
700
701 self.sessions.borrow_mut().insert(
702 session.session_id.clone(),
703 AcpSession {
704 thread: thread.downgrade(),
705 suppress_abort_err: false,
706 session_modes: None,
707 models: None,
708 config_options: None,
709 },
710 );
711
712 cx.spawn(async move |_| {
713 let response = match self
714 .connection
715 .resume_session(
716 acp::ResumeSessionRequest::new(session.session_id.clone(), cwd)
717 .mcp_servers(mcp_servers),
718 )
719 .await
720 {
721 Ok(response) => response,
722 Err(err) => {
723 self.sessions.borrow_mut().remove(&session.session_id);
724 return Err(map_acp_error(err));
725 }
726 };
727
728 let (modes, models, config_options) =
729 config_state(response.modes, response.models, response.config_options);
730 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
731 session.session_modes = modes;
732 session.models = models;
733 session.config_options = config_options.map(ConfigOptions::new);
734 }
735
736 Ok(thread)
737 })
738 }
739
740 fn auth_methods(&self) -> &[acp::AuthMethod] {
741 &self.auth_methods
742 }
743
744 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
745 let conn = self.connection.clone();
746 cx.foreground_executor().spawn(async move {
747 conn.authenticate(acp::AuthenticateRequest::new(method_id))
748 .await?;
749 Ok(())
750 })
751 }
752
753 fn prompt(
754 &self,
755 _id: Option<acp_thread::UserMessageId>,
756 params: acp::PromptRequest,
757 cx: &mut App,
758 ) -> Task<Result<acp::PromptResponse>> {
759 let conn = self.connection.clone();
760 let sessions = self.sessions.clone();
761 let session_id = params.session_id.clone();
762 cx.foreground_executor().spawn(async move {
763 let result = conn.prompt(params).await;
764
765 let mut suppress_abort_err = false;
766
767 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
768 suppress_abort_err = session.suppress_abort_err;
769 session.suppress_abort_err = false;
770 }
771
772 match result {
773 Ok(response) => Ok(response),
774 Err(err) => {
775 if err.code == acp::ErrorCode::AuthRequired {
776 return Err(anyhow!(acp::Error::auth_required()));
777 }
778
779 if err.code != ErrorCode::InternalError {
780 anyhow::bail!(err)
781 }
782
783 let Some(data) = &err.data else {
784 anyhow::bail!(err)
785 };
786
787 // Temporary workaround until the following PR is generally available:
788 // https://github.com/google-gemini/gemini-cli/pull/6656
789
790 #[derive(Deserialize)]
791 #[serde(deny_unknown_fields)]
792 struct ErrorDetails {
793 details: Box<str>,
794 }
795
796 match serde_json::from_value(data.clone()) {
797 Ok(ErrorDetails { details }) => {
798 if suppress_abort_err
799 && (details.contains("This operation was aborted")
800 || details.contains("The user aborted a request"))
801 {
802 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
803 } else {
804 Err(anyhow!(details))
805 }
806 }
807 Err(_) => Err(anyhow!(err)),
808 }
809 }
810 }
811 })
812 }
813
814 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
815 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
816 session.suppress_abort_err = true;
817 }
818 let conn = self.connection.clone();
819 let params = acp::CancelNotification::new(session_id.clone());
820 cx.foreground_executor()
821 .spawn(async move { conn.cancel(params).await })
822 .detach();
823 }
824
825 fn session_modes(
826 &self,
827 session_id: &acp::SessionId,
828 _cx: &App,
829 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
830 let sessions = self.sessions.clone();
831 let sessions_ref = sessions.borrow();
832 let Some(session) = sessions_ref.get(session_id) else {
833 return None;
834 };
835
836 if let Some(modes) = session.session_modes.as_ref() {
837 Some(Rc::new(AcpSessionModes {
838 connection: self.connection.clone(),
839 session_id: session_id.clone(),
840 state: modes.clone(),
841 }) as _)
842 } else {
843 None
844 }
845 }
846
847 fn model_selector(
848 &self,
849 session_id: &acp::SessionId,
850 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
851 let sessions = self.sessions.clone();
852 let sessions_ref = sessions.borrow();
853 let Some(session) = sessions_ref.get(session_id) else {
854 return None;
855 };
856
857 if let Some(models) = session.models.as_ref() {
858 Some(Rc::new(AcpModelSelector::new(
859 session_id.clone(),
860 self.connection.clone(),
861 models.clone(),
862 )) as _)
863 } else {
864 None
865 }
866 }
867
868 fn session_config_options(
869 &self,
870 session_id: &acp::SessionId,
871 _cx: &App,
872 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
873 let sessions = self.sessions.borrow();
874 let session = sessions.get(session_id)?;
875
876 let config_opts = session.config_options.as_ref()?;
877
878 Some(Rc::new(AcpSessionConfigOptions {
879 session_id: session_id.clone(),
880 connection: self.connection.clone(),
881 state: config_opts.config_options.clone(),
882 watch_tx: config_opts.tx.clone(),
883 watch_rx: config_opts.rx.clone(),
884 }) as _)
885 }
886
887 fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
888 if cx.has_flag::<AcpBetaFeatureFlag>() {
889 self.session_list.clone().map(|s| s as _)
890 } else {
891 None
892 }
893 }
894
895 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
896 self
897 }
898}
899
900fn map_acp_error(err: acp::Error) -> anyhow::Error {
901 if err.code == acp::ErrorCode::AuthRequired {
902 let mut error = AuthRequired::new();
903
904 if err.message != acp::ErrorCode::AuthRequired.to_string() {
905 error = error.with_description(err.message);
906 }
907
908 anyhow!(error)
909 } else {
910 anyhow!(err)
911 }
912}
913
914fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
915 let context_server_store = project.read(cx).context_server_store().read(cx);
916 let is_local = project.read(cx).is_local();
917 context_server_store
918 .configured_server_ids()
919 .iter()
920 .filter_map(|id| {
921 let configuration = context_server_store.configuration_for_server(id)?;
922 match &*configuration {
923 project::context_server_store::ContextServerConfiguration::Custom {
924 command,
925 remote,
926 ..
927 }
928 | project::context_server_store::ContextServerConfiguration::Extension {
929 command,
930 remote,
931 ..
932 } if is_local || *remote => Some(acp::McpServer::Stdio(
933 acp::McpServerStdio::new(id.0.to_string(), &command.path)
934 .args(command.args.clone())
935 .env(if let Some(env) = command.env.as_ref() {
936 env.iter()
937 .map(|(name, value)| acp::EnvVariable::new(name, value))
938 .collect()
939 } else {
940 vec![]
941 }),
942 )),
943 project::context_server_store::ContextServerConfiguration::Http {
944 url,
945 headers,
946 timeout: _,
947 } => Some(acp::McpServer::Http(
948 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
949 headers
950 .iter()
951 .map(|(name, value)| acp::HttpHeader::new(name, value))
952 .collect(),
953 ),
954 )),
955 _ => None,
956 }
957 })
958 .collect()
959}
960
961fn config_state(
962 modes: Option<acp::SessionModeState>,
963 models: Option<acp::SessionModelState>,
964 config_options: Option<Vec<acp::SessionConfigOption>>,
965) -> (
966 Option<Rc<RefCell<acp::SessionModeState>>>,
967 Option<Rc<RefCell<acp::SessionModelState>>>,
968 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
969) {
970 if let Some(opts) = config_options {
971 return (None, None, Some(Rc::new(RefCell::new(opts))));
972 }
973
974 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
975 let models = models.map(|models| Rc::new(RefCell::new(models)));
976 (modes, models, None)
977}
978
979struct AcpSessionModes {
980 session_id: acp::SessionId,
981 connection: Rc<acp::ClientSideConnection>,
982 state: Rc<RefCell<acp::SessionModeState>>,
983}
984
985impl acp_thread::AgentSessionModes for AcpSessionModes {
986 fn current_mode(&self) -> acp::SessionModeId {
987 self.state.borrow().current_mode_id.clone()
988 }
989
990 fn all_modes(&self) -> Vec<acp::SessionMode> {
991 self.state.borrow().available_modes.clone()
992 }
993
994 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
995 let connection = self.connection.clone();
996 let session_id = self.session_id.clone();
997 let old_mode_id;
998 {
999 let mut state = self.state.borrow_mut();
1000 old_mode_id = state.current_mode_id.clone();
1001 state.current_mode_id = mode_id.clone();
1002 };
1003 let state = self.state.clone();
1004 cx.foreground_executor().spawn(async move {
1005 let result = connection
1006 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1007 .await;
1008
1009 if result.is_err() {
1010 state.borrow_mut().current_mode_id = old_mode_id;
1011 }
1012
1013 result?;
1014
1015 Ok(())
1016 })
1017 }
1018}
1019
1020struct AcpModelSelector {
1021 session_id: acp::SessionId,
1022 connection: Rc<acp::ClientSideConnection>,
1023 state: Rc<RefCell<acp::SessionModelState>>,
1024}
1025
1026impl AcpModelSelector {
1027 fn new(
1028 session_id: acp::SessionId,
1029 connection: Rc<acp::ClientSideConnection>,
1030 state: Rc<RefCell<acp::SessionModelState>>,
1031 ) -> Self {
1032 Self {
1033 session_id,
1034 connection,
1035 state,
1036 }
1037 }
1038}
1039
1040impl acp_thread::AgentModelSelector for AcpModelSelector {
1041 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1042 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1043 self.state
1044 .borrow()
1045 .available_models
1046 .clone()
1047 .into_iter()
1048 .map(acp_thread::AgentModelInfo::from)
1049 .collect(),
1050 )))
1051 }
1052
1053 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1054 let connection = self.connection.clone();
1055 let session_id = self.session_id.clone();
1056 let old_model_id;
1057 {
1058 let mut state = self.state.borrow_mut();
1059 old_model_id = state.current_model_id.clone();
1060 state.current_model_id = model_id.clone();
1061 };
1062 let state = self.state.clone();
1063 cx.foreground_executor().spawn(async move {
1064 let result = connection
1065 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1066 .await;
1067
1068 if result.is_err() {
1069 state.borrow_mut().current_model_id = old_model_id;
1070 }
1071
1072 result?;
1073
1074 Ok(())
1075 })
1076 }
1077
1078 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1079 let state = self.state.borrow();
1080 Task::ready(
1081 state
1082 .available_models
1083 .iter()
1084 .find(|m| m.model_id == state.current_model_id)
1085 .cloned()
1086 .map(acp_thread::AgentModelInfo::from)
1087 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1088 )
1089 }
1090}
1091
1092struct AcpSessionConfigOptions {
1093 session_id: acp::SessionId,
1094 connection: Rc<acp::ClientSideConnection>,
1095 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1096 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1097 watch_rx: watch::Receiver<()>,
1098}
1099
1100impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1101 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1102 self.state.borrow().clone()
1103 }
1104
1105 fn set_config_option(
1106 &self,
1107 config_id: acp::SessionConfigId,
1108 value: acp::SessionConfigValueId,
1109 cx: &mut App,
1110 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1111 let connection = self.connection.clone();
1112 let session_id = self.session_id.clone();
1113 let state = self.state.clone();
1114
1115 let watch_tx = self.watch_tx.clone();
1116
1117 cx.foreground_executor().spawn(async move {
1118 let response = connection
1119 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1120 session_id, config_id, value,
1121 ))
1122 .await?;
1123
1124 *state.borrow_mut() = response.config_options.clone();
1125 watch_tx.borrow_mut().send(()).ok();
1126 Ok(response.config_options)
1127 })
1128 }
1129
1130 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1131 Some(self.watch_rx.clone())
1132 }
1133}
1134
1135struct ClientDelegate {
1136 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1137 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1138 cx: AsyncApp,
1139}
1140
1141#[async_trait::async_trait(?Send)]
1142impl acp::Client for ClientDelegate {
1143 async fn request_permission(
1144 &self,
1145 arguments: acp::RequestPermissionRequest,
1146 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1147 let respect_always_allow_setting;
1148 let thread;
1149 {
1150 let sessions_ref = self.sessions.borrow();
1151 let session = sessions_ref
1152 .get(&arguments.session_id)
1153 .context("Failed to get session")?;
1154 respect_always_allow_setting = session.session_modes.is_none();
1155 thread = session.thread.clone();
1156 }
1157
1158 let cx = &mut self.cx.clone();
1159
1160 let task = thread.update(cx, |thread, cx| {
1161 thread.request_tool_call_authorization(
1162 arguments.tool_call,
1163 acp_thread::PermissionOptions::Flat(arguments.options),
1164 respect_always_allow_setting,
1165 cx,
1166 )
1167 })??;
1168
1169 let outcome = task.await;
1170
1171 Ok(acp::RequestPermissionResponse::new(outcome))
1172 }
1173
1174 async fn write_text_file(
1175 &self,
1176 arguments: acp::WriteTextFileRequest,
1177 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1178 let cx = &mut self.cx.clone();
1179 let task = self
1180 .session_thread(&arguments.session_id)?
1181 .update(cx, |thread, cx| {
1182 thread.write_text_file(arguments.path, arguments.content, cx)
1183 })?;
1184
1185 task.await?;
1186
1187 Ok(Default::default())
1188 }
1189
1190 async fn read_text_file(
1191 &self,
1192 arguments: acp::ReadTextFileRequest,
1193 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1194 let task = self.session_thread(&arguments.session_id)?.update(
1195 &mut self.cx.clone(),
1196 |thread, cx| {
1197 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1198 },
1199 )?;
1200
1201 let content = task.await?;
1202
1203 Ok(acp::ReadTextFileResponse::new(content))
1204 }
1205
1206 async fn session_notification(
1207 &self,
1208 notification: acp::SessionNotification,
1209 ) -> Result<(), acp::Error> {
1210 let sessions = self.sessions.borrow();
1211 let session = sessions
1212 .get(¬ification.session_id)
1213 .context("Failed to get session")?;
1214
1215 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1216 current_mode_id,
1217 ..
1218 }) = ¬ification.update
1219 {
1220 if let Some(session_modes) = &session.session_modes {
1221 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1222 } else {
1223 log::error!(
1224 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
1225 );
1226 }
1227 }
1228
1229 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1230 config_options,
1231 ..
1232 }) = ¬ification.update
1233 {
1234 if let Some(opts) = &session.config_options {
1235 *opts.config_options.borrow_mut() = config_options.clone();
1236 opts.tx.borrow_mut().send(()).ok();
1237 } else {
1238 log::error!(
1239 "Got a `ConfigOptionUpdate` notification, but the agent didn't specify `config_options` during session setup."
1240 );
1241 }
1242 }
1243
1244 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1245 && let Some(session_list) = self.session_list.borrow().as_ref()
1246 {
1247 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1248 }
1249
1250 // Clone so we can inspect meta both before and after handing off to the thread
1251 let update_clone = notification.update.clone();
1252
1253 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1254 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1255 if let Some(meta) = &tc.meta {
1256 if let Some(terminal_info) = meta.get("terminal_info") {
1257 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1258 {
1259 let terminal_id = acp::TerminalId::new(id_str);
1260 let cwd = terminal_info
1261 .get("cwd")
1262 .and_then(|v| v.as_str().map(PathBuf::from));
1263
1264 // Create a minimal display-only lower-level terminal and register it.
1265 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1266 let builder = TerminalBuilder::new_display_only(
1267 CursorShape::default(),
1268 AlternateScroll::On,
1269 None,
1270 0,
1271 cx.background_executor(),
1272 thread.project().read(cx).path_style(cx),
1273 )?;
1274 let lower = cx.new(|cx| builder.subscribe(cx));
1275 thread.on_terminal_provider_event(
1276 TerminalProviderEvent::Created {
1277 terminal_id,
1278 label: tc.title.clone(),
1279 cwd,
1280 output_byte_limit: None,
1281 terminal: lower,
1282 },
1283 cx,
1284 );
1285 anyhow::Ok(())
1286 });
1287 }
1288 }
1289 }
1290 }
1291
1292 // Forward the update to the acp_thread as usual.
1293 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1294 thread.handle_session_update(notification.update.clone(), cx)
1295 })??;
1296
1297 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1298 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1299 if let Some(meta) = &tcu.meta {
1300 if let Some(term_out) = meta.get("terminal_output") {
1301 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1302 let terminal_id = acp::TerminalId::new(id_str);
1303 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1304 let data = s.as_bytes().to_vec();
1305 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1306 thread.on_terminal_provider_event(
1307 TerminalProviderEvent::Output { terminal_id, data },
1308 cx,
1309 );
1310 });
1311 }
1312 }
1313 }
1314
1315 // terminal_exit
1316 if let Some(term_exit) = meta.get("terminal_exit") {
1317 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1318 let terminal_id = acp::TerminalId::new(id_str);
1319 let status = acp::TerminalExitStatus::new()
1320 .exit_code(
1321 term_exit
1322 .get("exit_code")
1323 .and_then(|v| v.as_u64())
1324 .map(|i| i as u32),
1325 )
1326 .signal(
1327 term_exit
1328 .get("signal")
1329 .and_then(|v| v.as_str().map(|s| s.to_string())),
1330 );
1331
1332 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1333 thread.on_terminal_provider_event(
1334 TerminalProviderEvent::Exit {
1335 terminal_id,
1336 status,
1337 },
1338 cx,
1339 );
1340 });
1341 }
1342 }
1343 }
1344 }
1345
1346 Ok(())
1347 }
1348
1349 async fn create_terminal(
1350 &self,
1351 args: acp::CreateTerminalRequest,
1352 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1353 let thread = self.session_thread(&args.session_id)?;
1354 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1355
1356 let terminal_entity = acp_thread::create_terminal_entity(
1357 args.command.clone(),
1358 &args.args,
1359 args.env
1360 .into_iter()
1361 .map(|env| (env.name, env.value))
1362 .collect(),
1363 args.cwd.clone(),
1364 &project,
1365 &mut self.cx.clone(),
1366 )
1367 .await?;
1368
1369 // Register with renderer
1370 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1371 thread.register_terminal_created(
1372 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1373 format!("{} {}", args.command, args.args.join(" ")),
1374 args.cwd.clone(),
1375 args.output_byte_limit,
1376 terminal_entity,
1377 cx,
1378 )
1379 })?;
1380 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1381 Ok(acp::CreateTerminalResponse::new(terminal_id))
1382 }
1383
1384 async fn kill_terminal_command(
1385 &self,
1386 args: acp::KillTerminalCommandRequest,
1387 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1388 self.session_thread(&args.session_id)?
1389 .update(&mut self.cx.clone(), |thread, cx| {
1390 thread.kill_terminal(args.terminal_id, cx)
1391 })??;
1392
1393 Ok(Default::default())
1394 }
1395
1396 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1397 Err(acp::Error::method_not_found())
1398 }
1399
1400 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1401 Err(acp::Error::method_not_found())
1402 }
1403
1404 async fn release_terminal(
1405 &self,
1406 args: acp::ReleaseTerminalRequest,
1407 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1408 self.session_thread(&args.session_id)?
1409 .update(&mut self.cx.clone(), |thread, cx| {
1410 thread.release_terminal(args.terminal_id, cx)
1411 })??;
1412
1413 Ok(Default::default())
1414 }
1415
1416 async fn terminal_output(
1417 &self,
1418 args: acp::TerminalOutputRequest,
1419 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1420 self.session_thread(&args.session_id)?
1421 .read_with(&mut self.cx.clone(), |thread, cx| {
1422 let out = thread
1423 .terminal(args.terminal_id)?
1424 .read(cx)
1425 .current_output(cx);
1426
1427 Ok(out)
1428 })?
1429 }
1430
1431 async fn wait_for_terminal_exit(
1432 &self,
1433 args: acp::WaitForTerminalExitRequest,
1434 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1435 let exit_status = self
1436 .session_thread(&args.session_id)?
1437 .update(&mut self.cx.clone(), |thread, cx| {
1438 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1439 })??
1440 .await;
1441
1442 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1443 }
1444}
1445
1446impl ClientDelegate {
1447 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1448 let sessions = self.sessions.borrow();
1449 sessions
1450 .get(session_id)
1451 .context("Failed to get session")
1452 .map(|session| session.thread.clone())
1453 }
1454}