1use acp_thread::AgentConnection;
2use acp_tools::AcpConnectionRegistry;
3use action_log::ActionLog;
4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
5use anyhow::anyhow;
6use collections::HashMap;
7use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
8use futures::AsyncBufReadExt as _;
9use futures::io::BufReader;
10use project::Project;
11use project::agent_server_store::AgentServerCommand;
12use serde::Deserialize;
13use settings::Settings as _;
14use task::ShellBuilder;
15use util::ResultExt as _;
16
17use std::path::PathBuf;
18use std::{any::Any, cell::RefCell};
19use std::{path::Path, rc::Rc};
20use thiserror::Error;
21
22use anyhow::{Context as _, Result};
23use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
24
25use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
26use terminal::TerminalBuilder;
27use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
28
29#[derive(Debug, Error)]
30#[error("Unsupported version")]
31pub struct UnsupportedVersion;
32
33pub struct AcpConnection {
34 server_name: SharedString,
35 telemetry_id: SharedString,
36 connection: Rc<acp::ClientSideConnection>,
37 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
38 auth_methods: Vec<acp::AuthMethod>,
39 agent_capabilities: acp::AgentCapabilities,
40 default_mode: Option<acp::SessionModeId>,
41 default_model: Option<acp::ModelId>,
42 default_config_options: HashMap<String, String>,
43 root_dir: PathBuf,
44 // NB: Don't move this into the wait_task, since we need to ensure the process is
45 // killed on drop (setting kill_on_drop on the command seems to not always work).
46 child: smol::process::Child,
47 _io_task: Task<Result<(), acp::Error>>,
48 _wait_task: Task<Result<()>>,
49 _stderr_task: Task<Result<()>>,
50}
51
52struct ConfigOptions {
53 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
54 tx: Rc<RefCell<watch::Sender<()>>>,
55 rx: watch::Receiver<()>,
56}
57
58impl ConfigOptions {
59 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
60 let (tx, rx) = watch::channel(());
61 Self {
62 config_options,
63 tx: Rc::new(RefCell::new(tx)),
64 rx,
65 }
66 }
67}
68
69pub struct AcpSession {
70 thread: WeakEntity<AcpThread>,
71 suppress_abort_err: bool,
72 models: Option<Rc<RefCell<acp::SessionModelState>>>,
73 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
74 config_options: Option<ConfigOptions>,
75}
76
77pub async fn connect(
78 server_name: SharedString,
79 command: AgentServerCommand,
80 root_dir: &Path,
81 default_mode: Option<acp::SessionModeId>,
82 default_model: Option<acp::ModelId>,
83 default_config_options: HashMap<String, String>,
84 is_remote: bool,
85 cx: &mut AsyncApp,
86) -> Result<Rc<dyn AgentConnection>> {
87 let conn = AcpConnection::stdio(
88 server_name,
89 command.clone(),
90 root_dir,
91 default_mode,
92 default_model,
93 default_config_options,
94 is_remote,
95 cx,
96 )
97 .await?;
98 Ok(Rc::new(conn) as _)
99}
100
101const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
102
103impl AcpConnection {
104 pub async fn stdio(
105 server_name: SharedString,
106 command: AgentServerCommand,
107 root_dir: &Path,
108 default_mode: Option<acp::SessionModeId>,
109 default_model: Option<acp::ModelId>,
110 default_config_options: HashMap<String, String>,
111 is_remote: bool,
112 cx: &mut AsyncApp,
113 ) -> Result<Self> {
114 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone())?;
115 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
116 let mut child =
117 builder.build_command(Some(command.path.display().to_string()), &command.args);
118 child
119 .envs(command.env.iter().flatten())
120 .stdin(std::process::Stdio::piped())
121 .stdout(std::process::Stdio::piped())
122 .stderr(std::process::Stdio::piped());
123 if !is_remote {
124 child.current_dir(root_dir);
125 }
126 let mut child = child.spawn()?;
127
128 let stdout = child.stdout.take().context("Failed to take stdout")?;
129 let stdin = child.stdin.take().context("Failed to take stdin")?;
130 let stderr = child.stderr.take().context("Failed to take stderr")?;
131 log::debug!(
132 "Spawning external agent server: {:?}, {:?}",
133 command.path,
134 command.args
135 );
136 log::trace!("Spawned (pid: {})", child.id());
137
138 let sessions = Rc::new(RefCell::new(HashMap::default()));
139
140 let (release_channel, version) = cx.update(|cx| {
141 (
142 release_channel::ReleaseChannel::try_global(cx)
143 .map(|release_channel| release_channel.display_name()),
144 release_channel::AppVersion::global(cx).to_string(),
145 )
146 })?;
147
148 let client = ClientDelegate {
149 sessions: sessions.clone(),
150 cx: cx.clone(),
151 };
152 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
153 let foreground_executor = cx.foreground_executor().clone();
154 move |fut| {
155 foreground_executor.spawn(fut).detach();
156 }
157 });
158
159 let io_task = cx.background_spawn(io_task);
160
161 let stderr_task = cx.background_spawn(async move {
162 let mut stderr = BufReader::new(stderr);
163 let mut line = String::new();
164 while let Ok(n) = stderr.read_line(&mut line).await
165 && n > 0
166 {
167 log::warn!("agent stderr: {}", line.trim());
168 line.clear();
169 }
170 Ok(())
171 });
172
173 let wait_task = cx.spawn({
174 let sessions = sessions.clone();
175 let status_fut = child.status();
176 async move |cx| {
177 let status = status_fut.await?;
178
179 for session in sessions.borrow().values() {
180 session
181 .thread
182 .update(cx, |thread, cx| {
183 thread.emit_load_error(LoadError::Exited { status }, cx)
184 })
185 .ok();
186 }
187
188 anyhow::Ok(())
189 }
190 });
191
192 let connection = Rc::new(connection);
193
194 cx.update(|cx| {
195 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
196 registry.set_active_connection(server_name.clone(), &connection, cx)
197 });
198 })?;
199
200 let response = connection
201 .initialize(
202 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
203 .client_capabilities(
204 acp::ClientCapabilities::new()
205 .fs(acp::FileSystemCapability::new()
206 .read_text_file(true)
207 .write_text_file(true))
208 .terminal(true)
209 // Experimental: Allow for rendering terminal output from the agents
210 .meta(acp::Meta::from_iter([
211 ("terminal_output".into(), true.into()),
212 ("terminal-auth".into(), true.into()),
213 ])),
214 )
215 .client_info(
216 acp::Implementation::new("zed", version)
217 .title(release_channel.map(ToOwned::to_owned)),
218 ),
219 )
220 .await?;
221
222 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
223 return Err(UnsupportedVersion.into());
224 }
225
226 let telemetry_id = response
227 .agent_info
228 // Use the one the agent provides if we have one
229 .map(|info| info.name.into())
230 // Otherwise, just use the name
231 .unwrap_or_else(|| server_name.clone());
232
233 Ok(Self {
234 auth_methods: response.auth_methods,
235 root_dir: root_dir.to_owned(),
236 connection,
237 server_name,
238 telemetry_id,
239 sessions,
240 agent_capabilities: response.agent_capabilities,
241 default_mode,
242 default_model,
243 default_config_options,
244 _io_task: io_task,
245 _wait_task: wait_task,
246 _stderr_task: stderr_task,
247 child,
248 })
249 }
250
251 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
252 &self.agent_capabilities.prompt_capabilities
253 }
254
255 pub fn root_dir(&self) -> &Path {
256 &self.root_dir
257 }
258}
259
260impl Drop for AcpConnection {
261 fn drop(&mut self) {
262 // See the comment on the child field.
263 self.child.kill().log_err();
264 }
265}
266
267impl AgentConnection for AcpConnection {
268 fn telemetry_id(&self) -> SharedString {
269 self.telemetry_id.clone()
270 }
271
272 fn new_thread(
273 self: Rc<Self>,
274 project: Entity<Project>,
275 cwd: &Path,
276 cx: &mut App,
277 ) -> Task<Result<Entity<AcpThread>>> {
278 let name = self.server_name.clone();
279 let conn = self.connection.clone();
280 let sessions = self.sessions.clone();
281 let default_mode = self.default_mode.clone();
282 let default_model = self.default_model.clone();
283 let default_config_options = self.default_config_options.clone();
284 let cwd = cwd.to_path_buf();
285 let context_server_store = project.read(cx).context_server_store().read(cx);
286 let mcp_servers = if project.read(cx).is_local() {
287 context_server_store
288 .configured_server_ids()
289 .iter()
290 .filter_map(|id| {
291 let configuration = context_server_store.configuration_for_server(id)?;
292 match &*configuration {
293 project::context_server_store::ContextServerConfiguration::Custom {
294 command,
295 ..
296 }
297 | project::context_server_store::ContextServerConfiguration::Extension {
298 command,
299 ..
300 } => Some(acp::McpServer::Stdio(
301 acp::McpServerStdio::new(id.0.to_string(), &command.path)
302 .args(command.args.clone())
303 .env(if let Some(env) = command.env.as_ref() {
304 env.iter()
305 .map(|(name, value)| acp::EnvVariable::new(name, value))
306 .collect()
307 } else {
308 vec![]
309 }),
310 )),
311 project::context_server_store::ContextServerConfiguration::Http {
312 url,
313 headers,
314 } => Some(acp::McpServer::Http(
315 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
316 headers
317 .iter()
318 .map(|(name, value)| acp::HttpHeader::new(name, value))
319 .collect(),
320 ),
321 )),
322 }
323 })
324 .collect()
325 } else {
326 // In SSH projects, the external agent is running on the remote
327 // machine, and currently we only run MCP servers on the local
328 // machine. So don't pass any MCP servers to the agent in that case.
329 Vec::new()
330 };
331
332 cx.spawn(async move |cx| {
333 let response = conn
334 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
335 .await
336 .map_err(|err| {
337 if err.code == acp::ErrorCode::AuthRequired {
338 let mut error = AuthRequired::new();
339
340 if err.message != acp::ErrorCode::AuthRequired.to_string() {
341 error = error.with_description(err.message);
342 }
343
344 anyhow!(error)
345 } else {
346 anyhow!(err)
347 }
348 })?;
349
350 let use_config_options = cx.update(|cx| cx.has_flag::<AcpBetaFeatureFlag>())?;
351
352 // Config options take precedence over legacy modes/models
353 let (modes, models, config_options) = if use_config_options && let Some(opts) = response.config_options {
354 (
355 None,
356 None,
357 Some(Rc::new(RefCell::new(opts))),
358 )
359 } else {
360 // Fall back to legacy modes/models
361 let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
362 let models = response.models.map(|models| Rc::new(RefCell::new(models)));
363 (modes, models, None)
364 };
365
366 if let Some(default_mode) = default_mode {
367 if let Some(modes) = modes.as_ref() {
368 let mut modes_ref = modes.borrow_mut();
369 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
370
371 if has_mode {
372 let initial_mode_id = modes_ref.current_mode_id.clone();
373
374 cx.spawn({
375 let default_mode = default_mode.clone();
376 let session_id = response.session_id.clone();
377 let modes = modes.clone();
378 let conn = conn.clone();
379 async move |_| {
380 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
381 .await.log_err();
382
383 if result.is_none() {
384 modes.borrow_mut().current_mode_id = initial_mode_id;
385 }
386 }
387 }).detach();
388
389 modes_ref.current_mode_id = default_mode;
390 } else {
391 let available_modes = modes_ref
392 .available_modes
393 .iter()
394 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
395 .collect::<Vec<_>>()
396 .join("\n");
397
398 log::warn!(
399 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
400 );
401 }
402 } else {
403 log::warn!(
404 "`{name}` does not support modes, but `default_mode` was set in settings.",
405 );
406 }
407 }
408
409 if let Some(default_model) = default_model {
410 if let Some(models) = models.as_ref() {
411 let mut models_ref = models.borrow_mut();
412 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
413
414 if has_model {
415 let initial_model_id = models_ref.current_model_id.clone();
416
417 cx.spawn({
418 let default_model = default_model.clone();
419 let session_id = response.session_id.clone();
420 let models = models.clone();
421 let conn = conn.clone();
422 async move |_| {
423 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
424 .await.log_err();
425
426 if result.is_none() {
427 models.borrow_mut().current_model_id = initial_model_id;
428 }
429 }
430 }).detach();
431
432 models_ref.current_model_id = default_model;
433 } else {
434 let available_models = models_ref
435 .available_models
436 .iter()
437 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
438 .collect::<Vec<_>>()
439 .join("\n");
440
441 log::warn!(
442 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
443 );
444 }
445 } else {
446 log::warn!(
447 "`{name}` does not support model selection, but `default_model` was set in settings.",
448 );
449 }
450 }
451
452 if let Some(config_opts) = config_options.as_ref() {
453 let defaults_to_apply: Vec<_> = {
454 let config_opts_ref = config_opts.borrow();
455 config_opts_ref
456 .iter()
457 .filter_map(|config_option| {
458 let default_value = default_config_options.get(&*config_option.id.0)?;
459
460 let is_valid = match &config_option.kind {
461 acp::SessionConfigKind::Select(select) => match &select.options {
462 acp::SessionConfigSelectOptions::Ungrouped(options) => {
463 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
464 }
465 acp::SessionConfigSelectOptions::Grouped(groups) => groups
466 .iter()
467 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
468 _ => false,
469 },
470 _ => false,
471 };
472
473 if is_valid {
474 let initial_value = match &config_option.kind {
475 acp::SessionConfigKind::Select(select) => {
476 Some(select.current_value.clone())
477 }
478 _ => None,
479 };
480 Some((config_option.id.clone(), default_value.clone(), initial_value))
481 } else {
482 log::warn!(
483 "`{}` is not a valid value for config option `{}` in {}",
484 default_value,
485 config_option.id.0,
486 name
487 );
488 None
489 }
490 })
491 .collect()
492 };
493
494 for (config_id, default_value, initial_value) in defaults_to_apply {
495 cx.spawn({
496 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
497 let session_id = response.session_id.clone();
498 let config_id_clone = config_id.clone();
499 let config_opts = config_opts.clone();
500 let conn = conn.clone();
501 async move |_| {
502 let result = conn
503 .set_session_config_option(
504 acp::SetSessionConfigOptionRequest::new(
505 session_id,
506 config_id_clone.clone(),
507 default_value_id,
508 ),
509 )
510 .await
511 .log_err();
512
513 if result.is_none() {
514 if let Some(initial) = initial_value {
515 let mut opts = config_opts.borrow_mut();
516 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
517 if let acp::SessionConfigKind::Select(select) =
518 &mut opt.kind
519 {
520 select.current_value = initial;
521 }
522 }
523 }
524 }
525 }
526 })
527 .detach();
528
529 let mut opts = config_opts.borrow_mut();
530 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
531 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
532 select.current_value = acp::SessionConfigValueId::new(default_value);
533 }
534 }
535 }
536 }
537
538 let session_id = response.session_id;
539 let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
540 let thread = cx.new(|cx| {
541 AcpThread::new(
542 self.server_name.clone(),
543 self.clone(),
544 project,
545 action_log,
546 session_id.clone(),
547 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
548 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
549 cx,
550 )
551 })?;
552
553
554 let session = AcpSession {
555 thread: thread.downgrade(),
556 suppress_abort_err: false,
557 session_modes: modes,
558 models,
559 config_options: config_options.map(|opts| ConfigOptions::new(opts))
560 };
561 sessions.borrow_mut().insert(session_id, session);
562
563 Ok(thread)
564 })
565 }
566
567 fn auth_methods(&self) -> &[acp::AuthMethod] {
568 &self.auth_methods
569 }
570
571 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
572 let conn = self.connection.clone();
573 cx.foreground_executor().spawn(async move {
574 conn.authenticate(acp::AuthenticateRequest::new(method_id))
575 .await?;
576 Ok(())
577 })
578 }
579
580 fn prompt(
581 &self,
582 _id: Option<acp_thread::UserMessageId>,
583 params: acp::PromptRequest,
584 cx: &mut App,
585 ) -> Task<Result<acp::PromptResponse>> {
586 let conn = self.connection.clone();
587 let sessions = self.sessions.clone();
588 let session_id = params.session_id.clone();
589 cx.foreground_executor().spawn(async move {
590 let result = conn.prompt(params).await;
591
592 let mut suppress_abort_err = false;
593
594 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
595 suppress_abort_err = session.suppress_abort_err;
596 session.suppress_abort_err = false;
597 }
598
599 match result {
600 Ok(response) => Ok(response),
601 Err(err) => {
602 if err.code == acp::ErrorCode::AuthRequired {
603 return Err(anyhow!(acp::Error::auth_required()));
604 }
605
606 if err.code != ErrorCode::InternalError {
607 anyhow::bail!(err)
608 }
609
610 let Some(data) = &err.data else {
611 anyhow::bail!(err)
612 };
613
614 // Temporary workaround until the following PR is generally available:
615 // https://github.com/google-gemini/gemini-cli/pull/6656
616
617 #[derive(Deserialize)]
618 #[serde(deny_unknown_fields)]
619 struct ErrorDetails {
620 details: Box<str>,
621 }
622
623 match serde_json::from_value(data.clone()) {
624 Ok(ErrorDetails { details }) => {
625 if suppress_abort_err
626 && (details.contains("This operation was aborted")
627 || details.contains("The user aborted a request"))
628 {
629 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
630 } else {
631 Err(anyhow!(details))
632 }
633 }
634 Err(_) => Err(anyhow!(err)),
635 }
636 }
637 }
638 })
639 }
640
641 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
642 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
643 session.suppress_abort_err = true;
644 }
645 let conn = self.connection.clone();
646 let params = acp::CancelNotification::new(session_id.clone());
647 cx.foreground_executor()
648 .spawn(async move { conn.cancel(params).await })
649 .detach();
650 }
651
652 fn session_modes(
653 &self,
654 session_id: &acp::SessionId,
655 _cx: &App,
656 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
657 let sessions = self.sessions.clone();
658 let sessions_ref = sessions.borrow();
659 let Some(session) = sessions_ref.get(session_id) else {
660 return None;
661 };
662
663 if let Some(modes) = session.session_modes.as_ref() {
664 Some(Rc::new(AcpSessionModes {
665 connection: self.connection.clone(),
666 session_id: session_id.clone(),
667 state: modes.clone(),
668 }) as _)
669 } else {
670 None
671 }
672 }
673
674 fn model_selector(
675 &self,
676 session_id: &acp::SessionId,
677 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
678 let sessions = self.sessions.clone();
679 let sessions_ref = sessions.borrow();
680 let Some(session) = sessions_ref.get(session_id) else {
681 return None;
682 };
683
684 if let Some(models) = session.models.as_ref() {
685 Some(Rc::new(AcpModelSelector::new(
686 session_id.clone(),
687 self.connection.clone(),
688 models.clone(),
689 )) as _)
690 } else {
691 None
692 }
693 }
694
695 fn session_config_options(
696 &self,
697 session_id: &acp::SessionId,
698 _cx: &App,
699 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
700 let sessions = self.sessions.borrow();
701 let session = sessions.get(session_id)?;
702
703 let config_opts = session.config_options.as_ref()?;
704
705 Some(Rc::new(AcpSessionConfigOptions {
706 session_id: session_id.clone(),
707 connection: self.connection.clone(),
708 state: config_opts.config_options.clone(),
709 watch_tx: config_opts.tx.clone(),
710 watch_rx: config_opts.rx.clone(),
711 }) as _)
712 }
713
714 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
715 self
716 }
717}
718
719struct AcpSessionModes {
720 session_id: acp::SessionId,
721 connection: Rc<acp::ClientSideConnection>,
722 state: Rc<RefCell<acp::SessionModeState>>,
723}
724
725impl acp_thread::AgentSessionModes for AcpSessionModes {
726 fn current_mode(&self) -> acp::SessionModeId {
727 self.state.borrow().current_mode_id.clone()
728 }
729
730 fn all_modes(&self) -> Vec<acp::SessionMode> {
731 self.state.borrow().available_modes.clone()
732 }
733
734 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
735 let connection = self.connection.clone();
736 let session_id = self.session_id.clone();
737 let old_mode_id;
738 {
739 let mut state = self.state.borrow_mut();
740 old_mode_id = state.current_mode_id.clone();
741 state.current_mode_id = mode_id.clone();
742 };
743 let state = self.state.clone();
744 cx.foreground_executor().spawn(async move {
745 let result = connection
746 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
747 .await;
748
749 if result.is_err() {
750 state.borrow_mut().current_mode_id = old_mode_id;
751 }
752
753 result?;
754
755 Ok(())
756 })
757 }
758}
759
760struct AcpModelSelector {
761 session_id: acp::SessionId,
762 connection: Rc<acp::ClientSideConnection>,
763 state: Rc<RefCell<acp::SessionModelState>>,
764}
765
766impl AcpModelSelector {
767 fn new(
768 session_id: acp::SessionId,
769 connection: Rc<acp::ClientSideConnection>,
770 state: Rc<RefCell<acp::SessionModelState>>,
771 ) -> Self {
772 Self {
773 session_id,
774 connection,
775 state,
776 }
777 }
778}
779
780impl acp_thread::AgentModelSelector for AcpModelSelector {
781 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
782 Task::ready(Ok(acp_thread::AgentModelList::Flat(
783 self.state
784 .borrow()
785 .available_models
786 .clone()
787 .into_iter()
788 .map(acp_thread::AgentModelInfo::from)
789 .collect(),
790 )))
791 }
792
793 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
794 let connection = self.connection.clone();
795 let session_id = self.session_id.clone();
796 let old_model_id;
797 {
798 let mut state = self.state.borrow_mut();
799 old_model_id = state.current_model_id.clone();
800 state.current_model_id = model_id.clone();
801 };
802 let state = self.state.clone();
803 cx.foreground_executor().spawn(async move {
804 let result = connection
805 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
806 .await;
807
808 if result.is_err() {
809 state.borrow_mut().current_model_id = old_model_id;
810 }
811
812 result?;
813
814 Ok(())
815 })
816 }
817
818 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
819 let state = self.state.borrow();
820 Task::ready(
821 state
822 .available_models
823 .iter()
824 .find(|m| m.model_id == state.current_model_id)
825 .cloned()
826 .map(acp_thread::AgentModelInfo::from)
827 .ok_or_else(|| anyhow::anyhow!("Model not found")),
828 )
829 }
830}
831
832struct AcpSessionConfigOptions {
833 session_id: acp::SessionId,
834 connection: Rc<acp::ClientSideConnection>,
835 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
836 watch_tx: Rc<RefCell<watch::Sender<()>>>,
837 watch_rx: watch::Receiver<()>,
838}
839
840impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
841 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
842 self.state.borrow().clone()
843 }
844
845 fn set_config_option(
846 &self,
847 config_id: acp::SessionConfigId,
848 value: acp::SessionConfigValueId,
849 cx: &mut App,
850 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
851 let connection = self.connection.clone();
852 let session_id = self.session_id.clone();
853 let state = self.state.clone();
854
855 let watch_tx = self.watch_tx.clone();
856
857 cx.foreground_executor().spawn(async move {
858 let response = connection
859 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
860 session_id, config_id, value,
861 ))
862 .await?;
863
864 *state.borrow_mut() = response.config_options.clone();
865 watch_tx.borrow_mut().send(()).ok();
866 Ok(response.config_options)
867 })
868 }
869
870 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
871 Some(self.watch_rx.clone())
872 }
873}
874
875struct ClientDelegate {
876 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
877 cx: AsyncApp,
878}
879
880#[async_trait::async_trait(?Send)]
881impl acp::Client for ClientDelegate {
882 async fn request_permission(
883 &self,
884 arguments: acp::RequestPermissionRequest,
885 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
886 let respect_always_allow_setting;
887 let thread;
888 {
889 let sessions_ref = self.sessions.borrow();
890 let session = sessions_ref
891 .get(&arguments.session_id)
892 .context("Failed to get session")?;
893 respect_always_allow_setting = session.session_modes.is_none();
894 thread = session.thread.clone();
895 }
896
897 let cx = &mut self.cx.clone();
898
899 let task = thread.update(cx, |thread, cx| {
900 thread.request_tool_call_authorization(
901 arguments.tool_call,
902 arguments.options,
903 respect_always_allow_setting,
904 cx,
905 )
906 })??;
907
908 let outcome = task.await;
909
910 Ok(acp::RequestPermissionResponse::new(outcome))
911 }
912
913 async fn write_text_file(
914 &self,
915 arguments: acp::WriteTextFileRequest,
916 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
917 let cx = &mut self.cx.clone();
918 let task = self
919 .session_thread(&arguments.session_id)?
920 .update(cx, |thread, cx| {
921 thread.write_text_file(arguments.path, arguments.content, cx)
922 })?;
923
924 task.await?;
925
926 Ok(Default::default())
927 }
928
929 async fn read_text_file(
930 &self,
931 arguments: acp::ReadTextFileRequest,
932 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
933 let task = self.session_thread(&arguments.session_id)?.update(
934 &mut self.cx.clone(),
935 |thread, cx| {
936 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
937 },
938 )?;
939
940 let content = task.await?;
941
942 Ok(acp::ReadTextFileResponse::new(content))
943 }
944
945 async fn session_notification(
946 &self,
947 notification: acp::SessionNotification,
948 ) -> Result<(), acp::Error> {
949 let sessions = self.sessions.borrow();
950 let session = sessions
951 .get(¬ification.session_id)
952 .context("Failed to get session")?;
953
954 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
955 current_mode_id,
956 ..
957 }) = ¬ification.update
958 {
959 if let Some(session_modes) = &session.session_modes {
960 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
961 } else {
962 log::error!(
963 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
964 );
965 }
966 }
967
968 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
969 config_options,
970 ..
971 }) = ¬ification.update
972 {
973 if let Some(opts) = &session.config_options {
974 *opts.config_options.borrow_mut() = config_options.clone();
975 opts.tx.borrow_mut().send(()).ok();
976 } else {
977 log::error!(
978 "Got a `ConfigOptionUpdate` notification, but the agent didn't specify `config_options` during session setup."
979 );
980 }
981 }
982
983 // Clone so we can inspect meta both before and after handing off to the thread
984 let update_clone = notification.update.clone();
985
986 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
987 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
988 if let Some(meta) = &tc.meta {
989 if let Some(terminal_info) = meta.get("terminal_info") {
990 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
991 {
992 let terminal_id = acp::TerminalId::new(id_str);
993 let cwd = terminal_info
994 .get("cwd")
995 .and_then(|v| v.as_str().map(PathBuf::from));
996
997 // Create a minimal display-only lower-level terminal and register it.
998 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
999 let builder = TerminalBuilder::new_display_only(
1000 CursorShape::default(),
1001 AlternateScroll::On,
1002 None,
1003 0,
1004 )?;
1005 let lower = cx.new(|cx| builder.subscribe(cx));
1006 thread.on_terminal_provider_event(
1007 TerminalProviderEvent::Created {
1008 terminal_id,
1009 label: tc.title.clone(),
1010 cwd,
1011 output_byte_limit: None,
1012 terminal: lower,
1013 },
1014 cx,
1015 );
1016 anyhow::Ok(())
1017 });
1018 }
1019 }
1020 }
1021 }
1022
1023 // Forward the update to the acp_thread as usual.
1024 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1025 thread.handle_session_update(notification.update.clone(), cx)
1026 })??;
1027
1028 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1029 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1030 if let Some(meta) = &tcu.meta {
1031 if let Some(term_out) = meta.get("terminal_output") {
1032 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1033 let terminal_id = acp::TerminalId::new(id_str);
1034 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1035 let data = s.as_bytes().to_vec();
1036 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1037 thread.on_terminal_provider_event(
1038 TerminalProviderEvent::Output { terminal_id, data },
1039 cx,
1040 );
1041 });
1042 }
1043 }
1044 }
1045
1046 // terminal_exit
1047 if let Some(term_exit) = meta.get("terminal_exit") {
1048 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1049 let terminal_id = acp::TerminalId::new(id_str);
1050 let status = acp::TerminalExitStatus::new()
1051 .exit_code(
1052 term_exit
1053 .get("exit_code")
1054 .and_then(|v| v.as_u64())
1055 .map(|i| i as u32),
1056 )
1057 .signal(
1058 term_exit
1059 .get("signal")
1060 .and_then(|v| v.as_str().map(|s| s.to_string())),
1061 );
1062
1063 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1064 thread.on_terminal_provider_event(
1065 TerminalProviderEvent::Exit {
1066 terminal_id,
1067 status,
1068 },
1069 cx,
1070 );
1071 });
1072 }
1073 }
1074 }
1075 }
1076
1077 Ok(())
1078 }
1079
1080 async fn create_terminal(
1081 &self,
1082 args: acp::CreateTerminalRequest,
1083 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1084 let thread = self.session_thread(&args.session_id)?;
1085 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1086
1087 let terminal_entity = acp_thread::create_terminal_entity(
1088 args.command.clone(),
1089 &args.args,
1090 args.env
1091 .into_iter()
1092 .map(|env| (env.name, env.value))
1093 .collect(),
1094 args.cwd.clone(),
1095 &project,
1096 &mut self.cx.clone(),
1097 )
1098 .await?;
1099
1100 // Register with renderer
1101 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1102 thread.register_terminal_created(
1103 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1104 format!("{} {}", args.command, args.args.join(" ")),
1105 args.cwd.clone(),
1106 args.output_byte_limit,
1107 terminal_entity,
1108 cx,
1109 )
1110 })?;
1111 let terminal_id =
1112 terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
1113 Ok(acp::CreateTerminalResponse::new(terminal_id))
1114 }
1115
1116 async fn kill_terminal_command(
1117 &self,
1118 args: acp::KillTerminalCommandRequest,
1119 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1120 self.session_thread(&args.session_id)?
1121 .update(&mut self.cx.clone(), |thread, cx| {
1122 thread.kill_terminal(args.terminal_id, cx)
1123 })??;
1124
1125 Ok(Default::default())
1126 }
1127
1128 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1129 Err(acp::Error::method_not_found())
1130 }
1131
1132 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1133 Err(acp::Error::method_not_found())
1134 }
1135
1136 async fn release_terminal(
1137 &self,
1138 args: acp::ReleaseTerminalRequest,
1139 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1140 self.session_thread(&args.session_id)?
1141 .update(&mut self.cx.clone(), |thread, cx| {
1142 thread.release_terminal(args.terminal_id, cx)
1143 })??;
1144
1145 Ok(Default::default())
1146 }
1147
1148 async fn terminal_output(
1149 &self,
1150 args: acp::TerminalOutputRequest,
1151 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1152 self.session_thread(&args.session_id)?
1153 .read_with(&mut self.cx.clone(), |thread, cx| {
1154 let out = thread
1155 .terminal(args.terminal_id)?
1156 .read(cx)
1157 .current_output(cx);
1158
1159 Ok(out)
1160 })?
1161 }
1162
1163 async fn wait_for_terminal_exit(
1164 &self,
1165 args: acp::WaitForTerminalExitRequest,
1166 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1167 let exit_status = self
1168 .session_thread(&args.session_id)?
1169 .update(&mut self.cx.clone(), |thread, cx| {
1170 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1171 })??
1172 .await;
1173
1174 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1175 }
1176}
1177
1178impl ClientDelegate {
1179 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1180 let sessions = self.sessions.borrow();
1181 sessions
1182 .get(session_id)
1183 .context("Failed to get session")
1184 .map(|session| session.thread.clone())
1185 }
1186}