1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
11use futures::AsyncBufReadExt as _;
12use futures::io::BufReader;
13use project::Project;
14use project::agent_server_store::AgentServerCommand;
15use serde::Deserialize;
16use settings::Settings as _;
17use task::ShellBuilder;
18use util::ResultExt as _;
19use util::process::Child;
20
21use std::path::PathBuf;
22use std::process::Stdio;
23use std::{any::Any, cell::RefCell};
24use std::{path::Path, rc::Rc};
25use thiserror::Error;
26
27use anyhow::{Context as _, Result};
28use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
29
30use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
31use terminal::TerminalBuilder;
32use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
33
/// Error returned by [`AcpConnection::stdio`] when the protocol version
/// reported in the agent's `initialize` response is lower than
/// [`MINIMUM_SUPPORTED_VERSION`].
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
37
/// A connection to an external agent server process that speaks ACP over the
/// child's stdin/stdout.
///
/// Created by [`AcpConnection::stdio`]; the child process is killed when the
/// connection is dropped.
pub struct AcpConnection {
    /// Name of the agent server, used as the thread's server name and in log messages.
    server_name: SharedString,
    /// Name reported by the agent in its `initialize` response, or `server_name`
    /// when the agent did not report one.
    telemetry_id: SharedString,
    /// Client side of the ACP connection, shared with per-session helper objects.
    connection: Rc<acp::ClientSideConnection>,
    /// Live sessions keyed by session id; shared with the `ClientDelegate` that
    /// handles requests coming from the agent and with the process-exit watcher.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods advertised in the `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities advertised in the `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on every new session, if the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on every new session, if the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option values (keyed by config option id) to apply on every new session.
    default_config_options: HashMap<String, String>,
    /// Directory passed to `stdio`; also the child's working directory unless
    /// the connection was created with `is_remote`.
    root_dir: PathBuf,
    /// The agent server process. Killed in `Drop`.
    child: Child,
    /// Session listing support; `Some` only when the agent advertised the
    /// session `list` capability.
    session_list: Option<Rc<AcpSessionList>>,
    /// Task running the I/O future returned by `acp::ClientSideConnection::new`.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that waits for the child to exit and then emits
    /// `LoadError::Exited` on every session's thread.
    _wait_task: Task<Result<()>>,
    /// Task that forwards each line of the agent's stderr to the log at `warn` level.
    _stderr_task: Task<Result<()>>,
}
55
56struct ConfigOptions {
57 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
58 tx: Rc<RefCell<watch::Sender<()>>>,
59 rx: watch::Receiver<()>,
60}
61
62impl ConfigOptions {
63 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
64 let (tx, rx) = watch::channel(());
65 Self {
66 config_options,
67 tx: Rc::new(RefCell::new(tx)),
68 rx,
69 }
70 }
71}
72
/// Per-session state tracked by an [`AcpConnection`].
pub struct AcpSession {
    /// The thread that renders this session; weak so the session map does not
    /// keep the thread alive.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel` and consumed by the next `prompt` completion, which uses
    /// it to turn "aborted" internal errors into a `Cancelled` response.
    suppress_abort_err: bool,
    /// Legacy model state; `None` when the agent reported no models or when
    /// config options are used instead.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Legacy mode state; `None` when the agent reported no modes or when
    /// config options are used instead.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options for the session, when they are in use.
    config_options: Option<ConfigOptions>,
}
80
81pub struct AcpSessionList {
82 connection: Rc<acp::ClientSideConnection>,
83 updates_tx: Rc<RefCell<watch::Sender<()>>>,
84 updates_rx: watch::Receiver<()>,
85}
86
87impl AcpSessionList {
88 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
89 let (tx, rx) = watch::channel(());
90 Self {
91 connection,
92 updates_tx: Rc::new(RefCell::new(tx)),
93 updates_rx: rx,
94 }
95 }
96
97 fn notify_update(&self) {
98 self.updates_tx.borrow_mut().send(()).ok();
99 }
100}
101
102impl AgentSessionList for AcpSessionList {
103 fn list_sessions(
104 &self,
105 request: AgentSessionListRequest,
106 cx: &mut App,
107 ) -> Task<Result<AgentSessionListResponse>> {
108 let conn = self.connection.clone();
109 cx.foreground_executor().spawn(async move {
110 let acp_request = acp::ListSessionsRequest::new()
111 .cwd(request.cwd)
112 .cursor(request.cursor);
113 let response = conn.list_sessions(acp_request).await?;
114 Ok(AgentSessionListResponse {
115 sessions: response
116 .sessions
117 .into_iter()
118 .map(|s| AgentSessionInfo {
119 session_id: s.session_id,
120 cwd: Some(s.cwd),
121 title: s.title.map(Into::into),
122 updated_at: s.updated_at.and_then(|date_str| {
123 chrono::DateTime::parse_from_rfc3339(&date_str)
124 .ok()
125 .map(|dt| dt.with_timezone(&chrono::Utc))
126 }),
127 meta: s.meta,
128 })
129 .collect(),
130 next_cursor: response.next_cursor,
131 meta: response.meta,
132 })
133 })
134 }
135
136 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
137 Some(self.updates_rx.clone())
138 }
139
140 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
141 self
142 }
143}
144
145pub async fn connect(
146 server_name: SharedString,
147 command: AgentServerCommand,
148 root_dir: &Path,
149 default_mode: Option<acp::SessionModeId>,
150 default_model: Option<acp::ModelId>,
151 default_config_options: HashMap<String, String>,
152 is_remote: bool,
153 cx: &mut AsyncApp,
154) -> Result<Rc<dyn AgentConnection>> {
155 let conn = AcpConnection::stdio(
156 server_name,
157 command.clone(),
158 root_dir,
159 default_mode,
160 default_model,
161 default_config_options,
162 is_remote,
163 cx,
164 )
165 .await?;
166 Ok(Rc::new(conn) as _)
167}
168
/// Lowest ACP protocol version an agent may report in its `initialize`
/// response; anything lower is rejected with [`UnsupportedVersion`].
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
170
171impl AcpConnection {
    /// Spawns the agent server as a child process, wires an ACP connection to
    /// its stdin/stdout, and performs the `initialize` handshake.
    ///
    /// * `server_name` - name registered with the `AcpConnectionRegistry` and
    ///   used as the fallback telemetry id.
    /// * `command` - path, arguments and environment of the agent server; it is
    ///   run through a non-interactive shell built from the terminal settings.
    /// * `root_dir` - stored on the connection and, unless `is_remote` is set,
    ///   used as the child's working directory.
    /// * `default_mode`, `default_model`, `default_config_options` - stored on
    ///   the connection for later use when sessions are created.
    /// * `is_remote` - when `true`, the child's working directory is left unset.
    ///
    /// # Errors
    ///
    /// Fails if the process cannot be spawned, if any of its stdio pipes cannot
    /// be taken, if the `initialize` request fails, or with
    /// [`UnsupportedVersion`] if the agent reports a protocol version below
    /// [`MINIMUM_SUPPORTED_VERSION`].
    pub async fn stdio(
        server_name: SharedString,
        command: AgentServerCommand,
        root_dir: &Path,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        is_remote: bool,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        // Build the command line through the user's configured shell.
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child =
            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
        child.envs(command.env.iter().flatten());
        if !is_remote {
            child.current_dir(root_dir);
        }
        // All three stdio streams are piped: stdin/stdout carry the protocol,
        // stderr is forwarded to the log below.
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        log::debug!(
            "Spawning external agent server: {:?}, {:?}",
            command.path,
            command.args
        );
        log::trace!("Spawned (pid: {})", child.id());

        // Shared between this connection, the client delegate, and the exit watcher.
        let sessions = Rc::new(RefCell::new(HashMap::default()));

        // Read once up front; reported to the agent as client info during `initialize`.
        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        let client = ClientDelegate {
            sessions: sessions.clone(),
            cx: cx.clone(),
        };
        // Futures handed to the spawner by the connection are run detached on
        // the foreground executor.
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Forward the agent's stderr to our log, one line at a time, until EOF
        // or the first read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // When the process exits, tell every session's thread so it can surface
        // the exit status as a load error.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(server_name.clone(), &connection, cx)
            });
        });

        // Handshake: advertise file system and terminal capabilities plus client info.
        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapability::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| server_name.clone());

        // Session listing is only offered when the agent advertises the capability.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            Some(Rc::new(AcpSessionList::new(connection.clone())))
        } else {
            None
        };

        Ok(Self {
            auth_methods: response.auth_methods,
            root_dir: root_dir.to_owned(),
            connection,
            server_name,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }
326
327 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
328 &self.agent_capabilities.prompt_capabilities
329 }
330
331 pub fn root_dir(&self) -> &Path {
332 &self.root_dir
333 }
334}
335
336impl Drop for AcpConnection {
337 fn drop(&mut self) {
338 self.child.kill().log_err();
339 }
340}
341
342impl AgentConnection for AcpConnection {
343 fn telemetry_id(&self) -> SharedString {
344 self.telemetry_id.clone()
345 }
346
    /// Creates a new agent session rooted at `cwd` and returns the thread that
    /// represents it.
    ///
    /// Steps, in order:
    /// 1. Collect the project's configured MCP servers (local projects only).
    /// 2. Send `new_session`; an `AuthRequired` error from the agent is
    ///    converted into an `AuthRequired` error value.
    /// 3. Decide whether the session uses config options (behind
    ///    `AcpBetaFeatureFlag`) or the legacy modes/models state.
    /// 4. Apply `default_mode`, `default_model` and `default_config_options`:
    ///    the local state is updated immediately, the request is sent in a
    ///    detached task, and the local state is reverted if the request fails.
    /// 5. Create the `AcpThread`, register the session, and notify session
    ///    list watchers.
    fn new_thread(
        self: Rc<Self>,
        project: Entity<Project>,
        cwd: &Path,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        let name = self.server_name.clone();
        let conn = self.connection.clone();
        let sessions = self.sessions.clone();
        let default_mode = self.default_mode.clone();
        let default_model = self.default_model.clone();
        let default_config_options = self.default_config_options.clone();
        let cwd = cwd.to_path_buf();
        let context_server_store = project.read(cx).context_server_store().read(cx);
        // Translate each configured context server into the ACP MCP server
        // description; servers without a configuration are skipped.
        let mcp_servers = if project.read(cx).is_local() {
            context_server_store
                .configured_server_ids()
                .iter()
                .filter_map(|id| {
                    let configuration = context_server_store.configuration_for_server(id)?;
                    match &*configuration {
                        project::context_server_store::ContextServerConfiguration::Custom {
                            command,
                            ..
                        }
                        | project::context_server_store::ContextServerConfiguration::Extension {
                            command,
                            ..
                        } => Some(acp::McpServer::Stdio(
                            acp::McpServerStdio::new(id.0.to_string(), &command.path)
                                .args(command.args.clone())
                                .env(if let Some(env) = command.env.as_ref() {
                                    env.iter()
                                        .map(|(name, value)| acp::EnvVariable::new(name, value))
                                        .collect()
                                } else {
                                    vec![]
                                }),
                        )),
                        project::context_server_store::ContextServerConfiguration::Http {
                            url,
                            headers,
                            timeout: _,
                        } => Some(acp::McpServer::Http(
                            acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
                                headers
                                    .iter()
                                    .map(|(name, value)| acp::HttpHeader::new(name, value))
                                    .collect(),
                            ),
                        )),
                    }
                })
                .collect()
        } else {
            // In SSH projects, the external agent is running on the remote
            // machine, and currently we only run MCP servers on the local
            // machine. So don't pass any MCP servers to the agent in that case.
            Vec::new()
        };

        cx.spawn(async move |cx| {
            let response = conn
                .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
                .await
                .map_err(|err| {
                    if err.code == acp::ErrorCode::AuthRequired {
                        let mut error = AuthRequired::new();

                        // Only keep the agent's message when it differs from
                        // the error code's own default text.
                        if err.message != acp::ErrorCode::AuthRequired.to_string() {
                            error = error.with_description(err.message);
                        }

                        anyhow!(error)
                    } else {
                        anyhow!(err)
                    }
                })?;

            let use_config_options = cx.update(|cx| cx.has_flag::<AcpBetaFeatureFlag>());

            // Config options take precedence over legacy modes/models
            let (modes, models, config_options) = if use_config_options && let Some(opts) = response.config_options {
                (
                    None,
                    None,
                    Some(Rc::new(RefCell::new(opts))),
                )
            } else {
                // Fall back to legacy modes/models
                let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
                let models = response.models.map(|models| Rc::new(RefCell::new(models)));
                (modes, models, None)
            };

            // Apply the configured default mode, if the agent offers it.
            if let Some(default_mode) = default_mode {
                if let Some(modes) = modes.as_ref() {
                    let mut modes_ref = modes.borrow_mut();
                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);

                    if has_mode {
                        // Remembered so the local state can be restored if the request fails.
                        let initial_mode_id = modes_ref.current_mode_id.clone();

                        cx.spawn({
                            let default_mode = default_mode.clone();
                            let session_id = response.session_id.clone();
                            let modes = modes.clone();
                            let conn = conn.clone();
                            async move |_| {
                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
                                .await.log_err();

                                if result.is_none() {
                                    modes.borrow_mut().current_mode_id = initial_mode_id;
                                }
                            }
                        }).detach();

                        // Optimistic local update; the detached task above reverts it on failure.
                        modes_ref.current_mode_id = default_mode;
                    } else {
                        let available_modes = modes_ref
                            .available_modes
                            .iter()
                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
                        );
                    }
                } else {
                    log::warn!(
                        "`{name}` does not support modes, but `default_mode` was set in settings.",
                    );
                }
            }

            // Apply the configured default model, mirroring the mode handling above.
            if let Some(default_model) = default_model {
                if let Some(models) = models.as_ref() {
                    let mut models_ref = models.borrow_mut();
                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);

                    if has_model {
                        // Remembered so the local state can be restored if the request fails.
                        let initial_model_id = models_ref.current_model_id.clone();

                        cx.spawn({
                            let default_model = default_model.clone();
                            let session_id = response.session_id.clone();
                            let models = models.clone();
                            let conn = conn.clone();
                            async move |_| {
                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
                                .await.log_err();

                                if result.is_none() {
                                    models.borrow_mut().current_model_id = initial_model_id;
                                }
                            }
                        }).detach();

                        // Optimistic local update; the detached task above reverts it on failure.
                        models_ref.current_model_id = default_model;
                    } else {
                        let available_models = models_ref
                            .available_models
                            .iter()
                            .map(|model| format!("- `{}`: {}", model.model_id, model.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
                        );
                    }
                } else {
                    log::warn!(
                        "`{name}` does not support model selection, but `default_model` was set in settings.",
                    );
                }
            }

            // Apply configured default values for config options.
            if let Some(config_opts) = config_options.as_ref() {
                // First pass (shared borrow only): decide which defaults are
                // valid and capture each option's current value for rollback.
                let defaults_to_apply: Vec<_> = {
                    let config_opts_ref = config_opts.borrow();
                    config_opts_ref
                        .iter()
                        .filter_map(|config_option| {
                            let default_value = default_config_options.get(&*config_option.id.0)?;

                            // A default is valid only for `Select` options that
                            // list it among their (grouped or ungrouped) values.
                            let is_valid = match &config_option.kind {
                                acp::SessionConfigKind::Select(select) => match &select.options {
                                    acp::SessionConfigSelectOptions::Ungrouped(options) => {
                                        options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
                                    }
                                    acp::SessionConfigSelectOptions::Grouped(groups) => groups
                                        .iter()
                                        .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
                                    _ => false,
                                },
                                _ => false,
                            };

                            if is_valid {
                                let initial_value = match &config_option.kind {
                                    acp::SessionConfigKind::Select(select) => {
                                        Some(select.current_value.clone())
                                    }
                                    _ => None,
                                };
                                Some((config_option.id.clone(), default_value.clone(), initial_value))
                            } else {
                                log::warn!(
                                    "`{}` is not a valid value for config option `{}` in {}",
                                    default_value,
                                    config_option.id.0,
                                    name
                                );
                                None
                            }
                        })
                        .collect()
                };

                // Second pass: send each default to the agent in a detached
                // task and update the local value right away.
                for (config_id, default_value, initial_value) in defaults_to_apply {
                    cx.spawn({
                        let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
                        let session_id = response.session_id.clone();
                        let config_id_clone = config_id.clone();
                        let config_opts = config_opts.clone();
                        let conn = conn.clone();
                        async move |_| {
                            let result = conn
                                .set_session_config_option(
                                    acp::SetSessionConfigOptionRequest::new(
                                        session_id,
                                        config_id_clone.clone(),
                                        default_value_id,
                                    ),
                                )
                                .await
                                .log_err();

                            // On failure, restore the value the option had before.
                            if result.is_none() {
                                if let Some(initial) = initial_value {
                                    let mut opts = config_opts.borrow_mut();
                                    if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
                                        if let acp::SessionConfigKind::Select(select) =
                                            &mut opt.kind
                                        {
                                            select.current_value = initial;
                                        }
                                    }
                                }
                            }
                        }
                    })
                    .detach();

                    let mut opts = config_opts.borrow_mut();
                    if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
                        if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
                            select.current_value = acp::SessionConfigValueId::new(default_value);
                        }
                    }
                }
            }

            let session_id = response.session_id;
            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            let thread: Entity<AcpThread> = cx.new(|cx| {
                AcpThread::new(
                    self.server_name.clone(),
                    self.clone(),
                    project,
                    action_log,
                    session_id.clone(),
                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                    cx,
                )
            });


            // Register the session so requests from the agent can be routed to the thread.
            let session = AcpSession {
                thread: thread.downgrade(),
                suppress_abort_err: false,
                session_modes: modes,
                models,
                config_options: config_options.map(|opts| ConfigOptions::new(opts))
            };
            sessions.borrow_mut().insert(session_id, session);

            if let Some(session_list) = &self.session_list {
                session_list.notify_update();
            }

            Ok(thread)
        })
    }
646
647 fn auth_methods(&self) -> &[acp::AuthMethod] {
648 &self.auth_methods
649 }
650
651 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
652 let conn = self.connection.clone();
653 cx.foreground_executor().spawn(async move {
654 conn.authenticate(acp::AuthenticateRequest::new(method_id))
655 .await?;
656 Ok(())
657 })
658 }
659
660 fn prompt(
661 &self,
662 _id: Option<acp_thread::UserMessageId>,
663 params: acp::PromptRequest,
664 cx: &mut App,
665 ) -> Task<Result<acp::PromptResponse>> {
666 let conn = self.connection.clone();
667 let sessions = self.sessions.clone();
668 let session_id = params.session_id.clone();
669 cx.foreground_executor().spawn(async move {
670 let result = conn.prompt(params).await;
671
672 let mut suppress_abort_err = false;
673
674 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
675 suppress_abort_err = session.suppress_abort_err;
676 session.suppress_abort_err = false;
677 }
678
679 match result {
680 Ok(response) => Ok(response),
681 Err(err) => {
682 if err.code == acp::ErrorCode::AuthRequired {
683 return Err(anyhow!(acp::Error::auth_required()));
684 }
685
686 if err.code != ErrorCode::InternalError {
687 anyhow::bail!(err)
688 }
689
690 let Some(data) = &err.data else {
691 anyhow::bail!(err)
692 };
693
694 // Temporary workaround until the following PR is generally available:
695 // https://github.com/google-gemini/gemini-cli/pull/6656
696
697 #[derive(Deserialize)]
698 #[serde(deny_unknown_fields)]
699 struct ErrorDetails {
700 details: Box<str>,
701 }
702
703 match serde_json::from_value(data.clone()) {
704 Ok(ErrorDetails { details }) => {
705 if suppress_abort_err
706 && (details.contains("This operation was aborted")
707 || details.contains("The user aborted a request"))
708 {
709 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
710 } else {
711 Err(anyhow!(details))
712 }
713 }
714 Err(_) => Err(anyhow!(err)),
715 }
716 }
717 }
718 })
719 }
720
721 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
722 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
723 session.suppress_abort_err = true;
724 }
725 let conn = self.connection.clone();
726 let params = acp::CancelNotification::new(session_id.clone());
727 cx.foreground_executor()
728 .spawn(async move { conn.cancel(params).await })
729 .detach();
730 }
731
732 fn session_modes(
733 &self,
734 session_id: &acp::SessionId,
735 _cx: &App,
736 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
737 let sessions = self.sessions.clone();
738 let sessions_ref = sessions.borrow();
739 let Some(session) = sessions_ref.get(session_id) else {
740 return None;
741 };
742
743 if let Some(modes) = session.session_modes.as_ref() {
744 Some(Rc::new(AcpSessionModes {
745 connection: self.connection.clone(),
746 session_id: session_id.clone(),
747 state: modes.clone(),
748 }) as _)
749 } else {
750 None
751 }
752 }
753
754 fn model_selector(
755 &self,
756 session_id: &acp::SessionId,
757 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
758 let sessions = self.sessions.clone();
759 let sessions_ref = sessions.borrow();
760 let Some(session) = sessions_ref.get(session_id) else {
761 return None;
762 };
763
764 if let Some(models) = session.models.as_ref() {
765 Some(Rc::new(AcpModelSelector::new(
766 session_id.clone(),
767 self.connection.clone(),
768 models.clone(),
769 )) as _)
770 } else {
771 None
772 }
773 }
774
775 fn session_config_options(
776 &self,
777 session_id: &acp::SessionId,
778 _cx: &App,
779 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
780 let sessions = self.sessions.borrow();
781 let session = sessions.get(session_id)?;
782
783 let config_opts = session.config_options.as_ref()?;
784
785 Some(Rc::new(AcpSessionConfigOptions {
786 session_id: session_id.clone(),
787 connection: self.connection.clone(),
788 state: config_opts.config_options.clone(),
789 watch_tx: config_opts.tx.clone(),
790 watch_rx: config_opts.rx.clone(),
791 }) as _)
792 }
793
794 fn session_list(&self, cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
795 if cx.has_flag::<AcpBetaFeatureFlag>() {
796 self.session_list.clone().map(|s| s as _)
797 } else {
798 None
799 }
800 }
801
    /// Upcasts to `Rc<dyn Any>` so callers can downcast to the concrete type.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
805}
806
/// Handle for reading and changing the mode of one session.
struct AcpSessionModes {
    /// Session whose mode this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_mode` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the session entry in the connection's session map.
    state: Rc<RefCell<acp::SessionModeState>>,
}
812
813impl acp_thread::AgentSessionModes for AcpSessionModes {
814 fn current_mode(&self) -> acp::SessionModeId {
815 self.state.borrow().current_mode_id.clone()
816 }
817
818 fn all_modes(&self) -> Vec<acp::SessionMode> {
819 self.state.borrow().available_modes.clone()
820 }
821
822 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
823 let connection = self.connection.clone();
824 let session_id = self.session_id.clone();
825 let old_mode_id;
826 {
827 let mut state = self.state.borrow_mut();
828 old_mode_id = state.current_mode_id.clone();
829 state.current_mode_id = mode_id.clone();
830 };
831 let state = self.state.clone();
832 cx.foreground_executor().spawn(async move {
833 let result = connection
834 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
835 .await;
836
837 if result.is_err() {
838 state.borrow_mut().current_mode_id = old_mode_id;
839 }
840
841 result?;
842
843 Ok(())
844 })
845 }
846}
847
848struct AcpModelSelector {
849 session_id: acp::SessionId,
850 connection: Rc<acp::ClientSideConnection>,
851 state: Rc<RefCell<acp::SessionModelState>>,
852}
853
854impl AcpModelSelector {
855 fn new(
856 session_id: acp::SessionId,
857 connection: Rc<acp::ClientSideConnection>,
858 state: Rc<RefCell<acp::SessionModelState>>,
859 ) -> Self {
860 Self {
861 session_id,
862 connection,
863 state,
864 }
865 }
866}
867
868impl acp_thread::AgentModelSelector for AcpModelSelector {
869 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
870 Task::ready(Ok(acp_thread::AgentModelList::Flat(
871 self.state
872 .borrow()
873 .available_models
874 .clone()
875 .into_iter()
876 .map(acp_thread::AgentModelInfo::from)
877 .collect(),
878 )))
879 }
880
881 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
882 let connection = self.connection.clone();
883 let session_id = self.session_id.clone();
884 let old_model_id;
885 {
886 let mut state = self.state.borrow_mut();
887 old_model_id = state.current_model_id.clone();
888 state.current_model_id = model_id.clone();
889 };
890 let state = self.state.clone();
891 cx.foreground_executor().spawn(async move {
892 let result = connection
893 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
894 .await;
895
896 if result.is_err() {
897 state.borrow_mut().current_model_id = old_model_id;
898 }
899
900 result?;
901
902 Ok(())
903 })
904 }
905
906 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
907 let state = self.state.borrow();
908 Task::ready(
909 state
910 .available_models
911 .iter()
912 .find(|m| m.model_id == state.current_model_id)
913 .cloned()
914 .map(acp_thread::AgentModelInfo::from)
915 .ok_or_else(|| anyhow::anyhow!("Model not found")),
916 )
917 }
918}
919
/// Handle for reading and changing the config options of one session.
struct AcpSessionConfigOptions {
    /// Session whose config options this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_config_option` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Option list shared with the session entry in the connection's session map.
    state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
    /// Sender signalled after the option list has been replaced.
    watch_tx: Rc<RefCell<watch::Sender<()>>>,
    /// Receiver handed out by `watch`.
    watch_rx: watch::Receiver<()>,
}
927
928impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
929 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
930 self.state.borrow().clone()
931 }
932
933 fn set_config_option(
934 &self,
935 config_id: acp::SessionConfigId,
936 value: acp::SessionConfigValueId,
937 cx: &mut App,
938 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
939 let connection = self.connection.clone();
940 let session_id = self.session_id.clone();
941 let state = self.state.clone();
942
943 let watch_tx = self.watch_tx.clone();
944
945 cx.foreground_executor().spawn(async move {
946 let response = connection
947 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
948 session_id, config_id, value,
949 ))
950 .await?;
951
952 *state.borrow_mut() = response.config_options.clone();
953 watch_tx.borrow_mut().send(()).ok();
954 Ok(response.config_options)
955 })
956 }
957
958 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
959 Some(self.watch_rx.clone())
960 }
961}
962
/// Client-side handler for requests and notifications sent by the agent.
struct ClientDelegate {
    /// Session map shared with the owning `AcpConnection`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App handle cloned whenever a handler needs to update an entity.
    cx: AsyncApp,
}
967
968#[async_trait::async_trait(?Send)]
969impl acp::Client for ClientDelegate {
970 async fn request_permission(
971 &self,
972 arguments: acp::RequestPermissionRequest,
973 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
974 let respect_always_allow_setting;
975 let thread;
976 {
977 let sessions_ref = self.sessions.borrow();
978 let session = sessions_ref
979 .get(&arguments.session_id)
980 .context("Failed to get session")?;
981 respect_always_allow_setting = session.session_modes.is_none();
982 thread = session.thread.clone();
983 }
984
985 let cx = &mut self.cx.clone();
986
987 let task = thread.update(cx, |thread, cx| {
988 thread.request_tool_call_authorization(
989 arguments.tool_call,
990 arguments.options,
991 respect_always_allow_setting,
992 cx,
993 )
994 })??;
995
996 let outcome = task.await;
997
998 Ok(acp::RequestPermissionResponse::new(outcome))
999 }
1000
1001 async fn write_text_file(
1002 &self,
1003 arguments: acp::WriteTextFileRequest,
1004 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1005 let cx = &mut self.cx.clone();
1006 let task = self
1007 .session_thread(&arguments.session_id)?
1008 .update(cx, |thread, cx| {
1009 thread.write_text_file(arguments.path, arguments.content, cx)
1010 })?;
1011
1012 task.await?;
1013
1014 Ok(Default::default())
1015 }
1016
1017 async fn read_text_file(
1018 &self,
1019 arguments: acp::ReadTextFileRequest,
1020 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1021 let task = self.session_thread(&arguments.session_id)?.update(
1022 &mut self.cx.clone(),
1023 |thread, cx| {
1024 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1025 },
1026 )?;
1027
1028 let content = task.await?;
1029
1030 Ok(acp::ReadTextFileResponse::new(content))
1031 }
1032
1033 async fn session_notification(
1034 &self,
1035 notification: acp::SessionNotification,
1036 ) -> Result<(), acp::Error> {
1037 let sessions = self.sessions.borrow();
1038 let session = sessions
1039 .get(¬ification.session_id)
1040 .context("Failed to get session")?;
1041
1042 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1043 current_mode_id,
1044 ..
1045 }) = ¬ification.update
1046 {
1047 if let Some(session_modes) = &session.session_modes {
1048 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1049 } else {
1050 log::error!(
1051 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
1052 );
1053 }
1054 }
1055
1056 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1057 config_options,
1058 ..
1059 }) = ¬ification.update
1060 {
1061 if let Some(opts) = &session.config_options {
1062 *opts.config_options.borrow_mut() = config_options.clone();
1063 opts.tx.borrow_mut().send(()).ok();
1064 } else {
1065 log::error!(
1066 "Got a `ConfigOptionUpdate` notification, but the agent didn't specify `config_options` during session setup."
1067 );
1068 }
1069 }
1070
1071 // Clone so we can inspect meta both before and after handing off to the thread
1072 let update_clone = notification.update.clone();
1073
1074 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1075 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1076 if let Some(meta) = &tc.meta {
1077 if let Some(terminal_info) = meta.get("terminal_info") {
1078 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1079 {
1080 let terminal_id = acp::TerminalId::new(id_str);
1081 let cwd = terminal_info
1082 .get("cwd")
1083 .and_then(|v| v.as_str().map(PathBuf::from));
1084
1085 // Create a minimal display-only lower-level terminal and register it.
1086 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1087 let builder = TerminalBuilder::new_display_only(
1088 CursorShape::default(),
1089 AlternateScroll::On,
1090 None,
1091 0,
1092 )?;
1093 let lower = cx.new(|cx| builder.subscribe(cx));
1094 thread.on_terminal_provider_event(
1095 TerminalProviderEvent::Created {
1096 terminal_id,
1097 label: tc.title.clone(),
1098 cwd,
1099 output_byte_limit: None,
1100 terminal: lower,
1101 },
1102 cx,
1103 );
1104 anyhow::Ok(())
1105 });
1106 }
1107 }
1108 }
1109 }
1110
1111 // Forward the update to the acp_thread as usual.
1112 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1113 thread.handle_session_update(notification.update.clone(), cx)
1114 })??;
1115
1116 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1117 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1118 if let Some(meta) = &tcu.meta {
1119 if let Some(term_out) = meta.get("terminal_output") {
1120 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1121 let terminal_id = acp::TerminalId::new(id_str);
1122 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1123 let data = s.as_bytes().to_vec();
1124 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1125 thread.on_terminal_provider_event(
1126 TerminalProviderEvent::Output { terminal_id, data },
1127 cx,
1128 );
1129 });
1130 }
1131 }
1132 }
1133
1134 // terminal_exit
1135 if let Some(term_exit) = meta.get("terminal_exit") {
1136 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1137 let terminal_id = acp::TerminalId::new(id_str);
1138 let status = acp::TerminalExitStatus::new()
1139 .exit_code(
1140 term_exit
1141 .get("exit_code")
1142 .and_then(|v| v.as_u64())
1143 .map(|i| i as u32),
1144 )
1145 .signal(
1146 term_exit
1147 .get("signal")
1148 .and_then(|v| v.as_str().map(|s| s.to_string())),
1149 );
1150
1151 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1152 thread.on_terminal_provider_event(
1153 TerminalProviderEvent::Exit {
1154 terminal_id,
1155 status,
1156 },
1157 cx,
1158 );
1159 });
1160 }
1161 }
1162 }
1163 }
1164
1165 Ok(())
1166 }
1167
1168 async fn create_terminal(
1169 &self,
1170 args: acp::CreateTerminalRequest,
1171 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1172 let thread = self.session_thread(&args.session_id)?;
1173 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1174
1175 let terminal_entity = acp_thread::create_terminal_entity(
1176 args.command.clone(),
1177 &args.args,
1178 args.env
1179 .into_iter()
1180 .map(|env| (env.name, env.value))
1181 .collect(),
1182 args.cwd.clone(),
1183 &project,
1184 &mut self.cx.clone(),
1185 )
1186 .await?;
1187
1188 // Register with renderer
1189 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1190 thread.register_terminal_created(
1191 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1192 format!("{} {}", args.command, args.args.join(" ")),
1193 args.cwd.clone(),
1194 args.output_byte_limit,
1195 terminal_entity,
1196 cx,
1197 )
1198 })?;
1199 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1200 Ok(acp::CreateTerminalResponse::new(terminal_id))
1201 }
1202
1203 async fn kill_terminal_command(
1204 &self,
1205 args: acp::KillTerminalCommandRequest,
1206 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1207 self.session_thread(&args.session_id)?
1208 .update(&mut self.cx.clone(), |thread, cx| {
1209 thread.kill_terminal(args.terminal_id, cx)
1210 })??;
1211
1212 Ok(Default::default())
1213 }
1214
1215 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1216 Err(acp::Error::method_not_found())
1217 }
1218
1219 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1220 Err(acp::Error::method_not_found())
1221 }
1222
1223 async fn release_terminal(
1224 &self,
1225 args: acp::ReleaseTerminalRequest,
1226 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1227 self.session_thread(&args.session_id)?
1228 .update(&mut self.cx.clone(), |thread, cx| {
1229 thread.release_terminal(args.terminal_id, cx)
1230 })??;
1231
1232 Ok(Default::default())
1233 }
1234
1235 async fn terminal_output(
1236 &self,
1237 args: acp::TerminalOutputRequest,
1238 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1239 self.session_thread(&args.session_id)?
1240 .read_with(&mut self.cx.clone(), |thread, cx| {
1241 let out = thread
1242 .terminal(args.terminal_id)?
1243 .read(cx)
1244 .current_output(cx);
1245
1246 Ok(out)
1247 })?
1248 }
1249
1250 async fn wait_for_terminal_exit(
1251 &self,
1252 args: acp::WaitForTerminalExitRequest,
1253 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1254 let exit_status = self
1255 .session_thread(&args.session_id)?
1256 .update(&mut self.cx.clone(), |thread, cx| {
1257 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1258 })??
1259 .await;
1260
1261 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1262 }
1263}
1264
1265impl ClientDelegate {
1266 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1267 let sessions = self.sessions.borrow();
1268 sessions
1269 .get(session_id)
1270 .context("Failed to get session")
1271 .map(|session| session.thread.clone())
1272 }
1273}