1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use futures::AsyncBufReadExt as _;
11use futures::io::BufReader;
12use project::Project;
13use project::agent_server_store::AgentServerCommand;
14use serde::Deserialize;
15use settings::Settings as _;
16use task::ShellBuilder;
17use util::ResultExt as _;
18use util::process::Child;
19
20use std::path::PathBuf;
21use std::process::Stdio;
22use std::{any::Any, cell::RefCell};
23use std::{path::Path, rc::Rc};
24use thiserror::Error;
25
26use anyhow::{Context as _, Result};
27use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
28
29use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
30use terminal::TerminalBuilder;
31use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
32
/// Error produced when an agent reports a protocol version that this client
/// cannot work with. Its display text is "Unsupported version".
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
36
/// A connection to an external agent process that speaks the Agent Client
/// Protocol (ACP) over its standard streams.
pub struct AcpConnection {
    /// Name of the agent server; used in log messages and as the fallback
    /// telemetry id.
    server_name: SharedString,
    /// Human-readable name; used as the thread title when no better title is
    /// available.
    display_name: SharedString,
    /// Identifier reported via `telemetry_id()`.
    telemetry_id: SharedString,
    /// The client side of the ACP connection used to send requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// State of every known session, keyed by session id. The map is shared
    /// (via `Rc`) with other parts of the connection machinery.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods the agent advertised.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities the agent advertised.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on newly created sessions, if the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on newly created sessions, if the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option values (keyed by option id) to apply to newly created
    /// sessions when the agent lists them as valid.
    default_config_options: HashMap<String, String>,
    /// Root directory this connection was created for.
    root_dir: PathBuf,
    /// The spawned agent process; killed when the connection is dropped.
    child: Child,
    /// Session listing support; `None` when unavailable.
    session_list: Option<Rc<AcpSessionList>>,
    /// Handle to the task driving the connection's I/O; stored so it lives as
    /// long as the connection.
    _io_task: Task<Result<(), acp::Error>>,
    /// Handle to the task that waits for the child process to exit.
    _wait_task: Task<Result<()>>,
    /// Handle to the task that forwards the child's stderr to the log.
    _stderr_task: Task<Result<()>>,
}
55
56struct ConfigOptions {
57 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
58 tx: Rc<RefCell<watch::Sender<()>>>,
59 rx: watch::Receiver<()>,
60}
61
62impl ConfigOptions {
63 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
64 let (tx, rx) = watch::channel(());
65 Self {
66 config_options,
67 tx: Rc::new(RefCell::new(tx)),
68 rx,
69 }
70 }
71}
72
/// Client-side state tracked for a single agent session.
pub struct AcpSession {
    /// Weak handle to the thread that displays this session.
    thread: WeakEntity<AcpThread>,
    /// Flag consulted when a prompt fails: when set, certain "aborted" errors
    /// are reported as a cancelled prompt instead of an error.
    suppress_abort_err: bool,
    /// Model state for the session, when present.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state for the session, when present.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options for the session, when present.
    config_options: Option<ConfigOptions>,
}
80
81pub struct AcpSessionList {
82 connection: Rc<acp::ClientSideConnection>,
83 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
84 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
85}
86
87impl AcpSessionList {
88 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
89 let (tx, rx) = smol::channel::unbounded();
90 Self {
91 connection,
92 updates_tx: tx,
93 updates_rx: rx,
94 }
95 }
96
97 fn notify_update(&self) {
98 self.updates_tx
99 .try_send(acp_thread::SessionListUpdate::Refresh)
100 .log_err();
101 }
102
103 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
104 self.updates_tx
105 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
106 .log_err();
107 }
108}
109
110impl AgentSessionList for AcpSessionList {
111 fn list_sessions(
112 &self,
113 request: AgentSessionListRequest,
114 cx: &mut App,
115 ) -> Task<Result<AgentSessionListResponse>> {
116 let conn = self.connection.clone();
117 cx.foreground_executor().spawn(async move {
118 let acp_request = acp::ListSessionsRequest::new()
119 .cwd(request.cwd)
120 .cursor(request.cursor);
121 let response = conn.list_sessions(acp_request).await?;
122 Ok(AgentSessionListResponse {
123 sessions: response
124 .sessions
125 .into_iter()
126 .map(|s| AgentSessionInfo {
127 session_id: s.session_id,
128 cwd: Some(s.cwd),
129 title: s.title.map(Into::into),
130 updated_at: s.updated_at.and_then(|date_str| {
131 chrono::DateTime::parse_from_rfc3339(&date_str)
132 .ok()
133 .map(|dt| dt.with_timezone(&chrono::Utc))
134 }),
135 meta: s.meta,
136 })
137 .collect(),
138 next_cursor: response.next_cursor,
139 meta: response.meta,
140 })
141 })
142 }
143
144 fn watch(
145 &self,
146 _cx: &mut App,
147 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
148 Some(self.updates_rx.clone())
149 }
150
151 fn notify_refresh(&self) {
152 self.notify_update();
153 }
154
155 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
156 self
157 }
158}
159
160pub async fn connect(
161 server_name: SharedString,
162 display_name: SharedString,
163 command: AgentServerCommand,
164 root_dir: &Path,
165 default_mode: Option<acp::SessionModeId>,
166 default_model: Option<acp::ModelId>,
167 default_config_options: HashMap<String, String>,
168 is_remote: bool,
169 cx: &mut AsyncApp,
170) -> Result<Rc<dyn AgentConnection>> {
171 let conn = AcpConnection::stdio(
172 server_name,
173 display_name,
174 command.clone(),
175 root_dir,
176 default_mode,
177 default_model,
178 default_config_options,
179 is_remote,
180 cx,
181 )
182 .await?;
183 Ok(Rc::new(conn) as _)
184}
185
/// Oldest ACP protocol version this client accepts from an agent.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
187
impl AcpConnection {
    /// Spawns the agent server described by `command` and establishes an ACP
    /// connection over the child's stdin/stdout.
    ///
    /// The command is run through a non-interactive shell built from the
    /// user's terminal settings, with `command.env` applied. Three tasks are
    /// started alongside the process: one drives the connection's I/O, one
    /// logs the child's stderr line by line, and one waits for the process to
    /// exit and then reports `LoadError::Exited` to every known session's
    /// thread. Finally the `initialize` request is sent and its response is
    /// used to populate the returned connection.
    ///
    /// # Errors
    ///
    /// Fails if the process cannot be spawned, if any of its standard streams
    /// cannot be taken, if the `initialize` request fails, or with
    /// [`UnsupportedVersion`] when the agent's protocol version is below
    /// `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        server_name: SharedString,
        display_name: SharedString,
        command: AgentServerCommand,
        root_dir: &Path,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        is_remote: bool,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child =
            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
        child.envs(command.env.iter().flatten());
        // The working directory is only set for non-remote connections.
        // NOTE(review): presumably `root_dir` is not a local path when remote — confirm.
        if !is_remote {
            child.current_dir(root_dir);
        }
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        log::debug!(
            "Spawning external agent server: {:?}, {:?}",
            command.path,
            command.args
        );
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // Starts out empty; filled in below once the agent's capabilities are
        // known, so the client delegate can see the session list afterwards.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        let client = ClientDelegate {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
            cx: cx.clone(),
        };
        // Futures handed to the connection's spawn callback are run detached
        // on the foreground executor.
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Forward every stderr line from the agent to the log until EOF or a
        // read error.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // When the process exits, tell every session's thread about it.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(server_name.clone(), &connection, cx)
            });
        });

        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapability::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| server_name.clone());

        // A session list is only created when the agent advertises the
        // session-listing capability.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        Ok(Self {
            auth_methods: response.auth_methods,
            root_dir: root_dir.to_owned(),
            connection,
            server_name,
            display_name,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }

    /// Returns the prompt capabilities the agent advertised during initialization.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }

    /// Returns the root directory this connection was created with.
    pub fn root_dir(&self) -> &Path {
        &self.root_dir
    }
}
360
impl Drop for AcpConnection {
    /// Kills the agent process when the connection goes away; a failure to
    /// kill is logged rather than propagated.
    fn drop(&mut self) {
        self.child.kill().log_err();
    }
}
366
367impl AgentConnection for AcpConnection {
    /// Returns the identifier used for telemetry about this agent.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
371
372 fn new_session(
373 self: Rc<Self>,
374 project: Entity<Project>,
375 cwd: &Path,
376 cx: &mut App,
377 ) -> Task<Result<Entity<AcpThread>>> {
378 let name = self.server_name.clone();
379 let cwd = cwd.to_path_buf();
380 let mcp_servers = mcp_servers_for_project(&project, cx);
381
382 cx.spawn(async move |cx| {
383 let response = self.connection
384 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
385 .await
386 .map_err(map_acp_error)?;
387
388 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
389
390 if let Some(default_mode) = self.default_mode.clone() {
391 if let Some(modes) = modes.as_ref() {
392 let mut modes_ref = modes.borrow_mut();
393 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
394
395 if has_mode {
396 let initial_mode_id = modes_ref.current_mode_id.clone();
397
398 cx.spawn({
399 let default_mode = default_mode.clone();
400 let session_id = response.session_id.clone();
401 let modes = modes.clone();
402 let conn = self.connection.clone();
403 async move |_| {
404 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
405 .await.log_err();
406
407 if result.is_none() {
408 modes.borrow_mut().current_mode_id = initial_mode_id;
409 }
410 }
411 }).detach();
412
413 modes_ref.current_mode_id = default_mode;
414 } else {
415 let available_modes = modes_ref
416 .available_modes
417 .iter()
418 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
419 .collect::<Vec<_>>()
420 .join("\n");
421
422 log::warn!(
423 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
424 );
425 }
426 }
427 }
428
429 if let Some(default_model) = self.default_model.clone() {
430 if let Some(models) = models.as_ref() {
431 let mut models_ref = models.borrow_mut();
432 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
433
434 if has_model {
435 let initial_model_id = models_ref.current_model_id.clone();
436
437 cx.spawn({
438 let default_model = default_model.clone();
439 let session_id = response.session_id.clone();
440 let models = models.clone();
441 let conn = self.connection.clone();
442 async move |_| {
443 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
444 .await.log_err();
445
446 if result.is_none() {
447 models.borrow_mut().current_model_id = initial_model_id;
448 }
449 }
450 }).detach();
451
452 models_ref.current_model_id = default_model;
453 } else {
454 let available_models = models_ref
455 .available_models
456 .iter()
457 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
458 .collect::<Vec<_>>()
459 .join("\n");
460
461 log::warn!(
462 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
463 );
464 }
465 }
466 }
467
468 if let Some(config_opts) = config_options.as_ref() {
469 let defaults_to_apply: Vec<_> = {
470 let config_opts_ref = config_opts.borrow();
471 config_opts_ref
472 .iter()
473 .filter_map(|config_option| {
474 let default_value = self.default_config_options.get(&*config_option.id.0)?;
475
476 let is_valid = match &config_option.kind {
477 acp::SessionConfigKind::Select(select) => match &select.options {
478 acp::SessionConfigSelectOptions::Ungrouped(options) => {
479 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
480 }
481 acp::SessionConfigSelectOptions::Grouped(groups) => groups
482 .iter()
483 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
484 _ => false,
485 },
486 _ => false,
487 };
488
489 if is_valid {
490 let initial_value = match &config_option.kind {
491 acp::SessionConfigKind::Select(select) => {
492 Some(select.current_value.clone())
493 }
494 _ => None,
495 };
496 Some((config_option.id.clone(), default_value.clone(), initial_value))
497 } else {
498 log::warn!(
499 "`{}` is not a valid value for config option `{}` in {}",
500 default_value,
501 config_option.id.0,
502 name
503 );
504 None
505 }
506 })
507 .collect()
508 };
509
510 for (config_id, default_value, initial_value) in defaults_to_apply {
511 cx.spawn({
512 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
513 let session_id = response.session_id.clone();
514 let config_id_clone = config_id.clone();
515 let config_opts = config_opts.clone();
516 let conn = self.connection.clone();
517 async move |_| {
518 let result = conn
519 .set_session_config_option(
520 acp::SetSessionConfigOptionRequest::new(
521 session_id,
522 config_id_clone.clone(),
523 default_value_id,
524 ),
525 )
526 .await
527 .log_err();
528
529 if result.is_none() {
530 if let Some(initial) = initial_value {
531 let mut opts = config_opts.borrow_mut();
532 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
533 if let acp::SessionConfigKind::Select(select) =
534 &mut opt.kind
535 {
536 select.current_value = initial;
537 }
538 }
539 }
540 }
541 }
542 })
543 .detach();
544
545 let mut opts = config_opts.borrow_mut();
546 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
547 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
548 select.current_value = acp::SessionConfigValueId::new(default_value);
549 }
550 }
551 }
552 }
553
554 let action_log = cx.new(|_| ActionLog::new(project.clone()));
555 let thread: Entity<AcpThread> = cx.new(|cx| {
556 AcpThread::new(
557 None,
558 self.display_name.clone(),
559 self.clone(),
560 project,
561 action_log,
562 response.session_id.clone(),
563 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
564 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
565 cx,
566 )
567 });
568
569 self.sessions.borrow_mut().insert(
570 response.session_id,
571 AcpSession {
572 thread: thread.downgrade(),
573 suppress_abort_err: false,
574 session_modes: modes,
575 models,
576 config_options: config_options.map(ConfigOptions::new),
577 },
578 );
579
580 Ok(thread)
581 })
582 }
583
    /// Whether the agent advertised the `load_session` capability.
    fn supports_load_session(&self) -> bool {
        self.agent_capabilities.load_session
    }
587
    /// Whether the agent advertised the session `resume` capability.
    fn supports_resume_session(&self) -> bool {
        self.agent_capabilities
            .session_capabilities
            .resume
            .is_some()
    }
594
595 fn load_session(
596 self: Rc<Self>,
597 session: AgentSessionInfo,
598 project: Entity<Project>,
599 cwd: &Path,
600 cx: &mut App,
601 ) -> Task<Result<Entity<AcpThread>>> {
602 if !self.agent_capabilities.load_session {
603 return Task::ready(Err(anyhow!(LoadError::Other(
604 "Loading sessions is not supported by this agent.".into()
605 ))));
606 }
607
608 let cwd = cwd.to_path_buf();
609 let mcp_servers = mcp_servers_for_project(&project, cx);
610 let action_log = cx.new(|_| ActionLog::new(project.clone()));
611 let title = session
612 .title
613 .clone()
614 .unwrap_or_else(|| self.display_name.clone());
615 let thread: Entity<AcpThread> = cx.new(|cx| {
616 AcpThread::new(
617 None,
618 title,
619 self.clone(),
620 project,
621 action_log,
622 session.session_id.clone(),
623 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
624 cx,
625 )
626 });
627
628 self.sessions.borrow_mut().insert(
629 session.session_id.clone(),
630 AcpSession {
631 thread: thread.downgrade(),
632 suppress_abort_err: false,
633 session_modes: None,
634 models: None,
635 config_options: None,
636 },
637 );
638
639 cx.spawn(async move |_| {
640 let response = match self
641 .connection
642 .load_session(
643 acp::LoadSessionRequest::new(session.session_id.clone(), cwd)
644 .mcp_servers(mcp_servers),
645 )
646 .await
647 {
648 Ok(response) => response,
649 Err(err) => {
650 self.sessions.borrow_mut().remove(&session.session_id);
651 return Err(map_acp_error(err));
652 }
653 };
654
655 let (modes, models, config_options) =
656 config_state(response.modes, response.models, response.config_options);
657 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
658 session.session_modes = modes;
659 session.models = models;
660 session.config_options = config_options.map(ConfigOptions::new);
661 }
662
663 Ok(thread)
664 })
665 }
666
667 fn resume_session(
668 self: Rc<Self>,
669 session: AgentSessionInfo,
670 project: Entity<Project>,
671 cwd: &Path,
672 cx: &mut App,
673 ) -> Task<Result<Entity<AcpThread>>> {
674 if self
675 .agent_capabilities
676 .session_capabilities
677 .resume
678 .is_none()
679 {
680 return Task::ready(Err(anyhow!(LoadError::Other(
681 "Resuming sessions is not supported by this agent.".into()
682 ))));
683 }
684
685 let cwd = cwd.to_path_buf();
686 let mcp_servers = mcp_servers_for_project(&project, cx);
687 let action_log = cx.new(|_| ActionLog::new(project.clone()));
688 let title = session
689 .title
690 .clone()
691 .unwrap_or_else(|| self.display_name.clone());
692 let thread: Entity<AcpThread> = cx.new(|cx| {
693 AcpThread::new(
694 None,
695 title,
696 self.clone(),
697 project,
698 action_log,
699 session.session_id.clone(),
700 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
701 cx,
702 )
703 });
704
705 self.sessions.borrow_mut().insert(
706 session.session_id.clone(),
707 AcpSession {
708 thread: thread.downgrade(),
709 suppress_abort_err: false,
710 session_modes: None,
711 models: None,
712 config_options: None,
713 },
714 );
715
716 cx.spawn(async move |_| {
717 let response = match self
718 .connection
719 .resume_session(
720 acp::ResumeSessionRequest::new(session.session_id.clone(), cwd)
721 .mcp_servers(mcp_servers),
722 )
723 .await
724 {
725 Ok(response) => response,
726 Err(err) => {
727 self.sessions.borrow_mut().remove(&session.session_id);
728 return Err(map_acp_error(err));
729 }
730 };
731
732 let (modes, models, config_options) =
733 config_state(response.modes, response.models, response.config_options);
734 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
735 session.session_modes = modes;
736 session.models = models;
737 session.config_options = config_options.map(ConfigOptions::new);
738 }
739
740 Ok(thread)
741 })
742 }
743
    /// Returns the authentication methods stored on this connection.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
747
748 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
749 let conn = self.connection.clone();
750 cx.foreground_executor().spawn(async move {
751 conn.authenticate(acp::AuthenticateRequest::new(method_id))
752 .await?;
753 Ok(())
754 })
755 }
756
757 fn prompt(
758 &self,
759 _id: Option<acp_thread::UserMessageId>,
760 params: acp::PromptRequest,
761 cx: &mut App,
762 ) -> Task<Result<acp::PromptResponse>> {
763 let conn = self.connection.clone();
764 let sessions = self.sessions.clone();
765 let session_id = params.session_id.clone();
766 cx.foreground_executor().spawn(async move {
767 let result = conn.prompt(params).await;
768
769 let mut suppress_abort_err = false;
770
771 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
772 suppress_abort_err = session.suppress_abort_err;
773 session.suppress_abort_err = false;
774 }
775
776 match result {
777 Ok(response) => Ok(response),
778 Err(err) => {
779 if err.code == acp::ErrorCode::AuthRequired {
780 return Err(anyhow!(acp::Error::auth_required()));
781 }
782
783 if err.code != ErrorCode::InternalError {
784 anyhow::bail!(err)
785 }
786
787 let Some(data) = &err.data else {
788 anyhow::bail!(err)
789 };
790
791 // Temporary workaround until the following PR is generally available:
792 // https://github.com/google-gemini/gemini-cli/pull/6656
793
794 #[derive(Deserialize)]
795 #[serde(deny_unknown_fields)]
796 struct ErrorDetails {
797 details: Box<str>,
798 }
799
800 match serde_json::from_value(data.clone()) {
801 Ok(ErrorDetails { details }) => {
802 if suppress_abort_err
803 && (details.contains("This operation was aborted")
804 || details.contains("The user aborted a request"))
805 {
806 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
807 } else {
808 Err(anyhow!(details))
809 }
810 }
811 Err(_) => Err(anyhow!(err)),
812 }
813 }
814 }
815 })
816 }
817
818 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
819 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
820 session.suppress_abort_err = true;
821 }
822 let conn = self.connection.clone();
823 let params = acp::CancelNotification::new(session_id.clone());
824 cx.foreground_executor()
825 .spawn(async move { conn.cancel(params).await })
826 .detach();
827 }
828
829 fn session_modes(
830 &self,
831 session_id: &acp::SessionId,
832 _cx: &App,
833 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
834 let sessions = self.sessions.clone();
835 let sessions_ref = sessions.borrow();
836 let Some(session) = sessions_ref.get(session_id) else {
837 return None;
838 };
839
840 if let Some(modes) = session.session_modes.as_ref() {
841 Some(Rc::new(AcpSessionModes {
842 connection: self.connection.clone(),
843 session_id: session_id.clone(),
844 state: modes.clone(),
845 }) as _)
846 } else {
847 None
848 }
849 }
850
851 fn model_selector(
852 &self,
853 session_id: &acp::SessionId,
854 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
855 let sessions = self.sessions.clone();
856 let sessions_ref = sessions.borrow();
857 let Some(session) = sessions_ref.get(session_id) else {
858 return None;
859 };
860
861 if let Some(models) = session.models.as_ref() {
862 Some(Rc::new(AcpModelSelector::new(
863 session_id.clone(),
864 self.connection.clone(),
865 models.clone(),
866 )) as _)
867 } else {
868 None
869 }
870 }
871
872 fn session_config_options(
873 &self,
874 session_id: &acp::SessionId,
875 _cx: &App,
876 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
877 let sessions = self.sessions.borrow();
878 let session = sessions.get(session_id)?;
879
880 let config_opts = session.config_options.as_ref()?;
881
882 Some(Rc::new(AcpSessionConfigOptions {
883 session_id: session_id.clone(),
884 connection: self.connection.clone(),
885 state: config_opts.config_options.clone(),
886 watch_tx: config_opts.tx.clone(),
887 watch_rx: config_opts.rx.clone(),
888 }) as _)
889 }
890
    /// Returns the session list as a trait object, or `None` when this
    /// connection has no session list.
    fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
        self.session_list.clone().map(|s| s as _)
    }
894
    /// Upcasts to `Rc<dyn Any>` so callers can downcast to the concrete type.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
898}
899
900fn map_acp_error(err: acp::Error) -> anyhow::Error {
901 if err.code == acp::ErrorCode::AuthRequired {
902 let mut error = AuthRequired::new();
903
904 if err.message != acp::ErrorCode::AuthRequired.to_string() {
905 error = error.with_description(err.message);
906 }
907
908 anyhow!(error)
909 } else {
910 anyhow!(err)
911 }
912}
913
914fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
915 let context_server_store = project.read(cx).context_server_store().read(cx);
916 let is_local = project.read(cx).is_local();
917 context_server_store
918 .configured_server_ids()
919 .iter()
920 .filter_map(|id| {
921 let configuration = context_server_store.configuration_for_server(id)?;
922 match &*configuration {
923 project::context_server_store::ContextServerConfiguration::Custom {
924 command,
925 remote,
926 ..
927 }
928 | project::context_server_store::ContextServerConfiguration::Extension {
929 command,
930 remote,
931 ..
932 } if is_local || *remote => Some(acp::McpServer::Stdio(
933 acp::McpServerStdio::new(id.0.to_string(), &command.path)
934 .args(command.args.clone())
935 .env(if let Some(env) = command.env.as_ref() {
936 env.iter()
937 .map(|(name, value)| acp::EnvVariable::new(name, value))
938 .collect()
939 } else {
940 vec![]
941 }),
942 )),
943 project::context_server_store::ContextServerConfiguration::Http {
944 url,
945 headers,
946 timeout: _,
947 } => Some(acp::McpServer::Http(
948 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
949 headers
950 .iter()
951 .map(|(name, value)| acp::HttpHeader::new(name, value))
952 .collect(),
953 ),
954 )),
955 _ => None,
956 }
957 })
958 .collect()
959}
960
961fn config_state(
962 modes: Option<acp::SessionModeState>,
963 models: Option<acp::SessionModelState>,
964 config_options: Option<Vec<acp::SessionConfigOption>>,
965) -> (
966 Option<Rc<RefCell<acp::SessionModeState>>>,
967 Option<Rc<RefCell<acp::SessionModelState>>>,
968 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
969) {
970 if let Some(opts) = config_options {
971 return (None, None, Some(Rc::new(RefCell::new(opts))));
972 }
973
974 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
975 let models = models.map(|models| Rc::new(RefCell::new(models)));
976 (modes, models, None)
977}
978
979struct AcpSessionModes {
980 session_id: acp::SessionId,
981 connection: Rc<acp::ClientSideConnection>,
982 state: Rc<RefCell<acp::SessionModeState>>,
983}
984
985impl acp_thread::AgentSessionModes for AcpSessionModes {
986 fn current_mode(&self) -> acp::SessionModeId {
987 self.state.borrow().current_mode_id.clone()
988 }
989
990 fn all_modes(&self) -> Vec<acp::SessionMode> {
991 self.state.borrow().available_modes.clone()
992 }
993
994 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
995 let connection = self.connection.clone();
996 let session_id = self.session_id.clone();
997 let old_mode_id;
998 {
999 let mut state = self.state.borrow_mut();
1000 old_mode_id = state.current_mode_id.clone();
1001 state.current_mode_id = mode_id.clone();
1002 };
1003 let state = self.state.clone();
1004 cx.foreground_executor().spawn(async move {
1005 let result = connection
1006 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1007 .await;
1008
1009 if result.is_err() {
1010 state.borrow_mut().current_mode_id = old_mode_id;
1011 }
1012
1013 result?;
1014
1015 Ok(())
1016 })
1017 }
1018}
1019
1020struct AcpModelSelector {
1021 session_id: acp::SessionId,
1022 connection: Rc<acp::ClientSideConnection>,
1023 state: Rc<RefCell<acp::SessionModelState>>,
1024}
1025
1026impl AcpModelSelector {
1027 fn new(
1028 session_id: acp::SessionId,
1029 connection: Rc<acp::ClientSideConnection>,
1030 state: Rc<RefCell<acp::SessionModelState>>,
1031 ) -> Self {
1032 Self {
1033 session_id,
1034 connection,
1035 state,
1036 }
1037 }
1038}
1039
1040impl acp_thread::AgentModelSelector for AcpModelSelector {
1041 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1042 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1043 self.state
1044 .borrow()
1045 .available_models
1046 .clone()
1047 .into_iter()
1048 .map(acp_thread::AgentModelInfo::from)
1049 .collect(),
1050 )))
1051 }
1052
1053 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1054 let connection = self.connection.clone();
1055 let session_id = self.session_id.clone();
1056 let old_model_id;
1057 {
1058 let mut state = self.state.borrow_mut();
1059 old_model_id = state.current_model_id.clone();
1060 state.current_model_id = model_id.clone();
1061 };
1062 let state = self.state.clone();
1063 cx.foreground_executor().spawn(async move {
1064 let result = connection
1065 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1066 .await;
1067
1068 if result.is_err() {
1069 state.borrow_mut().current_model_id = old_model_id;
1070 }
1071
1072 result?;
1073
1074 Ok(())
1075 })
1076 }
1077
1078 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1079 let state = self.state.borrow();
1080 Task::ready(
1081 state
1082 .available_models
1083 .iter()
1084 .find(|m| m.model_id == state.current_model_id)
1085 .cloned()
1086 .map(acp_thread::AgentModelInfo::from)
1087 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1088 )
1089 }
1090}
1091
1092struct AcpSessionConfigOptions {
1093 session_id: acp::SessionId,
1094 connection: Rc<acp::ClientSideConnection>,
1095 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1096 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1097 watch_rx: watch::Receiver<()>,
1098}
1099
1100impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1101 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1102 self.state.borrow().clone()
1103 }
1104
1105 fn set_config_option(
1106 &self,
1107 config_id: acp::SessionConfigId,
1108 value: acp::SessionConfigValueId,
1109 cx: &mut App,
1110 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1111 let connection = self.connection.clone();
1112 let session_id = self.session_id.clone();
1113 let state = self.state.clone();
1114
1115 let watch_tx = self.watch_tx.clone();
1116
1117 cx.foreground_executor().spawn(async move {
1118 let response = connection
1119 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1120 session_id, config_id, value,
1121 ))
1122 .await?;
1123
1124 *state.borrow_mut() = response.config_options.clone();
1125 watch_tx.borrow_mut().send(()).ok();
1126 Ok(response.config_options)
1127 })
1128 }
1129
1130 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1131 Some(self.watch_rx.clone())
1132 }
1133}
1134
/// Handles requests and notifications coming from the agent (the client side
/// of the ACP connection).
struct ClientDelegate {
    /// Session state keyed by session id, used to find the thread a request
    /// belongs to.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Shared slot for the session list; `None` while no list is available.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// App context handle, cloned whenever a thread needs to be updated.
    cx: AsyncApp,
}
1140
1141#[async_trait::async_trait(?Send)]
1142impl acp::Client for ClientDelegate {
1143 async fn request_permission(
1144 &self,
1145 arguments: acp::RequestPermissionRequest,
1146 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1147 let thread;
1148 {
1149 let sessions_ref = self.sessions.borrow();
1150 let session = sessions_ref
1151 .get(&arguments.session_id)
1152 .context("Failed to get session")?;
1153 thread = session.thread.clone();
1154 }
1155
1156 let cx = &mut self.cx.clone();
1157
1158 let task = thread.update(cx, |thread, cx| {
1159 thread.request_tool_call_authorization(
1160 arguments.tool_call,
1161 acp_thread::PermissionOptions::Flat(arguments.options),
1162 cx,
1163 )
1164 })??;
1165
1166 let outcome = task.await;
1167
1168 Ok(acp::RequestPermissionResponse::new(outcome))
1169 }
1170
1171 async fn write_text_file(
1172 &self,
1173 arguments: acp::WriteTextFileRequest,
1174 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1175 let cx = &mut self.cx.clone();
1176 let task = self
1177 .session_thread(&arguments.session_id)?
1178 .update(cx, |thread, cx| {
1179 thread.write_text_file(arguments.path, arguments.content, cx)
1180 })?;
1181
1182 task.await?;
1183
1184 Ok(Default::default())
1185 }
1186
1187 async fn read_text_file(
1188 &self,
1189 arguments: acp::ReadTextFileRequest,
1190 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1191 let task = self.session_thread(&arguments.session_id)?.update(
1192 &mut self.cx.clone(),
1193 |thread, cx| {
1194 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1195 },
1196 )?;
1197
1198 let content = task.await?;
1199
1200 Ok(acp::ReadTextFileResponse::new(content))
1201 }
1202
1203 async fn session_notification(
1204 &self,
1205 notification: acp::SessionNotification,
1206 ) -> Result<(), acp::Error> {
1207 let sessions = self.sessions.borrow();
1208 let session = sessions
1209 .get(¬ification.session_id)
1210 .context("Failed to get session")?;
1211
1212 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1213 current_mode_id,
1214 ..
1215 }) = ¬ification.update
1216 {
1217 if let Some(session_modes) = &session.session_modes {
1218 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1219 }
1220 }
1221
1222 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1223 config_options,
1224 ..
1225 }) = ¬ification.update
1226 {
1227 if let Some(opts) = &session.config_options {
1228 *opts.config_options.borrow_mut() = config_options.clone();
1229 opts.tx.borrow_mut().send(()).ok();
1230 }
1231 }
1232
1233 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1234 && let Some(session_list) = self.session_list.borrow().as_ref()
1235 {
1236 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1237 }
1238
1239 // Clone so we can inspect meta both before and after handing off to the thread
1240 let update_clone = notification.update.clone();
1241
1242 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1243 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1244 if let Some(meta) = &tc.meta {
1245 if let Some(terminal_info) = meta.get("terminal_info") {
1246 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1247 {
1248 let terminal_id = acp::TerminalId::new(id_str);
1249 let cwd = terminal_info
1250 .get("cwd")
1251 .and_then(|v| v.as_str().map(PathBuf::from));
1252
1253 // Create a minimal display-only lower-level terminal and register it.
1254 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1255 let builder = TerminalBuilder::new_display_only(
1256 CursorShape::default(),
1257 AlternateScroll::On,
1258 None,
1259 0,
1260 cx.background_executor(),
1261 thread.project().read(cx).path_style(cx),
1262 )?;
1263 let lower = cx.new(|cx| builder.subscribe(cx));
1264 thread.on_terminal_provider_event(
1265 TerminalProviderEvent::Created {
1266 terminal_id,
1267 label: tc.title.clone(),
1268 cwd,
1269 output_byte_limit: None,
1270 terminal: lower,
1271 },
1272 cx,
1273 );
1274 anyhow::Ok(())
1275 });
1276 }
1277 }
1278 }
1279 }
1280
1281 // Forward the update to the acp_thread as usual.
1282 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1283 thread.handle_session_update(notification.update.clone(), cx)
1284 })??;
1285
1286 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1287 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1288 if let Some(meta) = &tcu.meta {
1289 if let Some(term_out) = meta.get("terminal_output") {
1290 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1291 let terminal_id = acp::TerminalId::new(id_str);
1292 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1293 let data = s.as_bytes().to_vec();
1294 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1295 thread.on_terminal_provider_event(
1296 TerminalProviderEvent::Output { terminal_id, data },
1297 cx,
1298 );
1299 });
1300 }
1301 }
1302 }
1303
1304 // terminal_exit
1305 if let Some(term_exit) = meta.get("terminal_exit") {
1306 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1307 let terminal_id = acp::TerminalId::new(id_str);
1308 let status = acp::TerminalExitStatus::new()
1309 .exit_code(
1310 term_exit
1311 .get("exit_code")
1312 .and_then(|v| v.as_u64())
1313 .map(|i| i as u32),
1314 )
1315 .signal(
1316 term_exit
1317 .get("signal")
1318 .and_then(|v| v.as_str().map(|s| s.to_string())),
1319 );
1320
1321 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1322 thread.on_terminal_provider_event(
1323 TerminalProviderEvent::Exit {
1324 terminal_id,
1325 status,
1326 },
1327 cx,
1328 );
1329 });
1330 }
1331 }
1332 }
1333 }
1334
1335 Ok(())
1336 }
1337
1338 async fn create_terminal(
1339 &self,
1340 args: acp::CreateTerminalRequest,
1341 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1342 let thread = self.session_thread(&args.session_id)?;
1343 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1344
1345 let terminal_entity = acp_thread::create_terminal_entity(
1346 args.command.clone(),
1347 &args.args,
1348 args.env
1349 .into_iter()
1350 .map(|env| (env.name, env.value))
1351 .collect(),
1352 args.cwd.clone(),
1353 &project,
1354 &mut self.cx.clone(),
1355 )
1356 .await?;
1357
1358 // Register with renderer
1359 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1360 thread.register_terminal_created(
1361 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1362 format!("{} {}", args.command, args.args.join(" ")),
1363 args.cwd.clone(),
1364 args.output_byte_limit,
1365 terminal_entity,
1366 cx,
1367 )
1368 })?;
1369 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1370 Ok(acp::CreateTerminalResponse::new(terminal_id))
1371 }
1372
1373 async fn kill_terminal_command(
1374 &self,
1375 args: acp::KillTerminalCommandRequest,
1376 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1377 self.session_thread(&args.session_id)?
1378 .update(&mut self.cx.clone(), |thread, cx| {
1379 thread.kill_terminal(args.terminal_id, cx)
1380 })??;
1381
1382 Ok(Default::default())
1383 }
1384
1385 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1386 Err(acp::Error::method_not_found())
1387 }
1388
1389 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1390 Err(acp::Error::method_not_found())
1391 }
1392
1393 async fn release_terminal(
1394 &self,
1395 args: acp::ReleaseTerminalRequest,
1396 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1397 self.session_thread(&args.session_id)?
1398 .update(&mut self.cx.clone(), |thread, cx| {
1399 thread.release_terminal(args.terminal_id, cx)
1400 })??;
1401
1402 Ok(Default::default())
1403 }
1404
1405 async fn terminal_output(
1406 &self,
1407 args: acp::TerminalOutputRequest,
1408 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1409 self.session_thread(&args.session_id)?
1410 .read_with(&mut self.cx.clone(), |thread, cx| {
1411 let out = thread
1412 .terminal(args.terminal_id)?
1413 .read(cx)
1414 .current_output(cx);
1415
1416 Ok(out)
1417 })?
1418 }
1419
1420 async fn wait_for_terminal_exit(
1421 &self,
1422 args: acp::WaitForTerminalExitRequest,
1423 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1424 let exit_status = self
1425 .session_thread(&args.session_id)?
1426 .update(&mut self.cx.clone(), |thread, cx| {
1427 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1428 })??
1429 .await;
1430
1431 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1432 }
1433}
1434
1435impl ClientDelegate {
1436 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1437 let sessions = self.sessions.borrow();
1438 sessions
1439 .get(session_id)
1440 .context("Failed to get session")
1441 .map(|session| session.thread.clone())
1442 }
1443}