1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use futures::AsyncBufReadExt as _;
11use futures::io::BufReader;
12use project::Project;
13use project::agent_server_store::AgentServerCommand;
14use serde::Deserialize;
15use settings::Settings as _;
16use task::ShellBuilder;
17use util::ResultExt as _;
18use util::process::Child;
19
20use std::path::PathBuf;
21use std::process::Stdio;
22use std::{any::Any, cell::RefCell};
23use std::{path::Path, rc::Rc};
24use thiserror::Error;
25
26use anyhow::{Context as _, Result};
27use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
28
29use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
30use terminal::TerminalBuilder;
31use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
32
/// Error returned by [`AcpConnection::stdio`] when the agent's `initialize`
/// response reports a protocol version older than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
36
/// A connection to an external agent process that speaks ACP over stdio.
///
/// Built by [`AcpConnection::stdio`]; the child process is killed when this
/// value is dropped.
pub struct AcpConnection {
    /// Name the connection is registered under and used in log messages.
    server_name: SharedString,
    /// Title given to new threads, and the fallback title for loaded/resumed ones.
    display_name: SharedString,
    /// Agent-provided name from the initialize response, or `server_name` if absent.
    telemetry_id: SharedString,
    /// Client side of the ACP connection used for every request to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Per-session state, shared with the `ClientDelegate` and the exit watcher task.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods reported by the agent during initialization.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities reported by the agent during initialization.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to select on every new session, if the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model to select on every new session, if the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Config option id -> value to apply on every new session, when valid.
    default_config_options: HashMap<String, String>,
    /// Handle to the spawned agent process.
    child: Child,
    /// Present only when the agent advertises the session `list` capability.
    session_list: Option<Rc<AcpSessionList>>,
    // The tasks below are stored so they are not dropped while the connection lives.
    _io_task: Task<Result<(), acp::Error>>,
    _wait_task: Task<Result<()>>,
    _stderr_task: Task<Result<()>>,
}
54
55struct ConfigOptions {
56 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
57 tx: Rc<RefCell<watch::Sender<()>>>,
58 rx: watch::Receiver<()>,
59}
60
61impl ConfigOptions {
62 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
63 let (tx, rx) = watch::channel(());
64 Self {
65 config_options,
66 tx: Rc::new(RefCell::new(tx)),
67 rx,
68 }
69 }
70}
71
/// State tracked by an [`AcpConnection`] for a single agent session.
pub struct AcpSession {
    /// Thread that displays this session; held weakly.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel` and consumed (reset to `false`) when the in-flight
    /// `prompt` completes, so abort errors caused by the cancel can be
    /// reported as a cancelled stop reason instead of an error.
    suppress_abort_err: bool,
    /// Model state reported by the agent, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state reported by the agent, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
    /// Config options reported by the agent, if any.
    config_options: Option<ConfigOptions>,
}
79
80pub struct AcpSessionList {
81 connection: Rc<acp::ClientSideConnection>,
82 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
83 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
84}
85
86impl AcpSessionList {
87 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
88 let (tx, rx) = smol::channel::unbounded();
89 Self {
90 connection,
91 updates_tx: tx,
92 updates_rx: rx,
93 }
94 }
95
96 fn notify_update(&self) {
97 self.updates_tx
98 .try_send(acp_thread::SessionListUpdate::Refresh)
99 .log_err();
100 }
101
102 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
105 .log_err();
106 }
107}
108
109impl AgentSessionList for AcpSessionList {
110 fn list_sessions(
111 &self,
112 request: AgentSessionListRequest,
113 cx: &mut App,
114 ) -> Task<Result<AgentSessionListResponse>> {
115 let conn = self.connection.clone();
116 cx.foreground_executor().spawn(async move {
117 let acp_request = acp::ListSessionsRequest::new()
118 .cwd(request.cwd)
119 .cursor(request.cursor);
120 let response = conn.list_sessions(acp_request).await?;
121 Ok(AgentSessionListResponse {
122 sessions: response
123 .sessions
124 .into_iter()
125 .map(|s| AgentSessionInfo {
126 session_id: s.session_id,
127 cwd: Some(s.cwd),
128 title: s.title.map(Into::into),
129 updated_at: s.updated_at.and_then(|date_str| {
130 chrono::DateTime::parse_from_rfc3339(&date_str)
131 .ok()
132 .map(|dt| dt.with_timezone(&chrono::Utc))
133 }),
134 meta: s.meta,
135 })
136 .collect(),
137 next_cursor: response.next_cursor,
138 meta: response.meta,
139 })
140 })
141 }
142
143 fn watch(
144 &self,
145 _cx: &mut App,
146 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
147 Some(self.updates_rx.clone())
148 }
149
150 fn notify_refresh(&self) {
151 self.notify_update();
152 }
153
154 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
155 self
156 }
157}
158
159pub async fn connect(
160 server_name: SharedString,
161 display_name: SharedString,
162 command: AgentServerCommand,
163 default_mode: Option<acp::SessionModeId>,
164 default_model: Option<acp::ModelId>,
165 default_config_options: HashMap<String, String>,
166 cx: &mut AsyncApp,
167) -> Result<Rc<dyn AgentConnection>> {
168 let conn = AcpConnection::stdio(
169 server_name,
170 display_name,
171 command.clone(),
172 default_mode,
173 default_model,
174 default_config_options,
175 cx,
176 )
177 .await?;
178 Ok(Rc::new(conn) as _)
179}
180
/// Oldest protocol version accepted from an agent's `initialize` response;
/// anything lower makes `AcpConnection::stdio` fail with `UnsupportedVersion`.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
182
183impl AcpConnection {
    /// Spawns the agent server process and establishes an ACP connection over
    /// its stdin/stdout.
    ///
    /// The command is run through the user's configured terminal shell with
    /// all three stdio streams piped. After spawning, this:
    /// - starts the connection I/O task and a task that logs the agent's stderr,
    /// - starts a task that waits for the process to exit and then emits
    ///   `LoadError::Exited` on every session's thread,
    /// - registers the connection with the global `AcpConnectionRegistry`,
    /// - sends `initialize` and checks the negotiated protocol version.
    ///
    /// # Errors
    ///
    /// Fails if the process cannot be spawned, if any stdio handle is missing,
    /// if `initialize` fails, or with [`UnsupportedVersion`] when the agent
    /// reports a protocol version below `MINIMUM_SUPPORTED_VERSION`.
    pub async fn stdio(
        server_name: SharedString,
        display_name: SharedString,
        command: AgentServerCommand,
        default_mode: Option<acp::SessionModeId>,
        default_model: Option<acp::ModelId>,
        default_config_options: HashMap<String, String>,
        cx: &mut AsyncApp,
    ) -> Result<Self> {
        let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
        let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
        let mut child =
            builder.build_std_command(Some(command.path.display().to_string()), &command.args);
        child.envs(command.env.iter().flatten());
        let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;

        let stdout = child.stdout.take().context("Failed to take stdout")?;
        let stdin = child.stdin.take().context("Failed to take stdin")?;
        let stderr = child.stderr.take().context("Failed to take stderr")?;
        // Note: these log lines are emitted only after the spawn above succeeded.
        log::debug!(
            "Spawning external agent server: {:?}, {:?}",
            command.path,
            command.args
        );
        log::trace!("Spawned (pid: {})", child.id());

        let sessions = Rc::new(RefCell::new(HashMap::default()));

        let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
            (
                release_channel::ReleaseChannel::try_global(cx)
                    .map(|release_channel| release_channel.display_name()),
                release_channel::AppVersion::global(cx).to_string(),
            )
        });

        // The delegate is created before we know whether the agent supports
        // listing sessions, so it shares a slot that is filled in after
        // `initialize` returns.
        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
            Rc::new(RefCell::new(None));

        let client = ClientDelegate {
            sessions: sessions.clone(),
            session_list: client_session_list.clone(),
            cx: cx.clone(),
        };
        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
            let foreground_executor = cx.foreground_executor().clone();
            move |fut| {
                foreground_executor.spawn(fut).detach();
            }
        });

        let io_task = cx.background_spawn(io_task);

        // Forward every non-empty stderr line from the agent to our log until
        // the stream ends or a read fails.
        let stderr_task = cx.background_spawn(async move {
            let mut stderr = BufReader::new(stderr);
            let mut line = String::new();
            while let Ok(n) = stderr.read_line(&mut line).await
                && n > 0
            {
                log::warn!("agent stderr: {}", line.trim());
                line.clear();
            }
            Ok(())
        });

        // Once the process exits, surface the exit status on every known session.
        let wait_task = cx.spawn({
            let sessions = sessions.clone();
            let status_fut = child.status();
            async move |cx| {
                let status = status_fut.await?;

                for session in sessions.borrow().values() {
                    session
                        .thread
                        .update(cx, |thread, cx| {
                            thread.emit_load_error(LoadError::Exited { status }, cx)
                        })
                        .ok();
                }

                anyhow::Ok(())
            }
        });

        let connection = Rc::new(connection);

        // Registered before `initialize` is sent.
        cx.update(|cx| {
            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
                registry.set_active_connection(server_name.clone(), &connection, cx)
            });
        });

        let response = connection
            .initialize(
                acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                    .client_capabilities(
                        acp::ClientCapabilities::new()
                            .fs(acp::FileSystemCapability::new()
                                .read_text_file(true)
                                .write_text_file(true))
                            .terminal(true)
                            // Experimental: Allow for rendering terminal output from the agents
                            .meta(acp::Meta::from_iter([
                                ("terminal_output".into(), true.into()),
                                ("terminal-auth".into(), true.into()),
                            ])),
                    )
                    .client_info(
                        acp::Implementation::new("zed", version)
                            .title(release_channel.map(ToOwned::to_owned)),
                    ),
            )
            .await?;

        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
            return Err(UnsupportedVersion.into());
        }

        let telemetry_id = response
            .agent_info
            // Use the one the agent provides if we have one
            .map(|info| info.name.into())
            // Otherwise, just use the name
            .unwrap_or_else(|| server_name.clone());

        // Only expose a session list when the agent advertises the capability,
        // and hand the same list to the client delegate.
        let session_list = if response
            .agent_capabilities
            .session_capabilities
            .list
            .is_some()
        {
            let list = Rc::new(AcpSessionList::new(connection.clone()));
            *client_session_list.borrow_mut() = Some(list.clone());
            Some(list)
        } else {
            None
        };

        Ok(Self {
            auth_methods: response.auth_methods,
            connection,
            server_name,
            display_name,
            telemetry_id,
            sessions,
            agent_capabilities: response.agent_capabilities,
            default_mode,
            default_model,
            default_config_options,
            session_list,
            _io_task: io_task,
            _wait_task: wait_task,
            _stderr_task: stderr_task,
            child,
        })
    }
340
    /// Prompt capabilities the agent reported in its `initialize` response.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }
344}
345
impl Drop for AcpConnection {
    /// Kills the agent process when the connection goes away; a failure to
    /// kill is logged rather than propagated.
    fn drop(&mut self) {
        self.child.kill().log_err();
    }
}
351
352impl AgentConnection for AcpConnection {
    /// Identifier for telemetry: the agent-reported name when the agent
    /// provided one during initialization, otherwise the server name.
    fn telemetry_id(&self) -> SharedString {
        self.telemetry_id.clone()
    }
356
357 fn new_session(
358 self: Rc<Self>,
359 project: Entity<Project>,
360 cwd: &Path,
361 cx: &mut App,
362 ) -> Task<Result<Entity<AcpThread>>> {
363 let name = self.server_name.clone();
364 let cwd = cwd.to_path_buf();
365 let mcp_servers = mcp_servers_for_project(&project, cx);
366
367 cx.spawn(async move |cx| {
368 let response = self.connection
369 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
370 .await
371 .map_err(map_acp_error)?;
372
373 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
374
375 if let Some(default_mode) = self.default_mode.clone() {
376 if let Some(modes) = modes.as_ref() {
377 let mut modes_ref = modes.borrow_mut();
378 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
379
380 if has_mode {
381 let initial_mode_id = modes_ref.current_mode_id.clone();
382
383 cx.spawn({
384 let default_mode = default_mode.clone();
385 let session_id = response.session_id.clone();
386 let modes = modes.clone();
387 let conn = self.connection.clone();
388 async move |_| {
389 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
390 .await.log_err();
391
392 if result.is_none() {
393 modes.borrow_mut().current_mode_id = initial_mode_id;
394 }
395 }
396 }).detach();
397
398 modes_ref.current_mode_id = default_mode;
399 } else {
400 let available_modes = modes_ref
401 .available_modes
402 .iter()
403 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
404 .collect::<Vec<_>>()
405 .join("\n");
406
407 log::warn!(
408 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
409 );
410 }
411 }
412 }
413
414 if let Some(default_model) = self.default_model.clone() {
415 if let Some(models) = models.as_ref() {
416 let mut models_ref = models.borrow_mut();
417 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
418
419 if has_model {
420 let initial_model_id = models_ref.current_model_id.clone();
421
422 cx.spawn({
423 let default_model = default_model.clone();
424 let session_id = response.session_id.clone();
425 let models = models.clone();
426 let conn = self.connection.clone();
427 async move |_| {
428 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
429 .await.log_err();
430
431 if result.is_none() {
432 models.borrow_mut().current_model_id = initial_model_id;
433 }
434 }
435 }).detach();
436
437 models_ref.current_model_id = default_model;
438 } else {
439 let available_models = models_ref
440 .available_models
441 .iter()
442 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
443 .collect::<Vec<_>>()
444 .join("\n");
445
446 log::warn!(
447 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
448 );
449 }
450 }
451 }
452
453 if let Some(config_opts) = config_options.as_ref() {
454 let defaults_to_apply: Vec<_> = {
455 let config_opts_ref = config_opts.borrow();
456 config_opts_ref
457 .iter()
458 .filter_map(|config_option| {
459 let default_value = self.default_config_options.get(&*config_option.id.0)?;
460
461 let is_valid = match &config_option.kind {
462 acp::SessionConfigKind::Select(select) => match &select.options {
463 acp::SessionConfigSelectOptions::Ungrouped(options) => {
464 options.iter().any(|opt| &*opt.value.0 == default_value.as_str())
465 }
466 acp::SessionConfigSelectOptions::Grouped(groups) => groups
467 .iter()
468 .any(|g| g.options.iter().any(|opt| &*opt.value.0 == default_value.as_str())),
469 _ => false,
470 },
471 _ => false,
472 };
473
474 if is_valid {
475 let initial_value = match &config_option.kind {
476 acp::SessionConfigKind::Select(select) => {
477 Some(select.current_value.clone())
478 }
479 _ => None,
480 };
481 Some((config_option.id.clone(), default_value.clone(), initial_value))
482 } else {
483 log::warn!(
484 "`{}` is not a valid value for config option `{}` in {}",
485 default_value,
486 config_option.id.0,
487 name
488 );
489 None
490 }
491 })
492 .collect()
493 };
494
495 for (config_id, default_value, initial_value) in defaults_to_apply {
496 cx.spawn({
497 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
498 let session_id = response.session_id.clone();
499 let config_id_clone = config_id.clone();
500 let config_opts = config_opts.clone();
501 let conn = self.connection.clone();
502 async move |_| {
503 let result = conn
504 .set_session_config_option(
505 acp::SetSessionConfigOptionRequest::new(
506 session_id,
507 config_id_clone.clone(),
508 default_value_id,
509 ),
510 )
511 .await
512 .log_err();
513
514 if result.is_none() {
515 if let Some(initial) = initial_value {
516 let mut opts = config_opts.borrow_mut();
517 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
518 if let acp::SessionConfigKind::Select(select) =
519 &mut opt.kind
520 {
521 select.current_value = initial;
522 }
523 }
524 }
525 }
526 }
527 })
528 .detach();
529
530 let mut opts = config_opts.borrow_mut();
531 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
532 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
533 select.current_value = acp::SessionConfigValueId::new(default_value);
534 }
535 }
536 }
537 }
538
539 let action_log = cx.new(|_| ActionLog::new(project.clone()));
540 let thread: Entity<AcpThread> = cx.new(|cx| {
541 AcpThread::new(
542 None,
543 self.display_name.clone(),
544 self.clone(),
545 project,
546 action_log,
547 response.session_id.clone(),
548 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
549 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
550 cx,
551 )
552 });
553
554 self.sessions.borrow_mut().insert(
555 response.session_id,
556 AcpSession {
557 thread: thread.downgrade(),
558 suppress_abort_err: false,
559 session_modes: modes,
560 models,
561 config_options: config_options.map(ConfigOptions::new),
562 },
563 );
564
565 Ok(thread)
566 })
567 }
568
    /// Whether the agent advertised the `load_session` capability.
    fn supports_load_session(&self) -> bool {
        self.agent_capabilities.load_session
    }
572
    /// Whether the agent advertised the session `resume` capability.
    fn supports_resume_session(&self) -> bool {
        self.agent_capabilities
            .session_capabilities
            .resume
            .is_some()
    }
579
580 fn load_session(
581 self: Rc<Self>,
582 session: AgentSessionInfo,
583 project: Entity<Project>,
584 cwd: &Path,
585 cx: &mut App,
586 ) -> Task<Result<Entity<AcpThread>>> {
587 if !self.agent_capabilities.load_session {
588 return Task::ready(Err(anyhow!(LoadError::Other(
589 "Loading sessions is not supported by this agent.".into()
590 ))));
591 }
592
593 let cwd = cwd.to_path_buf();
594 let mcp_servers = mcp_servers_for_project(&project, cx);
595 let action_log = cx.new(|_| ActionLog::new(project.clone()));
596 let title = session
597 .title
598 .clone()
599 .unwrap_or_else(|| self.display_name.clone());
600 let thread: Entity<AcpThread> = cx.new(|cx| {
601 AcpThread::new(
602 None,
603 title,
604 self.clone(),
605 project,
606 action_log,
607 session.session_id.clone(),
608 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
609 cx,
610 )
611 });
612
613 self.sessions.borrow_mut().insert(
614 session.session_id.clone(),
615 AcpSession {
616 thread: thread.downgrade(),
617 suppress_abort_err: false,
618 session_modes: None,
619 models: None,
620 config_options: None,
621 },
622 );
623
624 cx.spawn(async move |_| {
625 let response = match self
626 .connection
627 .load_session(
628 acp::LoadSessionRequest::new(session.session_id.clone(), cwd)
629 .mcp_servers(mcp_servers),
630 )
631 .await
632 {
633 Ok(response) => response,
634 Err(err) => {
635 self.sessions.borrow_mut().remove(&session.session_id);
636 return Err(map_acp_error(err));
637 }
638 };
639
640 let (modes, models, config_options) =
641 config_state(response.modes, response.models, response.config_options);
642 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
643 session.session_modes = modes;
644 session.models = models;
645 session.config_options = config_options.map(ConfigOptions::new);
646 }
647
648 Ok(thread)
649 })
650 }
651
652 fn resume_session(
653 self: Rc<Self>,
654 session: AgentSessionInfo,
655 project: Entity<Project>,
656 cwd: &Path,
657 cx: &mut App,
658 ) -> Task<Result<Entity<AcpThread>>> {
659 if self
660 .agent_capabilities
661 .session_capabilities
662 .resume
663 .is_none()
664 {
665 return Task::ready(Err(anyhow!(LoadError::Other(
666 "Resuming sessions is not supported by this agent.".into()
667 ))));
668 }
669
670 let cwd = cwd.to_path_buf();
671 let mcp_servers = mcp_servers_for_project(&project, cx);
672 let action_log = cx.new(|_| ActionLog::new(project.clone()));
673 let title = session
674 .title
675 .clone()
676 .unwrap_or_else(|| self.display_name.clone());
677 let thread: Entity<AcpThread> = cx.new(|cx| {
678 AcpThread::new(
679 None,
680 title,
681 self.clone(),
682 project,
683 action_log,
684 session.session_id.clone(),
685 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
686 cx,
687 )
688 });
689
690 self.sessions.borrow_mut().insert(
691 session.session_id.clone(),
692 AcpSession {
693 thread: thread.downgrade(),
694 suppress_abort_err: false,
695 session_modes: None,
696 models: None,
697 config_options: None,
698 },
699 );
700
701 cx.spawn(async move |_| {
702 let response = match self
703 .connection
704 .resume_session(
705 acp::ResumeSessionRequest::new(session.session_id.clone(), cwd)
706 .mcp_servers(mcp_servers),
707 )
708 .await
709 {
710 Ok(response) => response,
711 Err(err) => {
712 self.sessions.borrow_mut().remove(&session.session_id);
713 return Err(map_acp_error(err));
714 }
715 };
716
717 let (modes, models, config_options) =
718 config_state(response.modes, response.models, response.config_options);
719 if let Some(session) = self.sessions.borrow_mut().get_mut(&session.session_id) {
720 session.session_modes = modes;
721 session.models = models;
722 session.config_options = config_options.map(ConfigOptions::new);
723 }
724
725 Ok(thread)
726 })
727 }
728
    /// Authentication methods the agent reported in its `initialize` response.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
732
733 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
734 let conn = self.connection.clone();
735 cx.foreground_executor().spawn(async move {
736 conn.authenticate(acp::AuthenticateRequest::new(method_id))
737 .await?;
738 Ok(())
739 })
740 }
741
742 fn prompt(
743 &self,
744 _id: Option<acp_thread::UserMessageId>,
745 params: acp::PromptRequest,
746 cx: &mut App,
747 ) -> Task<Result<acp::PromptResponse>> {
748 let conn = self.connection.clone();
749 let sessions = self.sessions.clone();
750 let session_id = params.session_id.clone();
751 cx.foreground_executor().spawn(async move {
752 let result = conn.prompt(params).await;
753
754 let mut suppress_abort_err = false;
755
756 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
757 suppress_abort_err = session.suppress_abort_err;
758 session.suppress_abort_err = false;
759 }
760
761 match result {
762 Ok(response) => Ok(response),
763 Err(err) => {
764 if err.code == acp::ErrorCode::AuthRequired {
765 return Err(anyhow!(acp::Error::auth_required()));
766 }
767
768 if err.code != ErrorCode::InternalError {
769 anyhow::bail!(err)
770 }
771
772 let Some(data) = &err.data else {
773 anyhow::bail!(err)
774 };
775
776 // Temporary workaround until the following PR is generally available:
777 // https://github.com/google-gemini/gemini-cli/pull/6656
778
779 #[derive(Deserialize)]
780 #[serde(deny_unknown_fields)]
781 struct ErrorDetails {
782 details: Box<str>,
783 }
784
785 match serde_json::from_value(data.clone()) {
786 Ok(ErrorDetails { details }) => {
787 if suppress_abort_err
788 && (details.contains("This operation was aborted")
789 || details.contains("The user aborted a request"))
790 {
791 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
792 } else {
793 Err(anyhow!(details))
794 }
795 }
796 Err(_) => Err(anyhow!(err)),
797 }
798 }
799 }
800 })
801 }
802
803 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
804 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
805 session.suppress_abort_err = true;
806 }
807 let conn = self.connection.clone();
808 let params = acp::CancelNotification::new(session_id.clone());
809 cx.foreground_executor()
810 .spawn(async move { conn.cancel(params).await })
811 .detach();
812 }
813
814 fn session_modes(
815 &self,
816 session_id: &acp::SessionId,
817 _cx: &App,
818 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
819 let sessions = self.sessions.clone();
820 let sessions_ref = sessions.borrow();
821 let Some(session) = sessions_ref.get(session_id) else {
822 return None;
823 };
824
825 if let Some(modes) = session.session_modes.as_ref() {
826 Some(Rc::new(AcpSessionModes {
827 connection: self.connection.clone(),
828 session_id: session_id.clone(),
829 state: modes.clone(),
830 }) as _)
831 } else {
832 None
833 }
834 }
835
836 fn model_selector(
837 &self,
838 session_id: &acp::SessionId,
839 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
840 let sessions = self.sessions.clone();
841 let sessions_ref = sessions.borrow();
842 let Some(session) = sessions_ref.get(session_id) else {
843 return None;
844 };
845
846 if let Some(models) = session.models.as_ref() {
847 Some(Rc::new(AcpModelSelector::new(
848 session_id.clone(),
849 self.connection.clone(),
850 models.clone(),
851 )) as _)
852 } else {
853 None
854 }
855 }
856
857 fn session_config_options(
858 &self,
859 session_id: &acp::SessionId,
860 _cx: &App,
861 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
862 let sessions = self.sessions.borrow();
863 let session = sessions.get(session_id)?;
864
865 let config_opts = session.config_options.as_ref()?;
866
867 Some(Rc::new(AcpSessionConfigOptions {
868 session_id: session_id.clone(),
869 connection: self.connection.clone(),
870 state: config_opts.config_options.clone(),
871 watch_tx: config_opts.tx.clone(),
872 watch_rx: config_opts.rx.clone(),
873 }) as _)
874 }
875
876 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
877 self.session_list.clone().map(|s| s as _)
878 }
879
    /// Type-erases the connection so callers can downcast it to `AcpConnection`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
883}
884
885fn map_acp_error(err: acp::Error) -> anyhow::Error {
886 if err.code == acp::ErrorCode::AuthRequired {
887 let mut error = AuthRequired::new();
888
889 if err.message != acp::ErrorCode::AuthRequired.to_string() {
890 error = error.with_description(err.message);
891 }
892
893 anyhow!(error)
894 } else {
895 anyhow!(err)
896 }
897}
898
899fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
900 let context_server_store = project.read(cx).context_server_store().read(cx);
901 let is_local = project.read(cx).is_local();
902 context_server_store
903 .configured_server_ids()
904 .iter()
905 .filter_map(|id| {
906 let configuration = context_server_store.configuration_for_server(id)?;
907 match &*configuration {
908 project::context_server_store::ContextServerConfiguration::Custom {
909 command,
910 remote,
911 ..
912 }
913 | project::context_server_store::ContextServerConfiguration::Extension {
914 command,
915 remote,
916 ..
917 } if is_local || *remote => Some(acp::McpServer::Stdio(
918 acp::McpServerStdio::new(id.0.to_string(), &command.path)
919 .args(command.args.clone())
920 .env(if let Some(env) = command.env.as_ref() {
921 env.iter()
922 .map(|(name, value)| acp::EnvVariable::new(name, value))
923 .collect()
924 } else {
925 vec![]
926 }),
927 )),
928 project::context_server_store::ContextServerConfiguration::Http {
929 url,
930 headers,
931 timeout: _,
932 } => Some(acp::McpServer::Http(
933 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
934 headers
935 .iter()
936 .map(|(name, value)| acp::HttpHeader::new(name, value))
937 .collect(),
938 ),
939 )),
940 _ => None,
941 }
942 })
943 .collect()
944}
945
946fn config_state(
947 modes: Option<acp::SessionModeState>,
948 models: Option<acp::SessionModelState>,
949 config_options: Option<Vec<acp::SessionConfigOption>>,
950) -> (
951 Option<Rc<RefCell<acp::SessionModeState>>>,
952 Option<Rc<RefCell<acp::SessionModelState>>>,
953 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
954) {
955 if let Some(opts) = config_options {
956 return (None, None, Some(Rc::new(RefCell::new(opts))));
957 }
958
959 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
960 let models = models.map(|models| Rc::new(RefCell::new(models)));
961 (modes, models, None)
962}
963
964struct AcpSessionModes {
965 session_id: acp::SessionId,
966 connection: Rc<acp::ClientSideConnection>,
967 state: Rc<RefCell<acp::SessionModeState>>,
968}
969
970impl acp_thread::AgentSessionModes for AcpSessionModes {
971 fn current_mode(&self) -> acp::SessionModeId {
972 self.state.borrow().current_mode_id.clone()
973 }
974
975 fn all_modes(&self) -> Vec<acp::SessionMode> {
976 self.state.borrow().available_modes.clone()
977 }
978
979 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
980 let connection = self.connection.clone();
981 let session_id = self.session_id.clone();
982 let old_mode_id;
983 {
984 let mut state = self.state.borrow_mut();
985 old_mode_id = state.current_mode_id.clone();
986 state.current_mode_id = mode_id.clone();
987 };
988 let state = self.state.clone();
989 cx.foreground_executor().spawn(async move {
990 let result = connection
991 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
992 .await;
993
994 if result.is_err() {
995 state.borrow_mut().current_mode_id = old_mode_id;
996 }
997
998 result?;
999
1000 Ok(())
1001 })
1002 }
1003}
1004
1005struct AcpModelSelector {
1006 session_id: acp::SessionId,
1007 connection: Rc<acp::ClientSideConnection>,
1008 state: Rc<RefCell<acp::SessionModelState>>,
1009}
1010
1011impl AcpModelSelector {
1012 fn new(
1013 session_id: acp::SessionId,
1014 connection: Rc<acp::ClientSideConnection>,
1015 state: Rc<RefCell<acp::SessionModelState>>,
1016 ) -> Self {
1017 Self {
1018 session_id,
1019 connection,
1020 state,
1021 }
1022 }
1023}
1024
1025impl acp_thread::AgentModelSelector for AcpModelSelector {
1026 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1027 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1028 self.state
1029 .borrow()
1030 .available_models
1031 .clone()
1032 .into_iter()
1033 .map(acp_thread::AgentModelInfo::from)
1034 .collect(),
1035 )))
1036 }
1037
1038 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1039 let connection = self.connection.clone();
1040 let session_id = self.session_id.clone();
1041 let old_model_id;
1042 {
1043 let mut state = self.state.borrow_mut();
1044 old_model_id = state.current_model_id.clone();
1045 state.current_model_id = model_id.clone();
1046 };
1047 let state = self.state.clone();
1048 cx.foreground_executor().spawn(async move {
1049 let result = connection
1050 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1051 .await;
1052
1053 if result.is_err() {
1054 state.borrow_mut().current_model_id = old_model_id;
1055 }
1056
1057 result?;
1058
1059 Ok(())
1060 })
1061 }
1062
1063 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1064 let state = self.state.borrow();
1065 Task::ready(
1066 state
1067 .available_models
1068 .iter()
1069 .find(|m| m.model_id == state.current_model_id)
1070 .cloned()
1071 .map(acp_thread::AgentModelInfo::from)
1072 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1073 )
1074 }
1075}
1076
1077struct AcpSessionConfigOptions {
1078 session_id: acp::SessionId,
1079 connection: Rc<acp::ClientSideConnection>,
1080 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1081 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1082 watch_rx: watch::Receiver<()>,
1083}
1084
1085impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1086 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1087 self.state.borrow().clone()
1088 }
1089
1090 fn set_config_option(
1091 &self,
1092 config_id: acp::SessionConfigId,
1093 value: acp::SessionConfigValueId,
1094 cx: &mut App,
1095 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1096 let connection = self.connection.clone();
1097 let session_id = self.session_id.clone();
1098 let state = self.state.clone();
1099
1100 let watch_tx = self.watch_tx.clone();
1101
1102 cx.foreground_executor().spawn(async move {
1103 let response = connection
1104 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1105 session_id, config_id, value,
1106 ))
1107 .await?;
1108
1109 *state.borrow_mut() = response.config_options.clone();
1110 watch_tx.borrow_mut().send(()).ok();
1111 Ok(response.config_options)
1112 })
1113 }
1114
1115 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1116 Some(self.watch_rx.clone())
1117 }
1118}
1119
/// Client-side handler passed to `acp::ClientSideConnection::new`; it serves
/// requests and notifications coming from the agent.
struct ClientDelegate {
    /// Session map shared with the owning `AcpConnection`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Filled in after initialization, and only when the agent supports
    /// listing sessions.
    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
    /// App handle, cloned for each request that needs to update a thread.
    cx: AsyncApp,
}
1125
1126#[async_trait::async_trait(?Send)]
1127impl acp::Client for ClientDelegate {
1128 async fn request_permission(
1129 &self,
1130 arguments: acp::RequestPermissionRequest,
1131 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1132 let thread;
1133 {
1134 let sessions_ref = self.sessions.borrow();
1135 let session = sessions_ref
1136 .get(&arguments.session_id)
1137 .context("Failed to get session")?;
1138 thread = session.thread.clone();
1139 }
1140
1141 let cx = &mut self.cx.clone();
1142
1143 let task = thread.update(cx, |thread, cx| {
1144 thread.request_tool_call_authorization(
1145 arguments.tool_call,
1146 acp_thread::PermissionOptions::Flat(arguments.options),
1147 cx,
1148 )
1149 })??;
1150
1151 let outcome = task.await;
1152
1153 Ok(acp::RequestPermissionResponse::new(outcome))
1154 }
1155
1156 async fn write_text_file(
1157 &self,
1158 arguments: acp::WriteTextFileRequest,
1159 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1160 let cx = &mut self.cx.clone();
1161 let task = self
1162 .session_thread(&arguments.session_id)?
1163 .update(cx, |thread, cx| {
1164 thread.write_text_file(arguments.path, arguments.content, cx)
1165 })?;
1166
1167 task.await?;
1168
1169 Ok(Default::default())
1170 }
1171
1172 async fn read_text_file(
1173 &self,
1174 arguments: acp::ReadTextFileRequest,
1175 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1176 let task = self.session_thread(&arguments.session_id)?.update(
1177 &mut self.cx.clone(),
1178 |thread, cx| {
1179 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1180 },
1181 )?;
1182
1183 let content = task.await?;
1184
1185 Ok(acp::ReadTextFileResponse::new(content))
1186 }
1187
1188 async fn session_notification(
1189 &self,
1190 notification: acp::SessionNotification,
1191 ) -> Result<(), acp::Error> {
1192 let sessions = self.sessions.borrow();
1193 let session = sessions
1194 .get(¬ification.session_id)
1195 .context("Failed to get session")?;
1196
1197 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1198 current_mode_id,
1199 ..
1200 }) = ¬ification.update
1201 {
1202 if let Some(session_modes) = &session.session_modes {
1203 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1204 }
1205 }
1206
1207 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1208 config_options,
1209 ..
1210 }) = ¬ification.update
1211 {
1212 if let Some(opts) = &session.config_options {
1213 *opts.config_options.borrow_mut() = config_options.clone();
1214 opts.tx.borrow_mut().send(()).ok();
1215 }
1216 }
1217
1218 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1219 && let Some(session_list) = self.session_list.borrow().as_ref()
1220 {
1221 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1222 }
1223
1224 // Clone so we can inspect meta both before and after handing off to the thread
1225 let update_clone = notification.update.clone();
1226
1227 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1228 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1229 if let Some(meta) = &tc.meta {
1230 if let Some(terminal_info) = meta.get("terminal_info") {
1231 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1232 {
1233 let terminal_id = acp::TerminalId::new(id_str);
1234 let cwd = terminal_info
1235 .get("cwd")
1236 .and_then(|v| v.as_str().map(PathBuf::from));
1237
1238 // Create a minimal display-only lower-level terminal and register it.
1239 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1240 let builder = TerminalBuilder::new_display_only(
1241 CursorShape::default(),
1242 AlternateScroll::On,
1243 None,
1244 0,
1245 cx.background_executor(),
1246 thread.project().read(cx).path_style(cx),
1247 )?;
1248 let lower = cx.new(|cx| builder.subscribe(cx));
1249 thread.on_terminal_provider_event(
1250 TerminalProviderEvent::Created {
1251 terminal_id,
1252 label: tc.title.clone(),
1253 cwd,
1254 output_byte_limit: None,
1255 terminal: lower,
1256 },
1257 cx,
1258 );
1259 anyhow::Ok(())
1260 });
1261 }
1262 }
1263 }
1264 }
1265
1266 // Forward the update to the acp_thread as usual.
1267 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1268 thread.handle_session_update(notification.update.clone(), cx)
1269 })??;
1270
1271 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1272 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1273 if let Some(meta) = &tcu.meta {
1274 if let Some(term_out) = meta.get("terminal_output") {
1275 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1276 let terminal_id = acp::TerminalId::new(id_str);
1277 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1278 let data = s.as_bytes().to_vec();
1279 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1280 thread.on_terminal_provider_event(
1281 TerminalProviderEvent::Output { terminal_id, data },
1282 cx,
1283 );
1284 });
1285 }
1286 }
1287 }
1288
1289 // terminal_exit
1290 if let Some(term_exit) = meta.get("terminal_exit") {
1291 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1292 let terminal_id = acp::TerminalId::new(id_str);
1293 let status = acp::TerminalExitStatus::new()
1294 .exit_code(
1295 term_exit
1296 .get("exit_code")
1297 .and_then(|v| v.as_u64())
1298 .map(|i| i as u32),
1299 )
1300 .signal(
1301 term_exit
1302 .get("signal")
1303 .and_then(|v| v.as_str().map(|s| s.to_string())),
1304 );
1305
1306 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1307 thread.on_terminal_provider_event(
1308 TerminalProviderEvent::Exit {
1309 terminal_id,
1310 status,
1311 },
1312 cx,
1313 );
1314 });
1315 }
1316 }
1317 }
1318 }
1319
1320 Ok(())
1321 }
1322
1323 async fn create_terminal(
1324 &self,
1325 args: acp::CreateTerminalRequest,
1326 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1327 let thread = self.session_thread(&args.session_id)?;
1328 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1329
1330 let terminal_entity = acp_thread::create_terminal_entity(
1331 args.command.clone(),
1332 &args.args,
1333 args.env
1334 .into_iter()
1335 .map(|env| (env.name, env.value))
1336 .collect(),
1337 args.cwd.clone(),
1338 &project,
1339 &mut self.cx.clone(),
1340 )
1341 .await?;
1342
1343 // Register with renderer
1344 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1345 thread.register_terminal_created(
1346 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1347 format!("{} {}", args.command, args.args.join(" ")),
1348 args.cwd.clone(),
1349 args.output_byte_limit,
1350 terminal_entity,
1351 cx,
1352 )
1353 })?;
1354 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1355 Ok(acp::CreateTerminalResponse::new(terminal_id))
1356 }
1357
1358 async fn kill_terminal_command(
1359 &self,
1360 args: acp::KillTerminalCommandRequest,
1361 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
1362 self.session_thread(&args.session_id)?
1363 .update(&mut self.cx.clone(), |thread, cx| {
1364 thread.kill_terminal(args.terminal_id, cx)
1365 })??;
1366
1367 Ok(Default::default())
1368 }
1369
1370 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1371 Err(acp::Error::method_not_found())
1372 }
1373
1374 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1375 Err(acp::Error::method_not_found())
1376 }
1377
1378 async fn release_terminal(
1379 &self,
1380 args: acp::ReleaseTerminalRequest,
1381 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1382 self.session_thread(&args.session_id)?
1383 .update(&mut self.cx.clone(), |thread, cx| {
1384 thread.release_terminal(args.terminal_id, cx)
1385 })??;
1386
1387 Ok(Default::default())
1388 }
1389
1390 async fn terminal_output(
1391 &self,
1392 args: acp::TerminalOutputRequest,
1393 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1394 self.session_thread(&args.session_id)?
1395 .read_with(&mut self.cx.clone(), |thread, cx| {
1396 let out = thread
1397 .terminal(args.terminal_id)?
1398 .read(cx)
1399 .current_output(cx);
1400
1401 Ok(out)
1402 })?
1403 }
1404
1405 async fn wait_for_terminal_exit(
1406 &self,
1407 args: acp::WaitForTerminalExitRequest,
1408 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1409 let exit_status = self
1410 .session_thread(&args.session_id)?
1411 .update(&mut self.cx.clone(), |thread, cx| {
1412 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1413 })??
1414 .await;
1415
1416 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1417 }
1418}
1419
1420impl ClientDelegate {
1421 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1422 let sessions = self.sessions.borrow();
1423 sessions
1424 .get(session_id)
1425 .context("Failed to get session")
1426 .map(|session| session.thread.clone())
1427 }
1428}