1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::Project;
14use project::agent_server_store::AgentServerCommand;
15use serde::Deserialize;
16use settings::Settings as _;
17use task::ShellBuilder;
18use util::ResultExt as _;
19use util::process::Child;
20
21use std::path::PathBuf;
22use std::process::Stdio;
23use std::{any::Any, cell::RefCell};
24use std::{path::Path, rc::Rc};
25use thiserror::Error;
26
27use anyhow::{Context as _, Result};
28use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
29
30use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
31use terminal::TerminalBuilder;
32use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
33
/// Error returned by [`AcpConnection::stdio`] when the agent's `initialize`
/// response reports a protocol version below `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
37
/// A connection to an external agent server process that speaks the Agent
/// Client Protocol (ACP) over the child's stdin/stdout.
pub struct AcpConnection {
    /// Name of the agent server; used to name threads and in log messages.
    server_name: SharedString,
    /// Name reported by the agent in its `initialize` response, falling back
    /// to `server_name` when the agent did not provide one.
    telemetry_id: SharedString,
    /// Client side of the ACP connection, shared with per-session helpers.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions created or loaded through this connection, keyed by session
    /// id. Shared with the `ClientDelegate` and the process-exit watcher.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods from the `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities from the `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to switch new sessions to, if the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to switch new sessions to, if the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option values (by option id) to apply to new sessions.
    default_config_options: HashMap<String, String>,
    /// Copy of the root directory passed to `stdio`.
    root_dir: PathBuf,
    /// The spawned agent process; killed when the connection is dropped.
    child: Child,
    /// Present only when the agent advertises the session `list` capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Task driving the connection's I/O; held for the connection's lifetime.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that waits for the child to exit and reports it to every thread.
    _wait_task: Task<Result<()>>,
    /// Task that forwards the agent's stderr lines to the log as warnings.
    _stderr_task: Task<Result<()>>,
}
55
/// Per-session config option state together with a watch channel used to
/// signal that the options changed.
struct ConfigOptions {
    /// Current config options for the session.
    config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender side of the change-notification channel.
    tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver side of the change-notification channel; cloned for watchers.
    rx: watch::Receiver<()>,
}
61
62impl ConfigOptions {
63 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
64 let (tx, rx) = watch::channel(());
65 Self {
66 config_options,
67 tx: Rc::new(RefCell::new(tx)),
68 rx,
69 }
70 }
71}
72
/// State tracked for a single ACP session on this connection.
pub struct AcpSession {
    /// The thread entity that displays this session.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel`; consumed by the next `prompt` completion so that an
    /// abort error reported by the agent is turned into a `Cancelled` response.
    suppress_abort_err: bool,
    /// Model state, when the agent reported models for this session.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state, when the agent reported modes for this session.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config option state, when config options are in use for this session.
    config_options: Option<ConfigOptions>,
}
80
/// Lists the agent's sessions over the ACP connection and exposes a watch
/// channel that fires when the list may have changed.
pub struct AcpSessionList {
    /// Connection used to issue `list_sessions` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Sender used by `notify_update` to signal a change.
    updates_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver handed out (cloned) by `watch`.
    updates_rx: watch::Receiver<()>,
}
86
87impl AcpSessionList {
88 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
89 let (tx, rx) = watch::channel(());
90 Self {
91 connection,
92 updates_tx: Rc::new(RefCell::new(tx)),
93 updates_rx: rx,
94 }
95 }
96
97 fn notify_update(&self) {
98 self.updates_tx.borrow_mut().send(()).ok();
99 }
100}
101
102impl AgentSessionList for AcpSessionList {
103 fn list_sessions(
104 &self,
105 request: AgentSessionListRequest,
106 cx: &mut App,
107 ) -> Task<Result<AgentSessionListResponse>> {
108 let conn = self.connection.clone();
109 cx.foreground_executor().spawn(async move {
110 let acp_request = acp::ListSessionsRequest::new()
111 .cwd(request.cwd)
112 .cursor(request.cursor);
113 let response = conn.list_sessions(acp_request).await?;
114 Ok(AgentSessionListResponse {
115 sessions: response
116 .sessions
117 .into_iter()
118 .map(|s| AgentSessionInfo {
119 session_id: s.session_id,
120 cwd: Some(s.cwd),
121 title: s.title.map(Into::into),
122 updated_at: s.updated_at.and_then(|date_str| {
123 chrono::DateTime::parse_from_rfc3339(&date_str)
124 .ok()
125 .map(|dt| dt.with_timezone(&chrono::Utc))
126 }),
127 meta: s.meta,
128 })
129 .collect(),
130 next_cursor: response.next_cursor,
131 meta: response.meta,
132 })
133 })
134 }
135
136 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
137 Some(self.updates_rx.clone())
138 }
139
140 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
141 self
142 }
143}
144
145pub async fn connect(
146 server_name: SharedString,
147 command: AgentServerCommand,
148 root_dir: &Path,
149 default_mode: Option<acp::SessionModeId>,
150 default_model: Option<acp::ModelId>,
151 default_config_options: HashMap<String, String>,
152 is_remote: bool,
153 cx: &mut AsyncApp,
154) -> Result<Rc<dyn AgentConnection>> {
155 let conn = AcpConnection::stdio(
156 server_name,
157 command.clone(),
158 root_dir,
159 default_mode,
160 default_model,
161 default_config_options,
162 is_remote,
163 cx,
164 )
165 .await?;
166 Ok(Rc::new(conn) as _)
167}
168
/// Oldest ACP protocol version this client accepts from an agent; agents
/// reporting an older version are rejected with `UnsupportedVersion`.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
170
171impl AcpConnection {
172 pub async fn stdio(
173 server_name: SharedString,
174 command: AgentServerCommand,
175 root_dir: &Path,
176 default_mode: Option<acp::SessionModeId>,
177 default_model: Option<acp::ModelId>,
178 default_config_options: HashMap<String, String>,
179 is_remote: bool,
180 cx: &mut AsyncApp,
181 ) -> Result<Self> {
182 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
183 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
184 let mut child =
185 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
186 child.envs(command.env.iter().flatten());
187 if !is_remote {
188 child.current_dir(root_dir);
189 }
190 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
191
192 let stdout = child.stdout.take().context("Failed to take stdout")?;
193 let stdin = child.stdin.take().context("Failed to take stdin")?;
194 let stderr = child.stderr.take().context("Failed to take stderr")?;
195 log::debug!(
196 "Spawning external agent server: {:?}, {:?}",
197 command.path,
198 command.args
199 );
200 log::trace!("Spawned (pid: {})", child.id());
201
202 let sessions = Rc::new(RefCell::new(HashMap::default()));
203
204 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
205 (
206 release_channel::ReleaseChannel::try_global(cx)
207 .map(|release_channel| release_channel.display_name()),
208 release_channel::AppVersion::global(cx).to_string(),
209 )
210 });
211
212 let client = ClientDelegate {
213 sessions: sessions.clone(),
214 cx: cx.clone(),
215 };
216 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
217 let foreground_executor = cx.foreground_executor().clone();
218 move |fut| {
219 foreground_executor.spawn(fut).detach();
220 }
221 });
222
223 let io_task = cx.background_spawn(io_task);
224
225 let stderr_task = cx.background_spawn(async move {
226 let mut stderr = BufReader::new(stderr);
227 let mut line = String::new();
228 while let Ok(n) = stderr.read_line(&mut line).await
229 && n > 0
230 {
231 log::warn!("agent stderr: {}", line.trim());
232 line.clear();
233 }
234 Ok(())
235 });
236
237 let wait_task = cx.spawn({
238 let sessions = sessions.clone();
239 let status_fut = child.status();
240 async move |cx| {
241 let status = status_fut.await?;
242
243 for session in sessions.borrow().values() {
244 session
245 .thread
246 .update(cx, |thread, cx| {
247 thread.emit_load_error(LoadError::Exited { status }, cx)
248 })
249 .ok();
250 }
251
252 anyhow::Ok(())
253 }
254 });
255
256 let connection = Rc::new(connection);
257
258 cx.update(|cx| {
259 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
260 registry.set_active_connection(server_name.clone(), &connection, cx)
261 });
262 });
263
264 let response = connection
265 .initialize(
266 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
267 .client_capabilities(
268 acp::ClientCapabilities::new()
269 .fs(acp::FileSystemCapability::new()
270 .read_text_file(true)
271 .write_text_file(true))
272 .terminal(true)
273 // Experimental: Allow for rendering terminal output from the agents
274 .meta(acp::Meta::from_iter([
275 ("terminal_output".into(), true.into()),
276 ("terminal-auth".into(), true.into()),
277 ])),
278 )
279 .client_info(
280 acp::Implementation::new("zed", version)
281 .title(release_channel.map(ToOwned::to_owned)),
282 ),
283 )
284 .await?;
285
286 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
287 return Err(UnsupportedVersion.into());
288 }
289
290 let telemetry_id = response
291 .agent_info
292 // Use the one the agent provides if we have one
293 .map(|info| info.name.into())
294 // Otherwise, just use the name
295 .unwrap_or_else(|| server_name.clone());
296
297 let session_list = if response
298 .agent_capabilities
299 .session_capabilities
300 .list
301 .is_some()
302 {
303 Some(Rc::new(AcpSessionList::new(connection.clone())))
304 } else {
305 None
306 };
307
308 Ok(Self {
309 auth_methods: response.auth_methods,
310 root_dir: root_dir.to_owned(),
311 connection,
312 server_name,
313 telemetry_id,
314 sessions,
315 agent_capabilities: response.agent_capabilities,
316 default_mode,
317 default_model,
318 default_config_options,
319 session_list,
320 _io_task: io_task,
321 _wait_task: wait_task,
322 _stderr_task: stderr_task,
323 child,
324 })
325 }
326
    /// Returns the prompt capabilities the agent reported during `initialize`.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }
330
    /// Returns the root directory this connection was created with.
    pub fn root_dir(&self) -> &Path {
        &self.root_dir
    }
334}
335
impl Drop for AcpConnection {
    /// Kills the agent process when the connection goes away; a failure to
    /// kill is logged rather than propagated.
    fn drop(&mut self) {
        self.child.kill().log_err();
    }
}
341
342impl AgentConnection for AcpConnection {
    /// Returns the identifier used for telemetry: the agent-provided name when
    /// available, otherwise the server name (see `stdio`).
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
346
347 fn new_thread(
348 self: Rc<Self>,
349 project: Entity<Project>,
350 cwd: &Path,
351 cx: &mut App,
352 ) -> Task<Result<Entity<AcpThread>>> {
353 let name = self.server_name.clone();
354 let cwd = cwd.to_path_buf();
355 let mcp_servers = mcp_servers_for_project(&project, cx);
356
357 cx.spawn(async move |cx| {
358 let response = self.connection
359 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
360 .await
361 .map_err(map_acp_error)?;
362
363 let (modes, models, config_options) = cx.update(|cx| {
364 config_state(cx, response.modes, response.models, response.config_options)
365 });
366
367 if let Some(default_mode) = self.default_mode.clone() {
368 if let Some(modes) = modes.as_ref() {
369 let mut modes_ref = modes.borrow_mut();
370 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
371
372 if has_mode {
373 let initial_mode_id = modes_ref.current_mode_id.clone();
374
375 cx.spawn({
376 let default_mode = default_mode.clone();
377 let session_id = response.session_id.clone();
378 let modes = modes.clone();
379 let conn = self.connection.clone();
380 async move |_| {
381 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
382 .await.log_err();
383
384 if result.is_none() {
385 modes.borrow_mut().current_mode_id = initial_mode_id;
386 }
387 }
388 }).detach();
389
390 modes_ref.current_mode_id = default_mode;
391 } else {
392 let available_modes = modes_ref
393 .available_modes
394 .iter()
395 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
396 .collect::<Vec<_>>()
397 .join("\n");
398
399 log::warn!(
400 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
401 );
402 }
403 } else {
404 log::warn!(
405 "`{name}` does not support modes, but `default_mode` was set in settings.",
406 );
407 }
408 }
409
410 if let Some(default_model) = self.default_model.clone() {
411 if let Some(models) = models.as_ref() {
412 let mut models_ref = models.borrow_mut();
413 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
414
415 if has_model {
416 let initial_model_id = models_ref.current_model_id.clone();
417
418 cx.spawn({
419 let default_model = default_model.clone();
420 let session_id = response.session_id.clone();
421 let models = models.clone();
422 let conn = self.connection.clone();
423 async move |_| {
424 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
425 .await.log_err();
426
427 if result.is_none() {
428 models.borrow_mut().current_model_id = initial_model_id;
429 }
430 }
431 }).detach();
432
433 models_ref.current_model_id = default_model;
434 } else {
435 let available_models = models_ref
436 .available_models
437 .iter()
438 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
439 .collect::<Vec<_>>()
440 .join("\n");
441
442 log::warn!(
443 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
444 );
445 }
446 } else {
447 log::warn!(
448 "`{name}` does not support model selection, but `default_model` was set in settings.",
449 );
450 }
451 }
452
453 if let Some(config_opts) = config_options.as_ref() {
454 let defaults_to_apply: Vec<_> = {
455 let config_opts_ref = config_opts.borrow();
456 config_opts_ref
457 .iter()
458 .filter_map(|config_option| {
459 let default_value = self.default_config_options.get(&*config_option.id.0)?;
460
461 let is_valid = match &config_option.kind {
462 acp::SessionConfigKind::Select(select) => match &select.options {
463 acp::SessionConfigSelectOptions::Ungrouped(options) => {
464 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
465 }
466 acp::SessionConfigSelectOptions::Grouped(groups) => groups
467 .iter()
468 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
469 _ => false,
470 },
471 _ => false,
472 };
473
474 if is_valid {
475 let initial_value = match &config_option.kind {
476 acp::SessionConfigKind::Select(select) => {
477 Some(select.current_value.clone())
478 }
479 _ => None,
480 };
481 Some((config_option.id.clone(), default_value.clone(), initial_value))
482 } else {
483 log::warn!(
484 "`{}` is not a valid value for config option `{}` in {}",
485 default_value,
486 config_option.id.0,
487 name
488 );
489 None
490 }
491 })
492 .collect()
493 };
494
495 for (config_id, default_value, initial_value) in defaults_to_apply {
496 cx.spawn({
497 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
498 let session_id = response.session_id.clone();
499 let config_id_clone = config_id.clone();
500 let config_opts = config_opts.clone();
501 let conn = self.connection.clone();
502 async move |_| {
503 let result = conn
504 .set_session_config_option(
505 acp::SetSessionConfigOptionRequest::new(
506 session_id,
507 config_id_clone.clone(),
508 default_value_id,
509 ),
510 )
511 .await
512 .log_err();
513
514 if result.is_none() {
515 if let Some(initial) = initial_value {
516 let mut opts = config_opts.borrow_mut();
517 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
518 if let acp::SessionConfigKind::Select(select) =
519 &mut opt.kind
520 {
521 select.current_value = initial;
522 }
523 }
524 }
525 }
526 }
527 })
528 .detach();
529
530 let mut opts = config_opts.borrow_mut();
531 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
532 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
533 select.current_value = acp::SessionConfigValueId::new(default_value);
534 }
535 }
536 }
537 }
538
539 let action_log = cx.new(|_| ActionLog::new(project.clone()));
540 let thread: Entity<AcpThread> = cx.new(|cx| {
541 AcpThread::new(
542 self.server_name.clone(),
543 self.clone(),
544 project,
545 action_log,
546 response.session_id.clone(),
547 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
548 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
549 cx,
550 )
551 });
552
553 self.sessions.borrow_mut().insert(
554 response.session_id,
555 AcpSession {
556 thread: thread.downgrade(),
557 suppress_abort_err: false,
558 session_modes: modes,
559 models,
560 config_options: config_options.map(ConfigOptions::new),
561 },
562 );
563
564 if let Some(session_list) = &self.session_list {
565 session_list.notify_update();
566 }
567
568 Ok(thread)
569 })
570 }
571
    /// Returns whether sessions can be loaded: requires both the
    /// `AcpBetaFeatureFlag` and the agent's `load_session` capability.
    fn supports_load_session(&self, cx: &App) -> bool {
        cx.has_flag::<AcpBetaFeatureFlag>() && self.agent_capabilities.load_session
    }
575
576 fn load_session(
577 self: Rc<Self>,
578 session: AgentSessionInfo,
579 project: Entity<Project>,
580 cwd: &Path,
581 cx: &mut App,
582 ) -> Task<Result<Entity<AcpThread>>> {
583 if !cx.has_flag::<AcpBetaFeatureFlag>() || !self.agent_capabilities.load_session {
584 return Task::ready(Err(anyhow!(LoadError::Other(
585 "Loading sessions is not supported by this agent.".into()
586 ))));
587 }
588
589 let cwd = cwd.to_path_buf();
590 let mcp_servers = mcp_servers_for_project(&project, cx);
591 let action_log = cx.new(|_| ActionLog::new(project.clone()));
592 let thread: Entity<AcpThread> = cx.new(|cx| {
593 AcpThread::new(
594 self.server_name.clone(),
595 self.clone(),
596 project,
597 action_log,
598 session.session_id.clone(),
599 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
600 cx,
601 )
602 });
603
604 self.sessions.borrow_mut().insert(
605 session.session_id.clone(),
606 AcpSession {
607 thread: thread.downgrade(),
608 suppress_abort_err: false,
609 session_modes: None,
610 models: None,
611 config_options: None,
612 },
613 );
614
615 cx.spawn(async move |cx| {
616 let response = match self
617 .connection
618 .load_session(
619 acp::LoadSessionRequest::new(session.session_id.clone(), cwd)
620 .mcp_servers(mcp_servers),
621 )
622 .await
623 {
624 Ok(response) => response,
625 Err(err) => {
626 self.sessions.borrow_mut().remove(&session.session_id);
627 return Err(map_acp_error(err));
628 }
629 };
630
631 let (modes, models, config_options) = cx.update(|cx| {
632 config_state(cx, response.modes, response.models, response.config_options)
633 });
634 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
635 session.session_modes = modes;
636 session.models = models;
637 session.config_options = config_options.map(ConfigOptions::new);
638 }
639
640 if let Some(session_list) = &self.session_list {
641 session_list.notify_update();
642 }
643
644 Ok(thread)
645 })
646 }
647
    /// Returns the authentication methods the agent reported during `initialize`.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
651
652 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
653 let conn = self.connection.clone();
654 cx.foreground_executor().spawn(async move {
655 conn.authenticate(acp::AuthenticateRequest::new(method_id))
656 .await?;
657 Ok(())
658 })
659 }
660
661 fn prompt(
662 &self,
663 _id: Option<acp_thread::UserMessageId>,
664 params: acp::PromptRequest,
665 cx: &mut App,
666 ) -> Task<Result<acp::PromptResponse>> {
667 let conn = self.connection.clone();
668 let sessions = self.sessions.clone();
669 let session_id = params.session_id.clone();
670 cx.foreground_executor().spawn(async move {
671 let result = conn.prompt(params).await;
672
673 let mut suppress_abort_err = false;
674
675 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
676 suppress_abort_err = session.suppress_abort_err;
677 session.suppress_abort_err = false;
678 }
679
680 match result {
681 Ok(response) => Ok(response),
682 Err(err) => {
683 if err.code == acp::ErrorCode::AuthRequired {
684 return Err(anyhow!(acp::Error::auth_required()));
685 }
686
687 if err.code != ErrorCode::InternalError {
688 anyhow::bail!(err)
689 }
690
691 let Some(data) = &err.data else {
692 anyhow::bail!(err)
693 };
694
695 // Temporary workaround until the following PR is generally available:
696 // https://github.com/google-gemini/gemini-cli/pull/6656
697
698 #[derive(Deserialize)]
699 #[serde(deny_unknown_fields)]
700 struct ErrorDetails {
701 details: Box<str>,
702 }
703
704 match serde_json::from_value(data.clone()) {
705 Ok(ErrorDetails { details }) => {
706 if suppress_abort_err
707 && (details.contains("This operation was aborted")
708 || details.contains("The user aborted a request"))
709 {
710 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
711 } else {
712 Err(anyhow!(details))
713 }
714 }
715 Err(_) => Err(anyhow!(err)),
716 }
717 }
718 }
719 })
720 }
721
722 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
723 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
724 session.suppress_abort_err = true;
725 }
726 let conn = self.connection.clone();
727 let params = acp::CancelNotification::new(session_id.clone());
728 cx.foreground_executor()
729 .spawn(async move { conn.cancel(params).await })
730 .detach();
731 }
732
733 fn session_modes(
734 &self,
735 session_id: &acp::SessionId,
736 _cx: &App,
737 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
738 let sessions = self.sessions.clone();
739 let sessions_ref = sessions.borrow();
740 let Some(session) = sessions_ref.get(session_id) else {
741 return None;
742 };
743
744 if let Some(modes) = session.session_modes.as_ref() {
745 Some(Rc::new(AcpSessionModes {
746 connection: self.connection.clone(),
747 session_id: session_id.clone(),
748 state: modes.clone(),
749 }) as _)
750 } else {
751 None
752 }
753 }
754
755 fn model_selector(
756 &self,
757 session_id: &acp::SessionId,
758 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
759 let sessions = self.sessions.clone();
760 let sessions_ref = sessions.borrow();
761 let Some(session) = sessions_ref.get(session_id) else {
762 return None;
763 };
764
765 if let Some(models) = session.models.as_ref() {
766 Some(Rc::new(AcpModelSelector::new(
767 session_id.clone(),
768 self.connection.clone(),
769 models.clone(),
770 )) as _)
771 } else {
772 None
773 }
774 }
775
776 fn session_config_options(
777 &self,
778 session_id: &acp::SessionId,
779 _cx: &App,
780 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
781 let sessions = self.sessions.borrow();
782 let session = sessions.get(session_id)?;
783
784 let config_opts = session.config_options.as_ref()?;
785
786 Some(Rc::new(AcpSessionConfigOptions {
787 session_id: session_id.clone(),
788 connection: self.connection.clone(),
789 state: config_opts.config_options.clone(),
790 watch_tx: config_opts.tx.clone(),
791 watch_rx: config_opts.rx.clone(),
792 }) as _)
793 }
794
795 fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
796 if cx.has_flag::<AcpBetaFeatureFlag>() {
797 self.session_list.clone().map(|s| s as _)
798 } else {
799 None
800 }
801 }
802
    /// Upcasts to `Rc<dyn Any>` for downcasting by callers.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
806}
807
808fn map_acp_error(err: acp::Error) -> anyhow::Error {
809 if err.code == acp::ErrorCode::AuthRequired {
810 let mut error = AuthRequired::new();
811
812 if err.message != acp::ErrorCode::AuthRequired.to_string() {
813 error = error.with_description(err.message);
814 }
815
816 anyhow!(error)
817 } else {
818 anyhow!(err)
819 }
820}
821
822fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
823 let context_server_store = project.read(cx).context_server_store().read(cx);
824 let is_local = project.read(cx).is_local();
825 context_server_store
826 .configured_server_ids()
827 .iter()
828 .filter_map(|id| {
829 let configuration = context_server_store.configuration_for_server(id)?;
830 match &*configuration {
831 project::context_server_store::ContextServerConfiguration::Custom {
832 command,
833 remote,
834 ..
835 }
836 | project::context_server_store::ContextServerConfiguration::Extension {
837 command,
838 remote,
839 ..
840 } if is_local || *remote => Some(acp::McpServer::Stdio(
841 acp::McpServerStdio::new(id.0.to_string(), &command.path)
842 .args(command.args.clone())
843 .env(if let Some(env) = command.env.as_ref() {
844 env.iter()
845 .map(|(name, value)| acp::EnvVariable::new(name, value))
846 .collect()
847 } else {
848 vec![]
849 }),
850 )),
851 project::context_server_store::ContextServerConfiguration::Http {
852 url,
853 headers,
854 timeout: _,
855 } => Some(acp::McpServer::Http(
856 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
857 headers
858 .iter()
859 .map(|(name, value)| acp::HttpHeader::new(name, value))
860 .collect(),
861 ),
862 )),
863 _ => None,
864 }
865 })
866 .collect()
867}
868
869fn config_state(
870 cx: &App,
871 modes: Option<acp::SessionModeState>,
872 models: Option<acp::SessionModelState>,
873 config_options: Option<Vec<acp::SessionConfigOption>>,
874) -> (
875 Option<Rc<RefCell<acp::SessionModeState>>>,
876 Option<Rc<RefCell<acp::SessionModelState>>>,
877 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
878) {
879 if cx.has_flag::<AcpBetaFeatureFlag>()
880 && let Some(opts) = config_options
881 {
882 return (None, None, Some(Rc::new(RefCell::new(opts))));
883 }
884
885 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
886 let models = models.map(|models| Rc::new(RefCell::new(models)));
887 (modes, models, None)
888}
889
/// Handle for reading and changing the mode of one session.
struct AcpSessionModes {
    /// Session whose mode is controlled.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_mode` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the session entry.
    state: Rc<RefCell<acp::SessionModeState>>,
}
895
896impl acp_thread::AgentSessionModes for AcpSessionModes {
897 fn current_mode(&self) -> acp::SessionModeId {
898 self.state.borrow().current_mode_id.clone()
899 }
900
901 fn all_modes(&self) -> Vec<acp::SessionMode> {
902 self.state.borrow().available_modes.clone()
903 }
904
905 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
906 let connection = self.connection.clone();
907 let session_id = self.session_id.clone();
908 let old_mode_id;
909 {
910 let mut state = self.state.borrow_mut();
911 old_mode_id = state.current_mode_id.clone();
912 state.current_mode_id = mode_id.clone();
913 };
914 let state = self.state.clone();
915 cx.foreground_executor().spawn(async move {
916 let result = connection
917 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
918 .await;
919
920 if result.is_err() {
921 state.borrow_mut().current_mode_id = old_mode_id;
922 }
923
924 result?;
925
926 Ok(())
927 })
928 }
929}
930
/// Handle for listing and selecting the model of one session.
struct AcpModelSelector {
    /// Session whose model is controlled.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_model` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state shared with the session entry.
    state: Rc<RefCell<acp::SessionModelState>>,
}
936
impl AcpModelSelector {
    /// Creates a selector for `session_id` that sends requests over
    /// `connection` and reads/writes the shared model `state`.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
950
951impl acp_thread::AgentModelSelector for AcpModelSelector {
952 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
953 Task::ready(Ok(acp_thread::AgentModelList::Flat(
954 self.state
955 .borrow()
956 .available_models
957 .clone()
958 .into_iter()
959 .map(acp_thread::AgentModelInfo::from)
960 .collect(),
961 )))
962 }
963
964 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
965 let connection = self.connection.clone();
966 let session_id = self.session_id.clone();
967 let old_model_id;
968 {
969 let mut state = self.state.borrow_mut();
970 old_model_id = state.current_model_id.clone();
971 state.current_model_id = model_id.clone();
972 };
973 let state = self.state.clone();
974 cx.foreground_executor().spawn(async move {
975 let result = connection
976 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
977 .await;
978
979 if result.is_err() {
980 state.borrow_mut().current_model_id = old_model_id;
981 }
982
983 result?;
984
985 Ok(())
986 })
987 }
988
989 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
990 let state = self.state.borrow();
991 Task::ready(
992 state
993 .available_models
994 .iter()
995 .find(|m| m.model_id == state.current_model_id)
996 .cloned()
997 .map(acp_thread::AgentModelInfo::from)
998 .ok_or_else(|| anyhow::anyhow!("Model not found")),
999 )
1000 }
1001}
1002
/// Handle for reading and changing the config options of one session.
struct AcpSessionConfigOptions {
    /// Session whose config options are controlled.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_config_option` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Option state shared with the session entry.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender used to signal that the options changed.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver handed out (cloned) by `watch`.
    watch_rx: watch::Receiver<()>,
}
1010
1011impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1012 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1013 self.state.borrow().clone()
1014 }
1015
1016 fn set_config_option(
1017 &self,
1018 config_id: acp::SessionConfigId,
1019 value: acp::SessionConfigValueId,
1020 cx: &mut App,
1021 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1022 let connection = self.connection.clone();
1023 let session_id = self.session_id.clone();
1024 let state = self.state.clone();
1025
1026 let watch_tx = self.watch_tx.clone();
1027
1028 cx.foreground_executor().spawn(async move {
1029 let response = connection
1030 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1031 session_id, config_id, value,
1032 ))
1033 .await?;
1034
1035 *state.borrow_mut() = response.config_options.clone();
1036 watch_tx.borrow_mut().send(()).ok();
1037 Ok(response.config_options)
1038 })
1039 }
1040
1041 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1042 Some(self.watch_rx.clone())
1043 }
1044}
1045
/// Client-side handler for requests and notifications sent by the agent.
struct ClientDelegate {
    /// Session map shared with the owning `AcpConnection`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App handle used to update thread entities from handler methods.
    cx: AsyncApp,
}
1050
1051#[async_trait::async_trait(?Send)]
1052impl acp::Client for ClientDelegate {
1053 async fn request_permission(
1054 &self,
1055 arguments: acp::RequestPermissionRequest,
1056 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1057 let respect_always_allow_setting;
1058 let thread;
1059 {
1060 let sessions_ref = self.sessions.borrow();
1061 let session = sessions_ref
1062 .get(&arguments.session_id)
1063 .context("Failed to get session")?;
1064 respect_always_allow_setting = session.session_modes.is_none();
1065 thread = session.thread.clone();
1066 }
1067
1068 let cx = &mut self.cx.clone();
1069
1070 let task = thread.update(cx, |thread, cx| {
1071 thread.request_tool_call_authorization(
1072 arguments.tool_call,
1073 acp_thread::PermissionOptions::Flat(arguments.options),
1074 respect_always_allow_setting,
1075 cx,
1076 )
1077 })??;
1078
1079 let outcome = task.await;
1080
1081 Ok(acp::RequestPermissionResponse::new(outcome))
1082 }
1083
1084 async fn write_text_file(
1085 &self,
1086 arguments: acp::WriteTextFileRequest,
1087 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1088 let cx = &mut self.cx.clone();
1089 let task = self
1090 .session_thread(&arguments.session_id)?
1091 .update(cx, |thread, cx| {
1092 thread.write_text_file(arguments.path, arguments.content, cx)
1093 })?;
1094
1095 task.await?;
1096
1097 Ok(Default::default())
1098 }
1099
1100 async fn read_text_file(
1101 &self,
1102 arguments: acp::ReadTextFileRequest,
1103 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1104 let task = self.session_thread(&arguments.session_id)?.update(
1105 &mut self.cx.clone(),
1106 |thread, cx| {
1107 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1108 },
1109 )?;
1110
1111 let content = task.await?;
1112
1113 Ok(acp::ReadTextFileResponse::new(content))
1114 }
1115
1116 async fn session_notification(
1117 &self,
1118 notification: acp::SessionNotification,
1119 ) -> Result<(), acp::Error> {
1120 let sessions = self.sessions.borrow();
1121 let session = sessions
1122 .get(¬ification.session_id)
1123 .context("Failed to get session")?;
1124
1125 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1126 current_mode_id,
1127 ..
1128 }) = ¬ification.update
1129 {
1130 if let Some(session_modes) = &session.session_modes {
1131 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1132 } else {
1133 log::error!(
1134 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
1135 );
1136 }
1137 }
1138
1139 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1140 config_options,
1141 ..
1142 }) = ¬ification.update
1143 {
1144 if let Some(opts) = &session.config_options {
1145 *opts.config_options.borrow_mut() = config_options.clone();
1146 opts.tx.borrow_mut().send(()).ok();
1147 } else {
1148 log::error!(
1149 "Got a `ConfigOptionUpdate` notification, but the agent didn't specify `config_options` during session setup."
1150 );
1151 }
1152 }
1153
1154 // Clone so we can inspect meta both before and after handing off to the thread
1155 let update_clone = notification.update.clone();
1156
1157 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1158 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1159 if let Some(meta) = &tc.meta {
1160 if let Some(terminal_info) = meta.get("terminal_info") {
1161 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1162 {
1163 let terminal_id = acp::TerminalId::new(id_str);
1164 let cwd = terminal_info
1165 .get("cwd")
1166 .and_then(|v| v.as_str().map(PathBuf::from));
1167
1168 // Create a minimal display-only lower-level terminal and register it.
1169 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1170 let builder = TerminalBuilder::new_display_only(
1171 CursorShape::default(),
1172 AlternateScroll::On,
1173 None,
1174 0,
1175 )?;
1176 let lower = cx.new(|cx| builder.subscribe(cx));
1177 thread.on_terminal_provider_event(
1178 TerminalProviderEvent::Created {
1179 terminal_id,
1180 label: tc.title.clone(),
1181 cwd,
1182 output_byte_limit: None,
1183 terminal: lower,
1184 },
1185 cx,
1186 );
1187 anyhow::Ok(())
1188 });
1189 }
1190 }
1191 }
1192 }
1193
1194 // Forward the update to the acp_thread as usual.
1195 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1196 thread.handle_session_update(notification.update.clone(), cx)
1197 })??;
1198
1199 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1200 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1201 if let Some(meta) = &tcu.meta {
1202 if let Some(term_out) = meta.get("terminal_output") {
1203 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1204 let terminal_id = acp::TerminalId::new(id_str);
1205 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1206 let data = s.as_bytes().to_vec();
1207 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1208 thread.on_terminal_provider_event(
1209 TerminalProviderEvent::Output { terminal_id, data },
1210 cx,
1211 );
1212 });
1213 }
1214 }
1215 }
1216
1217 // terminal_exit
1218 if let Some(term_exit) = meta.get("terminal_exit") {
1219 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1220 let terminal_id = acp::TerminalId::new(id_str);
1221 let status = acp::TerminalExitStatus::new()
1222 .exit_code(
1223 term_exit
1224 .get("exit_code")
1225 .and_then(|v| v.as_u64())
1226 .map(|i| i as u32),
1227 )
1228 .signal(
1229 term_exit
1230 .get("signal")
1231 .and_then(|v| v.as_str().map(|s| s.to_string())),
1232 );
1233
1234 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1235 thread.on_terminal_provider_event(
1236 TerminalProviderEvent::Exit {
1237 terminal_id,
1238 status,
1239 },
1240 cx,
1241 );
1242 });
1243 }
1244 }
1245 }
1246 }
1247
1248 Ok(())
1249 }
1250
1251 async fn create_terminal(
1252 &self,
1253 args: acp::CreateTerminalRequest,
1254 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1255 let thread = self.session_thread(&args.session_id)?;
1256 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1257
1258 let terminal_entity = acp_thread::create_terminal_entity(
1259 args.command.clone(),
1260 &args.args,
1261 args.env
1262 .into_iter()
1263 .map(|env| (env.name, env.value))
1264 .collect(),
1265 args.cwd.clone(),
1266 &project,
1267 &mut self.cx.clone(),
1268 )
1269 .await?;
1270
1271 // Register with renderer
1272 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1273 thread.register_terminal_created(
1274 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1275 format!("{} {}", args.command, args.args.join(" ")),
1276 args.cwd.clone(),
1277 args.output_byte_limit,
1278 terminal_entity,
1279 cx,
1280 )
1281 })?;
1282 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1283 Ok(acp::CreateTerminalResponse::new(terminal_id))
1284 }
1285
1286 async fn kill_terminal_command(
1287 &self,
1288 args: acp::KillTerminalCommandRequest,
1289 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1290 self.session_thread(&args.session_id)?
1291 .update(&mut self.cx.clone(), |thread, cx| {
1292 thread.kill_terminal(args.terminal_id, cx)
1293 })??;
1294
1295 Ok(Default::default())
1296 }
1297
1298 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1299 Err(acp::Error::method_not_found())
1300 }
1301
1302 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1303 Err(acp::Error::method_not_found())
1304 }
1305
1306 async fn release_terminal(
1307 &self,
1308 args: acp::ReleaseTerminalRequest,
1309 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1310 self.session_thread(&args.session_id)?
1311 .update(&mut self.cx.clone(), |thread, cx| {
1312 thread.release_terminal(args.terminal_id, cx)
1313 })??;
1314
1315 Ok(Default::default())
1316 }
1317
1318 async fn terminal_output(
1319 &self,
1320 args: acp::TerminalOutputRequest,
1321 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1322 self.session_thread(&args.session_id)?
1323 .read_with(&mut self.cx.clone(), |thread, cx| {
1324 let out = thread
1325 .terminal(args.terminal_id)?
1326 .read(cx)
1327 .current_output(cx);
1328
1329 Ok(out)
1330 })?
1331 }
1332
1333 async fn wait_for_terminal_exit(
1334 &self,
1335 args: acp::WaitForTerminalExitRequest,
1336 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1337 let exit_status = self
1338 .session_thread(&args.session_id)?
1339 .update(&mut self.cx.clone(), |thread, cx| {
1340 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1341 })??
1342 .await;
1343
1344 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1345 }
1346}
1347
1348impl ClientDelegate {
1349 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1350 let sessions = self.sessions.borrow();
1351 sessions
1352 .get(session_id)
1353 .context("Failed to get session")
1354 .map(|session| session.thread.clone())
1355 }
1356}