1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use futures::AsyncBufReadExt as _;
11use futures::io::BufReader;
12use project::Project;
13use project::agent_server_store::AgentServerCommand;
14use serde::Deserialize;
15use settings::Settings as _;
16use task::ShellBuilder;
17use util::ResultExt as _;
18use util::process::Child;
19
20use std::path::PathBuf;
21use std::process::Stdio;
22use std::{any::Any, cell::RefCell};
23use std::{path::Path, rc::Rc};
24use thiserror::Error;
25
26use anyhow::{Context as _, Result};
27use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
28
29use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
30use terminal::TerminalBuilder;
31use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
32
33#[derive(Debug, Error)]
34#[error("Unsupported version")]
35pub struct UnsupportedVersion;
36
37pub struct AcpConnection {
38 server_name: SharedString,
39 telemetry_id: SharedString,
40 connection: Rc<acp::ClientSideConnection>,
41 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
42 auth_methods: Vec<acp::AuthMethod>,
43 agent_capabilities: acp::AgentCapabilities,
44 default_mode: Option<acp::SessionModeId>,
45 default_model: Option<acp::ModelId>,
46 default_config_options: HashMap<String, String>,
47 root_dir: PathBuf,
48 child: Child,
49 session_list: Option<Rc<AcpSessionList>>,
50 _io_task: Task<Result<(), acp::Error>>,
51 _wait_task: Task<Result<()>>,
52 _stderr_task: Task<Result<()>>,
53}
54
55struct ConfigOptions {
56 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
57 tx: Rc<RefCell<watch::Sender<()>>>,
58 rx: watch::Receiver<()>,
59}
60
61impl ConfigOptions {
62 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
63 let (tx, rx) = watch::channel(());
64 Self {
65 config_options,
66 tx: Rc::new(RefCell::new(tx)),
67 rx,
68 }
69 }
70}
71
72pub struct AcpSession {
73 thread: WeakEntity<AcpThread>,
74 suppress_abort_err: bool,
75 models: Option<Rc<RefCell<acp::SessionModelState>>>,
76 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
77 config_options: Option<ConfigOptions>,
78}
79
80pub struct AcpSessionList {
81 connection: Rc<acp::ClientSideConnection>,
82 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
83 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
84}
85
86impl AcpSessionList {
87 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
88 let (tx, rx) = smol::channel::unbounded();
89 Self {
90 connection,
91 updates_tx: tx,
92 updates_rx: rx,
93 }
94 }
95
96 fn notify_update(&self) {
97 self.updates_tx
98 .try_send(acp_thread::SessionListUpdate::Refresh)
99 .log_err();
100 }
101
102 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
105 .log_err();
106 }
107}
108
109impl AgentSessionList for AcpSessionList {
110 fn list_sessions(
111 &self,
112 request: AgentSessionListRequest,
113 cx: &mut App,
114 ) -> Task<Result<AgentSessionListResponse>> {
115 let conn = self.connection.clone();
116 cx.foreground_executor().spawn(async move {
117 let acp_request = acp::ListSessionsRequest::new()
118 .cwd(request.cwd)
119 .cursor(request.cursor);
120 let response = conn.list_sessions(acp_request).await?;
121 Ok(AgentSessionListResponse {
122 sessions: response
123 .sessions
124 .into_iter()
125 .map(|s| AgentSessionInfo {
126 session_id: s.session_id,
127 cwd: Some(s.cwd),
128 title: s.title.map(Into::into),
129 updated_at: s.updated_at.and_then(|date_str| {
130 chrono::DateTime::parse_from_rfc3339(&date_str)
131 .ok()
132 .map(|dt| dt.with_timezone(&chrono::Utc))
133 }),
134 meta: s.meta,
135 })
136 .collect(),
137 next_cursor: response.next_cursor,
138 meta: response.meta,
139 })
140 })
141 }
142
143 fn watch(
144 &self,
145 _cx: &mut App,
146 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
147 Some(self.updates_rx.clone())
148 }
149
150 fn notify_refresh(&self) {
151 self.notify_update();
152 }
153
154 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
155 self
156 }
157}
158
159pub async fn connect(
160 server_name: SharedString,
161 command: AgentServerCommand,
162 root_dir: &Path,
163 default_mode: Option<acp::SessionModeId>,
164 default_model: Option<acp::ModelId>,
165 default_config_options: HashMap<String, String>,
166 is_remote: bool,
167 cx: &mut AsyncApp,
168) -> Result<Rc<dyn AgentConnection>> {
169 let conn = AcpConnection::stdio(
170 server_name,
171 command.clone(),
172 root_dir,
173 default_mode,
174 default_model,
175 default_config_options,
176 is_remote,
177 cx,
178 )
179 .await?;
180 Ok(Rc::new(conn) as _)
181}
182
183const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
184
185impl AcpConnection {
186 pub async fn stdio(
187 server_name: SharedString,
188 command: AgentServerCommand,
189 root_dir: &Path,
190 default_mode: Option<acp::SessionModeId>,
191 default_model: Option<acp::ModelId>,
192 default_config_options: HashMap<String, String>,
193 is_remote: bool,
194 cx: &mut AsyncApp,
195 ) -> Result<Self> {
196 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
197 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
198 let mut child =
199 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
200 child.envs(command.env.iter().flatten());
201 if !is_remote {
202 child.current_dir(root_dir);
203 }
204 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
205
206 let stdout = child.stdout.take().context("Failed to take stdout")?;
207 let stdin = child.stdin.take().context("Failed to take stdin")?;
208 let stderr = child.stderr.take().context("Failed to take stderr")?;
209 log::debug!(
210 "Spawning external agent server: {:?}, {:?}",
211 command.path,
212 command.args
213 );
214 log::trace!("Spawned (pid: {})", child.id());
215
216 let sessions = Rc::new(RefCell::new(HashMap::default()));
217
218 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
219 (
220 release_channel::ReleaseChannel::try_global(cx)
221 .map(|release_channel| release_channel.display_name()),
222 release_channel::AppVersion::global(cx).to_string(),
223 )
224 });
225
226 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
227 Rc::new(RefCell::new(None));
228
229 let client = ClientDelegate {
230 sessions: sessions.clone(),
231 session_list: client_session_list.clone(),
232 cx: cx.clone(),
233 };
234 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
235 let foreground_executor = cx.foreground_executor().clone();
236 move |fut| {
237 foreground_executor.spawn(fut).detach();
238 }
239 });
240
241 let io_task = cx.background_spawn(io_task);
242
243 let stderr_task = cx.background_spawn(async move {
244 let mut stderr = BufReader::new(stderr);
245 let mut line = String::new();
246 while let Ok(n) = stderr.read_line(&mut line).await
247 && n > 0
248 {
249 log::warn!("agent stderr: {}", line.trim());
250 line.clear();
251 }
252 Ok(())
253 });
254
255 let wait_task = cx.spawn({
256 let sessions = sessions.clone();
257 let status_fut = child.status();
258 async move |cx| {
259 let status = status_fut.await?;
260
261 for session in sessions.borrow().values() {
262 session
263 .thread
264 .update(cx, |thread, cx| {
265 thread.emit_load_error(LoadError::Exited { status }, cx)
266 })
267 .ok();
268 }
269
270 anyhow::Ok(())
271 }
272 });
273
274 let connection = Rc::new(connection);
275
276 cx.update(|cx| {
277 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
278 registry.set_active_connection(server_name.clone(), &connection, cx)
279 });
280 });
281
282 let response = connection
283 .initialize(
284 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
285 .client_capabilities(
286 acp::ClientCapabilities::new()
287 .fs(acp::FileSystemCapability::new()
288 .read_text_file(true)
289 .write_text_file(true))
290 .terminal(true)
291 // Experimental: Allow for rendering terminal output from the agents
292 .meta(acp::Meta::from_iter([
293 ("terminal_output".into(), true.into()),
294 ("terminal-auth".into(), true.into()),
295 ])),
296 )
297 .client_info(
298 acp::Implementation::new("zed", version)
299 .title(release_channel.map(ToOwned::to_owned)),
300 ),
301 )
302 .await?;
303
304 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
305 return Err(UnsupportedVersion.into());
306 }
307
308 let telemetry_id = response
309 .agent_info
310 // Use the one the agent provides if we have one
311 .map(|info| info.name.into())
312 // Otherwise, just use the name
313 .unwrap_or_else(|| server_name.clone());
314
315 let session_list = if response
316 .agent_capabilities
317 .session_capabilities
318 .list
319 .is_some()
320 {
321 let list = Rc::new(AcpSessionList::new(connection.clone()));
322 *client_session_list.borrow_mut() = Some(list.clone());
323 Some(list)
324 } else {
325 None
326 };
327
328 Ok(Self {
329 auth_methods: response.auth_methods,
330 root_dir: root_dir.to_owned(),
331 connection,
332 server_name,
333 telemetry_id,
334 sessions,
335 agent_capabilities: response.agent_capabilities,
336 default_mode,
337 default_model,
338 default_config_options,
339 session_list,
340 _io_task: io_task,
341 _wait_task: wait_task,
342 _stderr_task: stderr_task,
343 child,
344 })
345 }
346
347 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
348 &self.agent_capabilities.prompt_capabilities
349 }
350
351 pub fn root_dir(&self) -> &Path {
352 &self.root_dir
353 }
354}
355
356impl Drop for AcpConnection {
357 fn drop(&mut self) {
358 self.child.kill().log_err();
359 }
360}
361
362impl AgentConnection for AcpConnection {
363 fn telemetry_id(&self) -> SharedString {
364 self.telemetry_id.clone()
365 }
366
367 fn new_session(
368 self: Rc<Self>,
369 project: Entity<Project>,
370 cwd: &Path,
371 cx: &mut App,
372 ) -> Task<Result<Entity<AcpThread>>> {
373 let name = self.server_name.clone();
374 let cwd = cwd.to_path_buf();
375 let mcp_servers = mcp_servers_for_project(&project, cx);
376
377 cx.spawn(async move |cx| {
378 let response = self.connection
379 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
380 .await
381 .map_err(map_acp_error)?;
382
383 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
384
385 if let Some(default_mode) = self.default_mode.clone() {
386 if let Some(modes) = modes.as_ref() {
387 let mut modes_ref = modes.borrow_mut();
388 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
389
390 if has_mode {
391 let initial_mode_id = modes_ref.current_mode_id.clone();
392
393 cx.spawn({
394 let default_mode = default_mode.clone();
395 let session_id = response.session_id.clone();
396 let modes = modes.clone();
397 let conn = self.connection.clone();
398 async move |_| {
399 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
400 .await.log_err();
401
402 if result.is_none() {
403 modes.borrow_mut().current_mode_id = initial_mode_id;
404 }
405 }
406 }).detach();
407
408 modes_ref.current_mode_id = default_mode;
409 } else {
410 let available_modes = modes_ref
411 .available_modes
412 .iter()
413 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
414 .collect::<Vec<_>>()
415 .join("\n");
416
417 log::warn!(
418 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
419 );
420 }
421 }
422 }
423
424 if let Some(default_model) = self.default_model.clone() {
425 if let Some(models) = models.as_ref() {
426 let mut models_ref = models.borrow_mut();
427 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
428
429 if has_model {
430 let initial_model_id = models_ref.current_model_id.clone();
431
432 cx.spawn({
433 let default_model = default_model.clone();
434 let session_id = response.session_id.clone();
435 let models = models.clone();
436 let conn = self.connection.clone();
437 async move |_| {
438 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
439 .await.log_err();
440
441 if result.is_none() {
442 models.borrow_mut().current_model_id = initial_model_id;
443 }
444 }
445 }).detach();
446
447 models_ref.current_model_id = default_model;
448 } else {
449 let available_models = models_ref
450 .available_models
451 .iter()
452 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
453 .collect::<Vec<_>>()
454 .join("\n");
455
456 log::warn!(
457 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
458 );
459 }
460 }
461 }
462
463 if let Some(config_opts) = config_options.as_ref() {
464 let defaults_to_apply: Vec<_> = {
465 let config_opts_ref = config_opts.borrow();
466 config_opts_ref
467 .iter()
468 .filter_map(|config_option| {
469 let default_value = self.default_config_options.get(&*config_option.id.0)?;
470
471 let is_valid = match &config_option.kind {
472 acp::SessionConfigKind::Select(select) => match &select.options {
473 acp::SessionConfigSelectOptions::Ungrouped(options) => {
474 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
475 }
476 acp::SessionConfigSelectOptions::Grouped(groups) => groups
477 .iter()
478 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
479 _ => false,
480 },
481 _ => false,
482 };
483
484 if is_valid {
485 let initial_value = match &config_option.kind {
486 acp::SessionConfigKind::Select(select) => {
487 Some(select.current_value.clone())
488 }
489 _ => None,
490 };
491 Some((config_option.id.clone(), default_value.clone(), initial_value))
492 } else {
493 log::warn!(
494 "`{}` is not a valid value for config option `{}` in {}",
495 default_value,
496 config_option.id.0,
497 name
498 );
499 None
500 }
501 })
502 .collect()
503 };
504
505 for (config_id, default_value, initial_value) in defaults_to_apply {
506 cx.spawn({
507 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
508 let session_id = response.session_id.clone();
509 let config_id_clone = config_id.clone();
510 let config_opts = config_opts.clone();
511 let conn = self.connection.clone();
512 async move |_| {
513 let result = conn
514 .set_session_config_option(
515 acp::SetSessionConfigOptionRequest::new(
516 session_id,
517 config_id_clone.clone(),
518 default_value_id,
519 ),
520 )
521 .await
522 .log_err();
523
524 if result.is_none() {
525 if let Some(initial) = initial_value {
526 let mut opts = config_opts.borrow_mut();
527 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
528 if let acp::SessionConfigKind::Select(select) =
529 &mut opt.kind
530 {
531 select.current_value = initial;
532 }
533 }
534 }
535 }
536 }
537 })
538 .detach();
539
540 let mut opts = config_opts.borrow_mut();
541 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
542 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
543 select.current_value = acp::SessionConfigValueId::new(default_value);
544 }
545 }
546 }
547 }
548
549 let action_log = cx.new(|_| ActionLog::new(project.clone()));
550 let thread: Entity<AcpThread> = cx.new(|cx| {
551 AcpThread::new(
552 None,
553 self.server_name.clone(),
554 self.clone(),
555 project,
556 action_log,
557 response.session_id.clone(),
558 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
559 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
560 cx,
561 )
562 });
563
564 self.sessions.borrow_mut().insert(
565 response.session_id,
566 AcpSession {
567 thread: thread.downgrade(),
568 suppress_abort_err: false,
569 session_modes: modes,
570 models,
571 config_options: config_options.map(ConfigOptions::new),
572 },
573 );
574
575 Ok(thread)
576 })
577 }
578
579 fn supports_load_session(&self) -> bool {
580 self.agent_capabilities.load_session
581 }
582
583 fn supports_resume_session(&self) -> bool {
584 self.agent_capabilities
585 .session_capabilities
586 .resume
587 .is_some()
588 }
589
590 fn load_session(
591 self: Rc<Self>,
592 session: AgentSessionInfo,
593 project: Entity<Project>,
594 cwd: &Path,
595 cx: &mut App,
596 ) -> Task<Result<Entity<AcpThread>>> {
597 if !self.agent_capabilities.load_session {
598 return Task::ready(Err(anyhow!(LoadError::Other(
599 "Loading sessions is not supported by this agent.".into()
600 ))));
601 }
602
603 let cwd = cwd.to_path_buf();
604 let mcp_servers = mcp_servers_for_project(&project, cx);
605 let action_log = cx.new(|_| ActionLog::new(project.clone()));
606 let thread: Entity<AcpThread> = cx.new(|cx| {
607 AcpThread::new(
608 None,
609 self.server_name.clone(),
610 self.clone(),
611 project,
612 action_log,
613 session.session_id.clone(),
614 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
615 cx,
616 )
617 });
618
619 self.sessions.borrow_mut().insert(
620 session.session_id.clone(),
621 AcpSession {
622 thread: thread.downgrade(),
623 suppress_abort_err: false,
624 session_modes: None,
625 models: None,
626 config_options: None,
627 },
628 );
629
630 cx.spawn(async move |_| {
631 let response = match self
632 .connection
633 .load_session(
634 acp::LoadSessionRequest::new(session.session_id.clone(), cwd)
635 .mcp_servers(mcp_servers),
636 )
637 .await
638 {
639 Ok(response) => response,
640 Err(err) => {
641 self.sessions.borrow_mut().remove(&session.session_id);
642 return Err(map_acp_error(err));
643 }
644 };
645
646 let (modes, models, config_options) =
647 config_state(response.modes, response.models, response.config_options);
648 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
649 session.session_modes = modes;
650 session.models = models;
651 session.config_options = config_options.map(ConfigOptions::new);
652 }
653
654 Ok(thread)
655 })
656 }
657
658 fn resume_session(
659 self: Rc<Self>,
660 session: AgentSessionInfo,
661 project: Entity<Project>,
662 cwd: &Path,
663 cx: &mut App,
664 ) -> Task<Result<Entity<AcpThread>>> {
665 if self
666 .agent_capabilities
667 .session_capabilities
668 .resume
669 .is_none()
670 {
671 return Task::ready(Err(anyhow!(LoadError::Other(
672 "Resuming sessions is not supported by this agent.".into()
673 ))));
674 }
675
676 let cwd = cwd.to_path_buf();
677 let mcp_servers = mcp_servers_for_project(&project, cx);
678 let action_log = cx.new(|_| ActionLog::new(project.clone()));
679 let thread: Entity<AcpThread> = cx.new(|cx| {
680 AcpThread::new(
681 None,
682 self.server_name.clone(),
683 self.clone(),
684 project,
685 action_log,
686 session.session_id.clone(),
687 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
688 cx,
689 )
690 });
691
692 self.sessions.borrow_mut().insert(
693 session.session_id.clone(),
694 AcpSession {
695 thread: thread.downgrade(),
696 suppress_abort_err: false,
697 session_modes: None,
698 models: None,
699 config_options: None,
700 },
701 );
702
703 cx.spawn(async move |_| {
704 let response = match self
705 .connection
706 .resume_session(
707 acp::ResumeSessionRequest::new(session.session_id.clone(), cwd)
708 .mcp_servers(mcp_servers),
709 )
710 .await
711 {
712 Ok(response) => response,
713 Err(err) => {
714 self.sessions.borrow_mut().remove(&session.session_id);
715 return Err(map_acp_error(err));
716 }
717 };
718
719 let (modes, models, config_options) =
720 config_state(response.modes, response.models, response.config_options);
721 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
722 session.session_modes = modes;
723 session.models = models;
724 session.config_options = config_options.map(ConfigOptions::new);
725 }
726
727 Ok(thread)
728 })
729 }
730
731 fn auth_methods(&self) -> &[acp::AuthMethod] {
732 &self.auth_methods
733 }
734
735 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
736 let conn = self.connection.clone();
737 cx.foreground_executor().spawn(async move {
738 conn.authenticate(acp::AuthenticateRequest::new(method_id))
739 .await?;
740 Ok(())
741 })
742 }
743
744 fn prompt(
745 &self,
746 _id: Option<acp_thread::UserMessageId>,
747 params: acp::PromptRequest,
748 cx: &mut App,
749 ) -> Task<Result<acp::PromptResponse>> {
750 let conn = self.connection.clone();
751 let sessions = self.sessions.clone();
752 let session_id = params.session_id.clone();
753 cx.foreground_executor().spawn(async move {
754 let result = conn.prompt(params).await;
755
756 let mut suppress_abort_err = false;
757
758 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
759 suppress_abort_err = session.suppress_abort_err;
760 session.suppress_abort_err = false;
761 }
762
763 match result {
764 Ok(response) => Ok(response),
765 Err(err) => {
766 if err.code == acp::ErrorCode::AuthRequired {
767 return Err(anyhow!(acp::Error::auth_required()));
768 }
769
770 if err.code != ErrorCode::InternalError {
771 anyhow::bail!(err)
772 }
773
774 let Some(data) = &err.data else {
775 anyhow::bail!(err)
776 };
777
778 // Temporary workaround until the following PR is generally available:
779 // https://github.com/google-gemini/gemini-cli/pull/6656
780
781 #[derive(Deserialize)]
782 #[serde(deny_unknown_fields)]
783 struct ErrorDetails {
784 details: Box<str>,
785 }
786
787 match serde_json::from_value(data.clone()) {
788 Ok(ErrorDetails { details }) => {
789 if suppress_abort_err
790 && (details.contains("This operation was aborted")
791 || details.contains("The user aborted a request"))
792 {
793 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
794 } else {
795 Err(anyhow!(details))
796 }
797 }
798 Err(_) => Err(anyhow!(err)),
799 }
800 }
801 }
802 })
803 }
804
805 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
806 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
807 session.suppress_abort_err = true;
808 }
809 let conn = self.connection.clone();
810 let params = acp::CancelNotification::new(session_id.clone());
811 cx.foreground_executor()
812 .spawn(async move { conn.cancel(params).await })
813 .detach();
814 }
815
816 fn session_modes(
817 &self,
818 session_id: &acp::SessionId,
819 _cx: &App,
820 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
821 let sessions = self.sessions.clone();
822 let sessions_ref = sessions.borrow();
823 let Some(session) = sessions_ref.get(session_id) else {
824 return None;
825 };
826
827 if let Some(modes) = session.session_modes.as_ref() {
828 Some(Rc::new(AcpSessionModes {
829 connection: self.connection.clone(),
830 session_id: session_id.clone(),
831 state: modes.clone(),
832 }) as _)
833 } else {
834 None
835 }
836 }
837
838 fn model_selector(
839 &self,
840 session_id: &acp::SessionId,
841 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
842 let sessions = self.sessions.clone();
843 let sessions_ref = sessions.borrow();
844 let Some(session) = sessions_ref.get(session_id) else {
845 return None;
846 };
847
848 if let Some(models) = session.models.as_ref() {
849 Some(Rc::new(AcpModelSelector::new(
850 session_id.clone(),
851 self.connection.clone(),
852 models.clone(),
853 )) as _)
854 } else {
855 None
856 }
857 }
858
859 fn session_config_options(
860 &self,
861 session_id: &acp::SessionId,
862 _cx: &App,
863 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
864 let sessions = self.sessions.borrow();
865 let session = sessions.get(session_id)?;
866
867 let config_opts = session.config_options.as_ref()?;
868
869 Some(Rc::new(AcpSessionConfigOptions {
870 session_id: session_id.clone(),
871 connection: self.connection.clone(),
872 state: config_opts.config_options.clone(),
873 watch_tx: config_opts.tx.clone(),
874 watch_rx: config_opts.rx.clone(),
875 }) as _)
876 }
877
878 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
879 self.session_list.clone().map(|s| s as _)
880 }
881
882 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
883 self
884 }
885}
886
887fn map_acp_error(err: acp::Error) -> anyhow::Error {
888 if err.code == acp::ErrorCode::AuthRequired {
889 let mut error = AuthRequired::new();
890
891 if err.message != acp::ErrorCode::AuthRequired.to_string() {
892 error = error.with_description(err.message);
893 }
894
895 anyhow!(error)
896 } else {
897 anyhow!(err)
898 }
899}
900
901fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
902 let context_server_store = project.read(cx).context_server_store().read(cx);
903 let is_local = project.read(cx).is_local();
904 context_server_store
905 .configured_server_ids()
906 .iter()
907 .filter_map(|id| {
908 let configuration = context_server_store.configuration_for_server(id)?;
909 match &*configuration {
910 project::context_server_store::ContextServerConfiguration::Custom {
911 command,
912 remote,
913 ..
914 }
915 | project::context_server_store::ContextServerConfiguration::Extension {
916 command,
917 remote,
918 ..
919 } if is_local || *remote => Some(acp::McpServer::Stdio(
920 acp::McpServerStdio::new(id.0.to_string(), &command.path)
921 .args(command.args.clone())
922 .env(if let Some(env) = command.env.as_ref() {
923 env.iter()
924 .map(|(name, value)| acp::EnvVariable::new(name, value))
925 .collect()
926 } else {
927 vec![]
928 }),
929 )),
930 project::context_server_store::ContextServerConfiguration::Http {
931 url,
932 headers,
933 timeout: _,
934 } => Some(acp::McpServer::Http(
935 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
936 headers
937 .iter()
938 .map(|(name, value)| acp::HttpHeader::new(name, value))
939 .collect(),
940 ),
941 )),
942 _ => None,
943 }
944 })
945 .collect()
946}
947
948fn config_state(
949 modes: Option<acp::SessionModeState>,
950 models: Option<acp::SessionModelState>,
951 config_options: Option<Vec<acp::SessionConfigOption>>,
952) -> (
953 Option<Rc<RefCell<acp::SessionModeState>>>,
954 Option<Rc<RefCell<acp::SessionModelState>>>,
955 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
956) {
957 if let Some(opts) = config_options {
958 return (None, None, Some(Rc::new(RefCell::new(opts))));
959 }
960
961 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
962 let models = models.map(|models| Rc::new(RefCell::new(models)));
963 (modes, models, None)
964}
965
966struct AcpSessionModes {
967 session_id: acp::SessionId,
968 connection: Rc<acp::ClientSideConnection>,
969 state: Rc<RefCell<acp::SessionModeState>>,
970}
971
972impl acp_thread::AgentSessionModes for AcpSessionModes {
973 fn current_mode(&self) -> acp::SessionModeId {
974 self.state.borrow().current_mode_id.clone()
975 }
976
977 fn all_modes(&self) -> Vec<acp::SessionMode> {
978 self.state.borrow().available_modes.clone()
979 }
980
981 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
982 let connection = self.connection.clone();
983 let session_id = self.session_id.clone();
984 let old_mode_id;
985 {
986 let mut state = self.state.borrow_mut();
987 old_mode_id = state.current_mode_id.clone();
988 state.current_mode_id = mode_id.clone();
989 };
990 let state = self.state.clone();
991 cx.foreground_executor().spawn(async move {
992 let result = connection
993 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
994 .await;
995
996 if result.is_err() {
997 state.borrow_mut().current_mode_id = old_mode_id;
998 }
999
1000 result?;
1001
1002 Ok(())
1003 })
1004 }
1005}
1006
1007struct AcpModelSelector {
1008 session_id: acp::SessionId,
1009 connection: Rc<acp::ClientSideConnection>,
1010 state: Rc<RefCell<acp::SessionModelState>>,
1011}
1012
1013impl AcpModelSelector {
1014 fn new(
1015 session_id: acp::SessionId,
1016 connection: Rc<acp::ClientSideConnection>,
1017 state: Rc<RefCell<acp::SessionModelState>>,
1018 ) -> Self {
1019 Self {
1020 session_id,
1021 connection,
1022 state,
1023 }
1024 }
1025}
1026
1027impl acp_thread::AgentModelSelector for AcpModelSelector {
1028 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1029 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1030 self.state
1031 .borrow()
1032 .available_models
1033 .clone()
1034 .into_iter()
1035 .map(acp_thread::AgentModelInfo::from)
1036 .collect(),
1037 )))
1038 }
1039
1040 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1041 let connection = self.connection.clone();
1042 let session_id = self.session_id.clone();
1043 let old_model_id;
1044 {
1045 let mut state = self.state.borrow_mut();
1046 old_model_id = state.current_model_id.clone();
1047 state.current_model_id = model_id.clone();
1048 };
1049 let state = self.state.clone();
1050 cx.foreground_executor().spawn(async move {
1051 let result = connection
1052 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1053 .await;
1054
1055 if result.is_err() {
1056 state.borrow_mut().current_model_id = old_model_id;
1057 }
1058
1059 result?;
1060
1061 Ok(())
1062 })
1063 }
1064
1065 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1066 let state = self.state.borrow();
1067 Task::ready(
1068 state
1069 .available_models
1070 .iter()
1071 .find(|m| m.model_id == state.current_model_id)
1072 .cloned()
1073 .map(acp_thread::AgentModelInfo::from)
1074 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1075 )
1076 }
1077}
1078
1079struct AcpSessionConfigOptions {
1080 session_id: acp::SessionId,
1081 connection: Rc<acp::ClientSideConnection>,
1082 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1083 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1084 watch_rx: watch::Receiver<()>,
1085}
1086
1087impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1088 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1089 self.state.borrow().clone()
1090 }
1091
1092 fn set_config_option(
1093 &self,
1094 config_id: acp::SessionConfigId,
1095 value: acp::SessionConfigValueId,
1096 cx: &mut App,
1097 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1098 let connection = self.connection.clone();
1099 let session_id = self.session_id.clone();
1100 let state = self.state.clone();
1101
1102 let watch_tx = self.watch_tx.clone();
1103
1104 cx.foreground_executor().spawn(async move {
1105 let response = connection
1106 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1107 session_id, config_id, value,
1108 ))
1109 .await?;
1110
1111 *state.borrow_mut() = response.config_options.clone();
1112 watch_tx.borrow_mut().send(()).ok();
1113 Ok(response.config_options)
1114 })
1115 }
1116
1117 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1118 Some(self.watch_rx.clone())
1119 }
1120}
1121
1122struct ClientDelegate {
1123 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1124 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1125 cx: AsyncApp,
1126}
1127
1128#[async_trait::async_trait(?Send)]
1129impl acp::Client for ClientDelegate {
1130 async fn request_permission(
1131 &self,
1132 arguments: acp::RequestPermissionRequest,
1133 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1134 let thread;
1135 {
1136 let sessions_ref = self.sessions.borrow();
1137 let session = sessions_ref
1138 .get(&arguments.session_id)
1139 .context("Failed to get session")?;
1140 thread = session.thread.clone();
1141 }
1142
1143 let cx = &mut self.cx.clone();
1144
1145 let task = thread.update(cx, |thread, cx| {
1146 thread.request_tool_call_authorization(
1147 arguments.tool_call,
1148 acp_thread::PermissionOptions::Flat(arguments.options),
1149 cx,
1150 )
1151 })??;
1152
1153 let outcome = task.await;
1154
1155 Ok(acp::RequestPermissionResponse::new(outcome))
1156 }
1157
1158 async fn write_text_file(
1159 &self,
1160 arguments: acp::WriteTextFileRequest,
1161 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1162 let cx = &mut self.cx.clone();
1163 let task = self
1164 .session_thread(&arguments.session_id)?
1165 .update(cx, |thread, cx| {
1166 thread.write_text_file(arguments.path, arguments.content, cx)
1167 })?;
1168
1169 task.await?;
1170
1171 Ok(Default::default())
1172 }
1173
1174 async fn read_text_file(
1175 &self,
1176 arguments: acp::ReadTextFileRequest,
1177 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1178 let task = self.session_thread(&arguments.session_id)?.update(
1179 &mut self.cx.clone(),
1180 |thread, cx| {
1181 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1182 },
1183 )?;
1184
1185 let content = task.await?;
1186
1187 Ok(acp::ReadTextFileResponse::new(content))
1188 }
1189
1190 async fn session_notification(
1191 &self,
1192 notification: acp::SessionNotification,
1193 ) -> Result<(), acp::Error> {
1194 let sessions = self.sessions.borrow();
1195 let session = sessions
1196 .get(¬ification.session_id)
1197 .context("Failed to get session")?;
1198
1199 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1200 current_mode_id,
1201 ..
1202 }) = ¬ification.update
1203 {
1204 if let Some(session_modes) = &session.session_modes {
1205 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1206 }
1207 }
1208
1209 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1210 config_options,
1211 ..
1212 }) = ¬ification.update
1213 {
1214 if let Some(opts) = &session.config_options {
1215 *opts.config_options.borrow_mut() = config_options.clone();
1216 opts.tx.borrow_mut().send(()).ok();
1217 }
1218 }
1219
1220 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1221 && let Some(session_list) = self.session_list.borrow().as_ref()
1222 {
1223 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1224 }
1225
1226 // Clone so we can inspect meta both before and after handing off to the thread
1227 let update_clone = notification.update.clone();
1228
1229 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1230 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1231 if let Some(meta) = &tc.meta {
1232 if let Some(terminal_info) = meta.get("terminal_info") {
1233 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1234 {
1235 let terminal_id = acp::TerminalId::new(id_str);
1236 let cwd = terminal_info
1237 .get("cwd")
1238 .and_then(|v| v.as_str().map(PathBuf::from));
1239
1240 // Create a minimal display-only lower-level terminal and register it.
1241 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1242 let builder = TerminalBuilder::new_display_only(
1243 CursorShape::default(),
1244 AlternateScroll::On,
1245 None,
1246 0,
1247 cx.background_executor(),
1248 thread.project().read(cx).path_style(cx),
1249 )?;
1250 let lower = cx.new(|cx| builder.subscribe(cx));
1251 thread.on_terminal_provider_event(
1252 TerminalProviderEvent::Created {
1253 terminal_id,
1254 label: tc.title.clone(),
1255 cwd,
1256 output_byte_limit: None,
1257 terminal: lower,
1258 },
1259 cx,
1260 );
1261 anyhow::Ok(())
1262 });
1263 }
1264 }
1265 }
1266 }
1267
1268 // Forward the update to the acp_thread as usual.
1269 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1270 thread.handle_session_update(notification.update.clone(), cx)
1271 })??;
1272
1273 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1274 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1275 if let Some(meta) = &tcu.meta {
1276 if let Some(term_out) = meta.get("terminal_output") {
1277 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1278 let terminal_id = acp::TerminalId::new(id_str);
1279 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1280 let data = s.as_bytes().to_vec();
1281 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1282 thread.on_terminal_provider_event(
1283 TerminalProviderEvent::Output { terminal_id, data },
1284 cx,
1285 );
1286 });
1287 }
1288 }
1289 }
1290
1291 // terminal_exit
1292 if let Some(term_exit) = meta.get("terminal_exit") {
1293 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1294 let terminal_id = acp::TerminalId::new(id_str);
1295 let status = acp::TerminalExitStatus::new()
1296 .exit_code(
1297 term_exit
1298 .get("exit_code")
1299 .and_then(|v| v.as_u64())
1300 .map(|i| i as u32),
1301 )
1302 .signal(
1303 term_exit
1304 .get("signal")
1305 .and_then(|v| v.as_str().map(|s| s.to_string())),
1306 );
1307
1308 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1309 thread.on_terminal_provider_event(
1310 TerminalProviderEvent::Exit {
1311 terminal_id,
1312 status,
1313 },
1314 cx,
1315 );
1316 });
1317 }
1318 }
1319 }
1320 }
1321
1322 Ok(())
1323 }
1324
1325 async fn create_terminal(
1326 &self,
1327 args: acp::CreateTerminalRequest,
1328 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1329 let thread = self.session_thread(&args.session_id)?;
1330 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1331
1332 let terminal_entity = acp_thread::create_terminal_entity(
1333 args.command.clone(),
1334 &args.args,
1335 args.env
1336 .into_iter()
1337 .map(|env| (env.name, env.value))
1338 .collect(),
1339 args.cwd.clone(),
1340 &project,
1341 &mut self.cx.clone(),
1342 )
1343 .await?;
1344
1345 // Register with renderer
1346 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1347 thread.register_terminal_created(
1348 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1349 format!("{} {}", args.command, args.args.join(" ")),
1350 args.cwd.clone(),
1351 args.output_byte_limit,
1352 terminal_entity,
1353 cx,
1354 )
1355 })?;
1356 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1357 Ok(acp::CreateTerminalResponse::new(terminal_id))
1358 }
1359
1360 async fn kill_terminal_command(
1361 &self,
1362 args: acp::KillTerminalCommandRequest,
1363 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1364 self.session_thread(&args.session_id)?
1365 .update(&mut self.cx.clone(), |thread, cx| {
1366 thread.kill_terminal(args.terminal_id, cx)
1367 })??;
1368
1369 Ok(Default::default())
1370 }
1371
1372 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1373 Err(acp::Error::method_not_found())
1374 }
1375
1376 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1377 Err(acp::Error::method_not_found())
1378 }
1379
1380 async fn release_terminal(
1381 &self,
1382 args: acp::ReleaseTerminalRequest,
1383 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1384 self.session_thread(&args.session_id)?
1385 .update(&mut self.cx.clone(), |thread, cx| {
1386 thread.release_terminal(args.terminal_id, cx)
1387 })??;
1388
1389 Ok(Default::default())
1390 }
1391
1392 async fn terminal_output(
1393 &self,
1394 args: acp::TerminalOutputRequest,
1395 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1396 self.session_thread(&args.session_id)?
1397 .read_with(&mut self.cx.clone(), |thread, cx| {
1398 let out = thread
1399 .terminal(args.terminal_id)?
1400 .read(cx)
1401 .current_output(cx);
1402
1403 Ok(out)
1404 })?
1405 }
1406
1407 async fn wait_for_terminal_exit(
1408 &self,
1409 args: acp::WaitForTerminalExitRequest,
1410 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1411 let exit_status = self
1412 .session_thread(&args.session_id)?
1413 .update(&mut self.cx.clone(), |thread, cx| {
1414 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1415 })??
1416 .await;
1417
1418 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1419 }
1420}
1421
1422impl ClientDelegate {
1423 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1424 let sessions = self.sessions.borrow();
1425 sessions
1426 .get(session_id)
1427 .context("Failed to get session")
1428 .map(|session| session.thread.clone())
1429 }
1430}