1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::Project;
14use project::agent_server_store::AgentServerCommand;
15use serde::Deserialize;
16use settings::Settings as _;
17use task::ShellBuilder;
18use util::ResultExt as _;
19use util::process::Child;
20
21use std::path::PathBuf;
22use std::process::Stdio;
23use std::{any::Any, cell::RefCell};
24use std::{path::Path, rc::Rc};
25use thiserror::Error;
26
27use anyhow::{Context as _, Result};
28use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
29
30use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
31use terminal::TerminalBuilder;
32use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
33
34#[derive(Debug, Error)]
35#[error("Unsupported version")]
36pub struct UnsupportedVersion;
37
38pub struct AcpConnection {
39 server_name: SharedString,
40 telemetry_id: SharedString,
41 connection: Rc<acp::ClientSideConnection>,
42 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
43 auth_methods: Vec<acp::AuthMethod>,
44 agent_capabilities: acp::AgentCapabilities,
45 default_mode: Option<acp::SessionModeId>,
46 default_model: Option<acp::ModelId>,
47 default_config_options: HashMap<String, String>,
48 root_dir: PathBuf,
49 child: Child,
50 session_list: Option<Rc<AcpSessionList>>,
51 _io_task: Task<Result<(), acp::Error>>,
52 _wait_task: Task<Result<()>>,
53 _stderr_task: Task<Result<()>>,
54}
55
56struct ConfigOptions {
57 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
58 tx: Rc<RefCell<watch::Sender<()>>>,
59 rx: watch::Receiver<()>,
60}
61
62impl ConfigOptions {
63 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
64 let (tx, rx) = watch::channel(());
65 Self {
66 config_options,
67 tx: Rc::new(RefCell::new(tx)),
68 rx,
69 }
70 }
71}
72
73pub struct AcpSession {
74 thread: WeakEntity<AcpThread>,
75 suppress_abort_err: bool,
76 models: Option<Rc<RefCell<acp::SessionModelState>>>,
77 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
78 config_options: Option<ConfigOptions>,
79}
80
81pub struct AcpSessionList {
82 connection: Rc<acp::ClientSideConnection>,
83 updates_tx: Rc<RefCell<watch::Sender<()>>>,
84 updates_rx: watch::Receiver<()>,
85}
86
87impl AcpSessionList {
88 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
89 let (tx, rx) = watch::channel(());
90 Self {
91 connection,
92 updates_tx: Rc::new(RefCell::new(tx)),
93 updates_rx: rx,
94 }
95 }
96
97 fn notify_update(&self) {
98 self.updates_tx.borrow_mut().send(()).ok();
99 }
100}
101
102impl AgentSessionList for AcpSessionList {
103 fn list_sessions(
104 &self,
105 request: AgentSessionListRequest,
106 cx: &mut App,
107 ) -> Task<Result<AgentSessionListResponse>> {
108 let conn = self.connection.clone();
109 cx.foreground_executor().spawn(async move {
110 let acp_request = acp::ListSessionsRequest::new()
111 .cwd(request.cwd)
112 .cursor(request.cursor);
113 let response = conn.list_sessions(acp_request).await?;
114 Ok(AgentSessionListResponse {
115 sessions: response
116 .sessions
117 .into_iter()
118 .map(|s| AgentSessionInfo {
119 session_id: s.session_id,
120 cwd: Some(s.cwd),
121 title: s.title.map(Into::into),
122 updated_at: s.updated_at.and_then(|date_str| {
123 chrono::DateTime::parse_from_rfc3339(&date_str)
124 .ok()
125 .map(|dt| dt.with_timezone(&chrono::Utc))
126 }),
127 meta: s.meta,
128 })
129 .collect(),
130 next_cursor: response.next_cursor,
131 meta: response.meta,
132 })
133 })
134 }
135
136 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
137 Some(self.updates_rx.clone())
138 }
139
140 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
141 self
142 }
143}
144
145pub async fn connect(
146 server_name: SharedString,
147 command: AgentServerCommand,
148 root_dir: &Path,
149 default_mode: Option<acp::SessionModeId>,
150 default_model: Option<acp::ModelId>,
151 default_config_options: HashMap<String, String>,
152 is_remote: bool,
153 cx: &mut AsyncApp,
154) -> Result<Rc<dyn AgentConnection>> {
155 let conn = AcpConnection::stdio(
156 server_name,
157 command.clone(),
158 root_dir,
159 default_mode,
160 default_model,
161 default_config_options,
162 is_remote,
163 cx,
164 )
165 .await?;
166 Ok(Rc::new(conn) as _)
167}
168
169const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
170
171impl AcpConnection {
172 pub async fn stdio(
173 server_name: SharedString,
174 command: AgentServerCommand,
175 root_dir: &Path,
176 default_mode: Option<acp::SessionModeId>,
177 default_model: Option<acp::ModelId>,
178 default_config_options: HashMap<String, String>,
179 is_remote: bool,
180 cx: &mut AsyncApp,
181 ) -> Result<Self> {
182 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
183 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
184 let mut child =
185 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
186 child.envs(command.env.iter().flatten());
187 if !is_remote {
188 child.current_dir(root_dir);
189 }
190 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
191
192 let stdout = child.stdout.take().context("Failed to take stdout")?;
193 let stdin = child.stdin.take().context("Failed to take stdin")?;
194 let stderr = child.stderr.take().context("Failed to take stderr")?;
195 log::debug!(
196 "Spawning external agent server: {:?}, {:?}",
197 command.path,
198 command.args
199 );
200 log::trace!("Spawned (pid: {})", child.id());
201
202 let sessions = Rc::new(RefCell::new(HashMap::default()));
203
204 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
205 (
206 release_channel::ReleaseChannel::try_global(cx)
207 .map(|release_channel| release_channel.display_name()),
208 release_channel::AppVersion::global(cx).to_string(),
209 )
210 });
211
212 let client = ClientDelegate {
213 sessions: sessions.clone(),
214 cx: cx.clone(),
215 };
216 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
217 let foreground_executor = cx.foreground_executor().clone();
218 move |fut| {
219 foreground_executor.spawn(fut).detach();
220 }
221 });
222
223 let io_task = cx.background_spawn(io_task);
224
225 let stderr_task = cx.background_spawn(async move {
226 let mut stderr = BufReader::new(stderr);
227 let mut line = String::new();
228 while let Ok(n) = stderr.read_line(&mut line).await
229 && n > 0
230 {
231 log::warn!("agent stderr: {}", line.trim());
232 line.clear();
233 }
234 Ok(())
235 });
236
237 let wait_task = cx.spawn({
238 let sessions = sessions.clone();
239 let status_fut = child.status();
240 async move |cx| {
241 let status = status_fut.await?;
242
243 for session in sessions.borrow().values() {
244 session
245 .thread
246 .update(cx, |thread, cx| {
247 thread.emit_load_error(LoadError::Exited { status }, cx)
248 })
249 .ok();
250 }
251
252 anyhow::Ok(())
253 }
254 });
255
256 let connection = Rc::new(connection);
257
258 cx.update(|cx| {
259 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
260 registry.set_active_connection(server_name.clone(), &connection, cx)
261 });
262 });
263
264 let response = connection
265 .initialize(
266 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
267 .client_capabilities(
268 acp::ClientCapabilities::new()
269 .fs(acp::FileSystemCapability::new()
270 .read_text_file(true)
271 .write_text_file(true))
272 .terminal(true)
273 // Experimental: Allow for rendering terminal output from the agents
274 .meta(acp::Meta::from_iter([
275 ("terminal_output".into(), true.into()),
276 ("terminal-auth".into(), true.into()),
277 ])),
278 )
279 .client_info(
280 acp::Implementation::new("zed", version)
281 .title(release_channel.map(ToOwned::to_owned)),
282 ),
283 )
284 .await?;
285
286 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
287 return Err(UnsupportedVersion.into());
288 }
289
290 let telemetry_id = response
291 .agent_info
292 // Use the one the agent provides if we have one
293 .map(|info| info.name.into())
294 // Otherwise, just use the name
295 .unwrap_or_else(|| server_name.clone());
296
297 let session_list = if response
298 .agent_capabilities
299 .session_capabilities
300 .list
301 .is_some()
302 {
303 Some(Rc::new(AcpSessionList::new(connection.clone())))
304 } else {
305 None
306 };
307
308 Ok(Self {
309 auth_methods: response.auth_methods,
310 root_dir: root_dir.to_owned(),
311 connection,
312 server_name,
313 telemetry_id,
314 sessions,
315 agent_capabilities: response.agent_capabilities,
316 default_mode,
317 default_model,
318 default_config_options,
319 session_list,
320 _io_task: io_task,
321 _wait_task: wait_task,
322 _stderr_task: stderr_task,
323 child,
324 })
325 }
326
327 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
328 &self.agent_capabilities.prompt_capabilities
329 }
330
331 pub fn root_dir(&self) -> &Path {
332 &self.root_dir
333 }
334}
335
336impl Drop for AcpConnection {
337 fn drop(&mut self) {
338 self.child.kill().log_err();
339 }
340}
341
342impl AgentConnection for AcpConnection {
343 fn telemetry_id(&self) -> SharedString {
344 self.telemetry_id.clone()
345 }
346
347 fn new_thread(
348 self: Rc<Self>,
349 project: Entity<Project>,
350 cwd: &Path,
351 cx: &mut App,
352 ) -> Task<Result<Entity<AcpThread>>> {
353 let name = self.server_name.clone();
354 let conn = self.connection.clone();
355 let sessions = self.sessions.clone();
356 let default_mode = self.default_mode.clone();
357 let default_model = self.default_model.clone();
358 let default_config_options = self.default_config_options.clone();
359 let cwd = cwd.to_path_buf();
360 let context_server_store = project.read(cx).context_server_store().read(cx);
361 let is_local = project.read(cx).is_local();
362 let mcp_servers = context_server_store
363 .configured_server_ids()
364 .iter()
365 .filter_map(|id| {
366 let configuration = context_server_store.configuration_for_server(id)?;
367 match &*configuration {
368 project::context_server_store::ContextServerConfiguration::Custom {
369 command,
370 remote,
371 ..
372 }
373 | project::context_server_store::ContextServerConfiguration::Extension {
374 command,
375 remote,
376 ..
377 } if is_local || *remote => Some(acp::McpServer::Stdio(
378 acp::McpServerStdio::new(id.0.to_string(), &command.path)
379 .args(command.args.clone())
380 .env(if let Some(env) = command.env.as_ref() {
381 env.iter()
382 .map(|(name, value)| acp::EnvVariable::new(name, value))
383 .collect()
384 } else {
385 vec![]
386 }),
387 )),
388 project::context_server_store::ContextServerConfiguration::Http {
389 url,
390 headers,
391 timeout: _,
392 } => Some(acp::McpServer::Http(
393 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
394 headers
395 .iter()
396 .map(|(name, value)| acp::HttpHeader::new(name, value))
397 .collect(),
398 ),
399 )),
400 _ => None,
401 }
402 })
403 .collect();
404
405 cx.spawn(async move |cx| {
406 let response = conn
407 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
408 .await
409 .map_err(|err| {
410 if err.code == acp::ErrorCode::AuthRequired {
411 let mut error = AuthRequired::new();
412
413 if err.message != acp::ErrorCode::AuthRequired.to_string() {
414 error = error.with_description(err.message);
415 }
416
417 anyhow!(error)
418 } else {
419 anyhow!(err)
420 }
421 })?;
422
423 let use_config_options = cx.update(|cx| cx.has_flag::<AcpBetaFeatureFlag>());
424
425 // Config options take precedence over legacy modes/models
426 let (modes, models, config_options) = if use_config_options && let Some(opts) = response.config_options {
427 (
428 None,
429 None,
430 Some(Rc::new(RefCell::new(opts))),
431 )
432 } else {
433 // Fall back to legacy modes/models
434 let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
435 let models = response.models.map(|models| Rc::new(RefCell::new(models)));
436 (modes, models, None)
437 };
438
439 if let Some(default_mode) = default_mode {
440 if let Some(modes) = modes.as_ref() {
441 let mut modes_ref = modes.borrow_mut();
442 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
443
444 if has_mode {
445 let initial_mode_id = modes_ref.current_mode_id.clone();
446
447 cx.spawn({
448 let default_mode = default_mode.clone();
449 let session_id = response.session_id.clone();
450 let modes = modes.clone();
451 let conn = conn.clone();
452 async move |_| {
453 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
454 .await.log_err();
455
456 if result.is_none() {
457 modes.borrow_mut().current_mode_id = initial_mode_id;
458 }
459 }
460 }).detach();
461
462 modes_ref.current_mode_id = default_mode;
463 } else {
464 let available_modes = modes_ref
465 .available_modes
466 .iter()
467 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
468 .collect::<Vec<_>>()
469 .join("\n");
470
471 log::warn!(
472 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
473 );
474 }
475 } else {
476 log::warn!(
477 "`{name}` does not support modes, but `default_mode` was set in settings.",
478 );
479 }
480 }
481
482 if let Some(default_model) = default_model {
483 if let Some(models) = models.as_ref() {
484 let mut models_ref = models.borrow_mut();
485 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
486
487 if has_model {
488 let initial_model_id = models_ref.current_model_id.clone();
489
490 cx.spawn({
491 let default_model = default_model.clone();
492 let session_id = response.session_id.clone();
493 let models = models.clone();
494 let conn = conn.clone();
495 async move |_| {
496 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
497 .await.log_err();
498
499 if result.is_none() {
500 models.borrow_mut().current_model_id = initial_model_id;
501 }
502 }
503 }).detach();
504
505 models_ref.current_model_id = default_model;
506 } else {
507 let available_models = models_ref
508 .available_models
509 .iter()
510 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
511 .collect::<Vec<_>>()
512 .join("\n");
513
514 log::warn!(
515 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
516 );
517 }
518 } else {
519 log::warn!(
520 "`{name}` does not support model selection, but `default_model` was set in settings.",
521 );
522 }
523 }
524
525 if let Some(config_opts) = config_options.as_ref() {
526 let defaults_to_apply: Vec<_> = {
527 let config_opts_ref = config_opts.borrow();
528 config_opts_ref
529 .iter()
530 .filter_map(|config_option| {
531 let default_value = default_config_options.get(&*config_option.id.0)?;
532
533 let is_valid = match &config_option.kind {
534 acp::SessionConfigKind::Select(select) => match &select.options {
535 acp::SessionConfigSelectOptions::Ungrouped(options) => {
536 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
537 }
538 acp::SessionConfigSelectOptions::Grouped(groups) => groups
539 .iter()
540 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
541 _ => false,
542 },
543 _ => false,
544 };
545
546 if is_valid {
547 let initial_value = match &config_option.kind {
548 acp::SessionConfigKind::Select(select) => {
549 Some(select.current_value.clone())
550 }
551 _ => None,
552 };
553 Some((config_option.id.clone(), default_value.clone(), initial_value))
554 } else {
555 log::warn!(
556 "`{}` is not a valid value for config option `{}` in {}",
557 default_value,
558 config_option.id.0,
559 name
560 );
561 None
562 }
563 })
564 .collect()
565 };
566
567 for (config_id, default_value, initial_value) in defaults_to_apply {
568 cx.spawn({
569 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
570 let session_id = response.session_id.clone();
571 let config_id_clone = config_id.clone();
572 let config_opts = config_opts.clone();
573 let conn = conn.clone();
574 async move |_| {
575 let result = conn
576 .set_session_config_option(
577 acp::SetSessionConfigOptionRequest::new(
578 session_id,
579 config_id_clone.clone(),
580 default_value_id,
581 ),
582 )
583 .await
584 .log_err();
585
586 if result.is_none() {
587 if let Some(initial) = initial_value {
588 let mut opts = config_opts.borrow_mut();
589 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
590 if let acp::SessionConfigKind::Select(select) =
591 &mut opt.kind
592 {
593 select.current_value = initial;
594 }
595 }
596 }
597 }
598 }
599 })
600 .detach();
601
602 let mut opts = config_opts.borrow_mut();
603 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
604 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
605 select.current_value = acp::SessionConfigValueId::new(default_value);
606 }
607 }
608 }
609 }
610
611 let session_id = response.session_id;
612 let action_log = cx.new(|_| ActionLog::new(project.clone()));
613 let thread: Entity<AcpThread> = cx.new(|cx| {
614 AcpThread::new(
615 self.server_name.clone(),
616 self.clone(),
617 project,
618 action_log,
619 session_id.clone(),
620 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
621 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
622 cx,
623 )
624 });
625
626
627 let session = AcpSession {
628 thread: thread.downgrade(),
629 suppress_abort_err: false,
630 session_modes: modes,
631 models,
632 config_options: config_options.map(|opts| ConfigOptions::new(opts))
633 };
634 sessions.borrow_mut().insert(session_id, session);
635
636 if let Some(session_list) = &self.session_list {
637 session_list.notify_update();
638 }
639
640 Ok(thread)
641 })
642 }
643
644 fn auth_methods(&self) -> &[acp::AuthMethod] {
645 &self.auth_methods
646 }
647
648 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
649 let conn = self.connection.clone();
650 cx.foreground_executor().spawn(async move {
651 conn.authenticate(acp::AuthenticateRequest::new(method_id))
652 .await?;
653 Ok(())
654 })
655 }
656
657 fn prompt(
658 &self,
659 _id: Option<acp_thread::UserMessageId>,
660 params: acp::PromptRequest,
661 cx: &mut App,
662 ) -> Task<Result<acp::PromptResponse>> {
663 let conn = self.connection.clone();
664 let sessions = self.sessions.clone();
665 let session_id = params.session_id.clone();
666 cx.foreground_executor().spawn(async move {
667 let result = conn.prompt(params).await;
668
669 let mut suppress_abort_err = false;
670
671 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
672 suppress_abort_err = session.suppress_abort_err;
673 session.suppress_abort_err = false;
674 }
675
676 match result {
677 Ok(response) => Ok(response),
678 Err(err) => {
679 if err.code == acp::ErrorCode::AuthRequired {
680 return Err(anyhow!(acp::Error::auth_required()));
681 }
682
683 if err.code != ErrorCode::InternalError {
684 anyhow::bail!(err)
685 }
686
687 let Some(data) = &err.data else {
688 anyhow::bail!(err)
689 };
690
691 // Temporary workaround until the following PR is generally available:
692 // https://github.com/google-gemini/gemini-cli/pull/6656
693
694 #[derive(Deserialize)]
695 #[serde(deny_unknown_fields)]
696 struct ErrorDetails {
697 details: Box<str>,
698 }
699
700 match serde_json::from_value(data.clone()) {
701 Ok(ErrorDetails { details }) => {
702 if suppress_abort_err
703 && (details.contains("This operation was aborted")
704 || details.contains("The user aborted a request"))
705 {
706 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
707 } else {
708 Err(anyhow!(details))
709 }
710 }
711 Err(_) => Err(anyhow!(err)),
712 }
713 }
714 }
715 })
716 }
717
718 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
719 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
720 session.suppress_abort_err = true;
721 }
722 let conn = self.connection.clone();
723 let params = acp::CancelNotification::new(session_id.clone());
724 cx.foreground_executor()
725 .spawn(async move { conn.cancel(params).await })
726 .detach();
727 }
728
729 fn session_modes(
730 &self,
731 session_id: &acp::SessionId,
732 _cx: &App,
733 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
734 let sessions = self.sessions.clone();
735 let sessions_ref = sessions.borrow();
736 let Some(session) = sessions_ref.get(session_id) else {
737 return None;
738 };
739
740 if let Some(modes) = session.session_modes.as_ref() {
741 Some(Rc::new(AcpSessionModes {
742 connection: self.connection.clone(),
743 session_id: session_id.clone(),
744 state: modes.clone(),
745 }) as _)
746 } else {
747 None
748 }
749 }
750
751 fn model_selector(
752 &self,
753 session_id: &acp::SessionId,
754 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
755 let sessions = self.sessions.clone();
756 let sessions_ref = sessions.borrow();
757 let Some(session) = sessions_ref.get(session_id) else {
758 return None;
759 };
760
761 if let Some(models) = session.models.as_ref() {
762 Some(Rc::new(AcpModelSelector::new(
763 session_id.clone(),
764 self.connection.clone(),
765 models.clone(),
766 )) as _)
767 } else {
768 None
769 }
770 }
771
772 fn session_config_options(
773 &self,
774 session_id: &acp::SessionId,
775 _cx: &App,
776 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
777 let sessions = self.sessions.borrow();
778 let session = sessions.get(session_id)?;
779
780 let config_opts = session.config_options.as_ref()?;
781
782 Some(Rc::new(AcpSessionConfigOptions {
783 session_id: session_id.clone(),
784 connection: self.connection.clone(),
785 state: config_opts.config_options.clone(),
786 watch_tx: config_opts.tx.clone(),
787 watch_rx: config_opts.rx.clone(),
788 }) as _)
789 }
790
791 fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
792 if cx.has_flag::<AcpBetaFeatureFlag>() {
793 self.session_list.clone().map(|s| s as _)
794 } else {
795 None
796 }
797 }
798
799 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
800 self
801 }
802}
803
804struct AcpSessionModes {
805 session_id: acp::SessionId,
806 connection: Rc<acp::ClientSideConnection>,
807 state: Rc<RefCell<acp::SessionModeState>>,
808}
809
810impl acp_thread::AgentSessionModes for AcpSessionModes {
811 fn current_mode(&self) -> acp::SessionModeId {
812 self.state.borrow().current_mode_id.clone()
813 }
814
815 fn all_modes(&self) -> Vec<acp::SessionMode> {
816 self.state.borrow().available_modes.clone()
817 }
818
819 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
820 let connection = self.connection.clone();
821 let session_id = self.session_id.clone();
822 let old_mode_id;
823 {
824 let mut state = self.state.borrow_mut();
825 old_mode_id = state.current_mode_id.clone();
826 state.current_mode_id = mode_id.clone();
827 };
828 let state = self.state.clone();
829 cx.foreground_executor().spawn(async move {
830 let result = connection
831 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
832 .await;
833
834 if result.is_err() {
835 state.borrow_mut().current_mode_id = old_mode_id;
836 }
837
838 result?;
839
840 Ok(())
841 })
842 }
843}
844
845struct AcpModelSelector {
846 session_id: acp::SessionId,
847 connection: Rc<acp::ClientSideConnection>,
848 state: Rc<RefCell<acp::SessionModelState>>,
849}
850
851impl AcpModelSelector {
852 fn new(
853 session_id: acp::SessionId,
854 connection: Rc<acp::ClientSideConnection>,
855 state: Rc<RefCell<acp::SessionModelState>>,
856 ) -> Self {
857 Self {
858 session_id,
859 connection,
860 state,
861 }
862 }
863}
864
865impl acp_thread::AgentModelSelector for AcpModelSelector {
866 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
867 Task::ready(Ok(acp_thread::AgentModelList::Flat(
868 self.state
869 .borrow()
870 .available_models
871 .clone()
872 .into_iter()
873 .map(acp_thread::AgentModelInfo::from)
874 .collect(),
875 )))
876 }
877
878 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
879 let connection = self.connection.clone();
880 let session_id = self.session_id.clone();
881 let old_model_id;
882 {
883 let mut state = self.state.borrow_mut();
884 old_model_id = state.current_model_id.clone();
885 state.current_model_id = model_id.clone();
886 };
887 let state = self.state.clone();
888 cx.foreground_executor().spawn(async move {
889 let result = connection
890 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
891 .await;
892
893 if result.is_err() {
894 state.borrow_mut().current_model_id = old_model_id;
895 }
896
897 result?;
898
899 Ok(())
900 })
901 }
902
903 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
904 let state = self.state.borrow();
905 Task::ready(
906 state
907 .available_models
908 .iter()
909 .find(|m| m.model_id == state.current_model_id)
910 .cloned()
911 .map(acp_thread::AgentModelInfo::from)
912 .ok_or_else(|| anyhow::anyhow!("Model not found")),
913 )
914 }
915}
916
917struct AcpSessionConfigOptions {
918 session_id: acp::SessionId,
919 connection: Rc<acp::ClientSideConnection>,
920 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
921 watch_tx: Rc<RefCell<watch::Sender<()>>>,
922 watch_rx: watch::Receiver<()>,
923}
924
925impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
926 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
927 self.state.borrow().clone()
928 }
929
930 fn set_config_option(
931 &self,
932 config_id: acp::SessionConfigId,
933 value: acp::SessionConfigValueId,
934 cx: &mut App,
935 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
936 let connection = self.connection.clone();
937 let session_id = self.session_id.clone();
938 let state = self.state.clone();
939
940 let watch_tx = self.watch_tx.clone();
941
942 cx.foreground_executor().spawn(async move {
943 let response = connection
944 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
945 session_id, config_id, value,
946 ))
947 .await?;
948
949 *state.borrow_mut() = response.config_options.clone();
950 watch_tx.borrow_mut().send(()).ok();
951 Ok(response.config_options)
952 })
953 }
954
955 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
956 Some(self.watch_rx.clone())
957 }
958}
959
960struct ClientDelegate {
961 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
962 cx: AsyncApp,
963}
964
965#[async_trait::async_trait(?Send)]
966impl acp::Client for ClientDelegate {
967 async fn request_permission(
968 &self,
969 arguments: acp::RequestPermissionRequest,
970 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
971 let respect_always_allow_setting;
972 let thread;
973 {
974 let sessions_ref = self.sessions.borrow();
975 let session = sessions_ref
976 .get(&arguments.session_id)
977 .context("Failed to get session")?;
978 respect_always_allow_setting = session.session_modes.is_none();
979 thread = session.thread.clone();
980 }
981
982 let cx = &mut self.cx.clone();
983
984 let task = thread.update(cx, |thread, cx| {
985 thread.request_tool_call_authorization(
986 arguments.tool_call,
987 arguments.options,
988 respect_always_allow_setting,
989 cx,
990 )
991 })??;
992
993 let outcome = task.await;
994
995 Ok(acp::RequestPermissionResponse::new(outcome))
996 }
997
998 async fn write_text_file(
999 &self,
1000 arguments: acp::WriteTextFileRequest,
1001 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1002 let cx = &mut self.cx.clone();
1003 let task = self
1004 .session_thread(&arguments.session_id)?
1005 .update(cx, |thread, cx| {
1006 thread.write_text_file(arguments.path, arguments.content, cx)
1007 })?;
1008
1009 task.await?;
1010
1011 Ok(Default::default())
1012 }
1013
1014 async fn read_text_file(
1015 &self,
1016 arguments: acp::ReadTextFileRequest,
1017 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1018 let task = self.session_thread(&arguments.session_id)?.update(
1019 &mut self.cx.clone(),
1020 |thread, cx| {
1021 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1022 },
1023 )?;
1024
1025 let content = task.await?;
1026
1027 Ok(acp::ReadTextFileResponse::new(content))
1028 }
1029
1030 async fn session_notification(
1031 &self,
1032 notification: acp::SessionNotification,
1033 ) -> Result<(), acp::Error> {
1034 let sessions = self.sessions.borrow();
1035 let session = sessions
1036 .get(¬ification.session_id)
1037 .context("Failed to get session")?;
1038
1039 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1040 current_mode_id,
1041 ..
1042 }) = ¬ification.update
1043 {
1044 if let Some(session_modes) = &session.session_modes {
1045 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1046 } else {
1047 log::error!(
1048 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
1049 );
1050 }
1051 }
1052
1053 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1054 config_options,
1055 ..
1056 }) = ¬ification.update
1057 {
1058 if let Some(opts) = &session.config_options {
1059 *opts.config_options.borrow_mut() = config_options.clone();
1060 opts.tx.borrow_mut().send(()).ok();
1061 } else {
1062 log::error!(
1063 "Got a `ConfigOptionUpdate` notification, but the agent didn't specify `config_options` during session setup."
1064 );
1065 }
1066 }
1067
1068 // Clone so we can inspect meta both before and after handing off to the thread
1069 let update_clone = notification.update.clone();
1070
1071 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1072 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1073 if let Some(meta) = &tc.meta {
1074 if let Some(terminal_info) = meta.get("terminal_info") {
1075 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1076 {
1077 let terminal_id = acp::TerminalId::new(id_str);
1078 let cwd = terminal_info
1079 .get("cwd")
1080 .and_then(|v| v.as_str().map(PathBuf::from));
1081
1082 // Create a minimal display-only lower-level terminal and register it.
1083 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1084 let builder = TerminalBuilder::new_display_only(
1085 CursorShape::default(),
1086 AlternateScroll::On,
1087 None,
1088 0,
1089 )?;
1090 let lower = cx.new(|cx| builder.subscribe(cx));
1091 thread.on_terminal_provider_event(
1092 TerminalProviderEvent::Created {
1093 terminal_id,
1094 label: tc.title.clone(),
1095 cwd,
1096 output_byte_limit: None,
1097 terminal: lower,
1098 },
1099 cx,
1100 );
1101 anyhow::Ok(())
1102 });
1103 }
1104 }
1105 }
1106 }
1107
1108 // Forward the update to the acp_thread as usual.
1109 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1110 thread.handle_session_update(notification.update.clone(), cx)
1111 })??;
1112
1113 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1114 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1115 if let Some(meta) = &tcu.meta {
1116 if let Some(term_out) = meta.get("terminal_output") {
1117 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1118 let terminal_id = acp::TerminalId::new(id_str);
1119 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1120 let data = s.as_bytes().to_vec();
1121 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1122 thread.on_terminal_provider_event(
1123 TerminalProviderEvent::Output { terminal_id, data },
1124 cx,
1125 );
1126 });
1127 }
1128 }
1129 }
1130
1131 // terminal_exit
1132 if let Some(term_exit) = meta.get("terminal_exit") {
1133 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1134 let terminal_id = acp::TerminalId::new(id_str);
1135 let status = acp::TerminalExitStatus::new()
1136 .exit_code(
1137 term_exit
1138 .get("exit_code")
1139 .and_then(|v| v.as_u64())
1140 .map(|i| i as u32),
1141 )
1142 .signal(
1143 term_exit
1144 .get("signal")
1145 .and_then(|v| v.as_str().map(|s| s.to_string())),
1146 );
1147
1148 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1149 thread.on_terminal_provider_event(
1150 TerminalProviderEvent::Exit {
1151 terminal_id,
1152 status,
1153 },
1154 cx,
1155 );
1156 });
1157 }
1158 }
1159 }
1160 }
1161
1162 Ok(())
1163 }
1164
1165 async fn create_terminal(
1166 &self,
1167 args: acp::CreateTerminalRequest,
1168 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1169 let thread = self.session_thread(&args.session_id)?;
1170 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1171
1172 let terminal_entity = acp_thread::create_terminal_entity(
1173 args.command.clone(),
1174 &args.args,
1175 args.env
1176 .into_iter()
1177 .map(|env| (env.name, env.value))
1178 .collect(),
1179 args.cwd.clone(),
1180 &project,
1181 &mut self.cx.clone(),
1182 )
1183 .await?;
1184
1185 // Register with renderer
1186 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1187 thread.register_terminal_created(
1188 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1189 format!("{} {}", args.command, args.args.join(" ")),
1190 args.cwd.clone(),
1191 args.output_byte_limit,
1192 terminal_entity,
1193 cx,
1194 )
1195 })?;
1196 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1197 Ok(acp::CreateTerminalResponse::new(terminal_id))
1198 }
1199
1200 async fn kill_terminal_command(
1201 &self,
1202 args: acp::KillTerminalCommandRequest,
1203 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1204 self.session_thread(&args.session_id)?
1205 .update(&mut self.cx.clone(), |thread, cx| {
1206 thread.kill_terminal(args.terminal_id, cx)
1207 })??;
1208
1209 Ok(Default::default())
1210 }
1211
1212 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1213 Err(acp::Error::method_not_found())
1214 }
1215
1216 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1217 Err(acp::Error::method_not_found())
1218 }
1219
1220 async fn release_terminal(
1221 &self,
1222 args: acp::ReleaseTerminalRequest,
1223 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1224 self.session_thread(&args.session_id)?
1225 .update(&mut self.cx.clone(), |thread, cx| {
1226 thread.release_terminal(args.terminal_id, cx)
1227 })??;
1228
1229 Ok(Default::default())
1230 }
1231
1232 async fn terminal_output(
1233 &self,
1234 args: acp::TerminalOutputRequest,
1235 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1236 self.session_thread(&args.session_id)?
1237 .read_with(&mut self.cx.clone(), |thread, cx| {
1238 let out = thread
1239 .terminal(args.terminal_id)?
1240 .read(cx)
1241 .current_output(cx);
1242
1243 Ok(out)
1244 })?
1245 }
1246
1247 async fn wait_for_terminal_exit(
1248 &self,
1249 args: acp::WaitForTerminalExitRequest,
1250 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1251 let exit_status = self
1252 .session_thread(&args.session_id)?
1253 .update(&mut self.cx.clone(), |thread, cx| {
1254 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1255 })??
1256 .await;
1257
1258 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1259 }
1260}
1261
1262impl ClientDelegate {
1263 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1264 let sessions = self.sessions.borrow();
1265 sessions
1266 .get(session_id)
1267 .context("Failed to get session")
1268 .map(|session| session.thread.clone())
1269 }
1270}