1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use futures::AsyncBufReadExt as _;
11use futures::io::BufReader;
12use project::Project;
13use project::agent_server_store::{AgentServerCommand, GEMINI_NAME};
14use serde::Deserialize;
15use settings::Settings as _;
16use task::ShellBuilder;
17use util::ResultExt as _;
18use util::process::Child;
19
20use std::path::PathBuf;
21use std::process::Stdio;
22use std::{any::Any, cell::RefCell};
23use std::{path::Path, rc::Rc};
24use thiserror::Error;
25
26use anyhow::{Context as _, Result};
27use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
28
29use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
30use terminal::TerminalBuilder;
31use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
32
33#[derive(Debug, Error)]
34#[error("Unsupported version")]
35pub struct UnsupportedVersion;
36
37pub struct AcpConnection {
38 server_name: SharedString,
39 display_name: SharedString,
40 telemetry_id: SharedString,
41 connection: Rc<acp::ClientSideConnection>,
42 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
43 auth_methods: Vec<acp::AuthMethod>,
44 agent_capabilities: acp::AgentCapabilities,
45 default_mode: Option<acp::SessionModeId>,
46 default_model: Option<acp::ModelId>,
47 default_config_options: HashMap<String, String>,
48 child: Child,
49 session_list: Option<Rc<AcpSessionList>>,
50 _io_task: Task<Result<(), acp::Error>>,
51 _wait_task: Task<Result<()>>,
52 _stderr_task: Task<Result<()>>,
53}
54
55struct ConfigOptions {
56 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
57 tx: Rc<RefCell<watch::Sender<()>>>,
58 rx: watch::Receiver<()>,
59}
60
61impl ConfigOptions {
62 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
63 let (tx, rx) = watch::channel(());
64 Self {
65 config_options,
66 tx: Rc::new(RefCell::new(tx)),
67 rx,
68 }
69 }
70}
71
72pub struct AcpSession {
73 thread: WeakEntity<AcpThread>,
74 suppress_abort_err: bool,
75 models: Option<Rc<RefCell<acp::SessionModelState>>>,
76 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
77 config_options: Option<ConfigOptions>,
78}
79
80pub struct AcpSessionList {
81 connection: Rc<acp::ClientSideConnection>,
82 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
83 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
84}
85
86impl AcpSessionList {
87 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
88 let (tx, rx) = smol::channel::unbounded();
89 Self {
90 connection,
91 updates_tx: tx,
92 updates_rx: rx,
93 }
94 }
95
96 fn notify_update(&self) {
97 self.updates_tx
98 .try_send(acp_thread::SessionListUpdate::Refresh)
99 .log_err();
100 }
101
102 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
103 self.updates_tx
104 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
105 .log_err();
106 }
107}
108
109impl AgentSessionList for AcpSessionList {
110 fn list_sessions(
111 &self,
112 request: AgentSessionListRequest,
113 cx: &mut App,
114 ) -> Task<Result<AgentSessionListResponse>> {
115 let conn = self.connection.clone();
116 cx.foreground_executor().spawn(async move {
117 let acp_request = acp::ListSessionsRequest::new()
118 .cwd(request.cwd)
119 .cursor(request.cursor);
120 let response = conn.list_sessions(acp_request).await?;
121 Ok(AgentSessionListResponse {
122 sessions: response
123 .sessions
124 .into_iter()
125 .map(|s| AgentSessionInfo {
126 session_id: s.session_id,
127 cwd: Some(s.cwd),
128 title: s.title.map(Into::into),
129 updated_at: s.updated_at.and_then(|date_str| {
130 chrono::DateTime::parse_from_rfc3339(&date_str)
131 .ok()
132 .map(|dt| dt.with_timezone(&chrono::Utc))
133 }),
134 created_at: None,
135 meta: s.meta,
136 })
137 .collect(),
138 next_cursor: response.next_cursor,
139 meta: response.meta,
140 })
141 })
142 }
143
144 fn watch(
145 &self,
146 _cx: &mut App,
147 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
148 Some(self.updates_rx.clone())
149 }
150
151 fn notify_refresh(&self) {
152 self.notify_update();
153 }
154
155 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
156 self
157 }
158}
159
160pub async fn connect(
161 server_name: SharedString,
162 display_name: SharedString,
163 command: AgentServerCommand,
164 default_mode: Option<acp::SessionModeId>,
165 default_model: Option<acp::ModelId>,
166 default_config_options: HashMap<String, String>,
167 cx: &mut AsyncApp,
168) -> Result<Rc<dyn AgentConnection>> {
169 let conn = AcpConnection::stdio(
170 server_name,
171 display_name,
172 command.clone(),
173 default_mode,
174 default_model,
175 default_config_options,
176 cx,
177 )
178 .await?;
179 Ok(Rc::new(conn) as _)
180}
181
182const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
183
184impl AcpConnection {
185 pub async fn stdio(
186 server_name: SharedString,
187 display_name: SharedString,
188 command: AgentServerCommand,
189 default_mode: Option<acp::SessionModeId>,
190 default_model: Option<acp::ModelId>,
191 default_config_options: HashMap<String, String>,
192 cx: &mut AsyncApp,
193 ) -> Result<Self> {
194 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
195 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
196 let mut child =
197 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
198 child.envs(command.env.iter().flatten());
199 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
200
201 let stdout = child.stdout.take().context("Failed to take stdout")?;
202 let stdin = child.stdin.take().context("Failed to take stdin")?;
203 let stderr = child.stderr.take().context("Failed to take stderr")?;
204 log::debug!(
205 "Spawning external agent server: {:?}, {:?}",
206 command.path,
207 command.args
208 );
209 log::trace!("Spawned (pid: {})", child.id());
210
211 let sessions = Rc::new(RefCell::new(HashMap::default()));
212
213 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
214 (
215 release_channel::ReleaseChannel::try_global(cx)
216 .map(|release_channel| release_channel.display_name()),
217 release_channel::AppVersion::global(cx).to_string(),
218 )
219 });
220
221 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
222 Rc::new(RefCell::new(None));
223
224 let client = ClientDelegate {
225 sessions: sessions.clone(),
226 session_list: client_session_list.clone(),
227 cx: cx.clone(),
228 };
229 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
230 let foreground_executor = cx.foreground_executor().clone();
231 move |fut| {
232 foreground_executor.spawn(fut).detach();
233 }
234 });
235
236 let io_task = cx.background_spawn(io_task);
237
238 let stderr_task = cx.background_spawn(async move {
239 let mut stderr = BufReader::new(stderr);
240 let mut line = String::new();
241 while let Ok(n) = stderr.read_line(&mut line).await
242 && n > 0
243 {
244 log::warn!("agent stderr: {}", line.trim());
245 line.clear();
246 }
247 Ok(())
248 });
249
250 let wait_task = cx.spawn({
251 let sessions = sessions.clone();
252 let status_fut = child.status();
253 async move |cx| {
254 let status = status_fut.await?;
255
256 for session in sessions.borrow().values() {
257 session
258 .thread
259 .update(cx, |thread, cx| {
260 thread.emit_load_error(LoadError::Exited { status }, cx)
261 })
262 .ok();
263 }
264
265 anyhow::Ok(())
266 }
267 });
268
269 let connection = Rc::new(connection);
270
271 cx.update(|cx| {
272 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
273 registry.set_active_connection(server_name.clone(), &connection, cx)
274 });
275 });
276
277 let response = connection
278 .initialize(
279 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
280 .client_capabilities(
281 acp::ClientCapabilities::new()
282 .fs(acp::FileSystemCapabilities::new()
283 .read_text_file(true)
284 .write_text_file(true))
285 .terminal(true)
286 // Experimental: Allow for rendering terminal output from the agents
287 .meta(acp::Meta::from_iter([
288 ("terminal_output".into(), true.into()),
289 ("terminal-auth".into(), true.into()),
290 ])),
291 )
292 .client_info(
293 acp::Implementation::new("zed", version)
294 .title(release_channel.map(ToOwned::to_owned)),
295 ),
296 )
297 .await?;
298
299 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
300 return Err(UnsupportedVersion.into());
301 }
302
303 let telemetry_id = response
304 .agent_info
305 // Use the one the agent provides if we have one
306 .map(|info| info.name.into())
307 // Otherwise, just use the name
308 .unwrap_or_else(|| server_name.clone());
309
310 let session_list = if response
311 .agent_capabilities
312 .session_capabilities
313 .list
314 .is_some()
315 {
316 let list = Rc::new(AcpSessionList::new(connection.clone()));
317 *client_session_list.borrow_mut() = Some(list.clone());
318 Some(list)
319 } else {
320 None
321 };
322
323 // TODO: Remove this override once Google team releases their official auth methods
324 let auth_methods = if server_name == GEMINI_NAME {
325 let mut args = command.args.clone();
326 args.retain(|a| a != "--experimental-acp");
327 let value = serde_json::json!({
328 "label": "gemini /auth",
329 "command": command.path.to_string_lossy().into_owned(),
330 "args": args,
331 "env": command.env.clone().unwrap_or_default(),
332 });
333 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
334 vec![acp::AuthMethod::Agent(
335 acp::AuthMethodAgent::new("spawn-gemini-cli", "Login")
336 .description("Login with your Google or Vertex AI account")
337 .meta(meta),
338 )]
339 } else {
340 response.auth_methods
341 };
342 Ok(Self {
343 auth_methods,
344 connection,
345 server_name,
346 display_name,
347 telemetry_id,
348 sessions,
349 agent_capabilities: response.agent_capabilities,
350 default_mode,
351 default_model,
352 default_config_options,
353 session_list,
354 _io_task: io_task,
355 _wait_task: wait_task,
356 _stderr_task: stderr_task,
357 child,
358 })
359 }
360
361 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
362 &self.agent_capabilities.prompt_capabilities
363 }
364
365 fn apply_default_config_options(
366 &self,
367 session_id: &acp::SessionId,
368 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
369 cx: &mut AsyncApp,
370 ) {
371 let name = self.server_name.clone();
372 let defaults_to_apply: Vec<_> = {
373 let config_opts_ref = config_options.borrow();
374 config_opts_ref
375 .iter()
376 .filter_map(|config_option| {
377 let default_value = self.default_config_options.get(&*config_option.id.0)?;
378
379 let is_valid = match &config_option.kind {
380 acp::SessionConfigKind::Select(select) => match &select.options {
381 acp::SessionConfigSelectOptions::Ungrouped(options) => options
382 .iter()
383 .any(|opt| &*opt.value.0 == default_value.as_str()),
384 acp::SessionConfigSelectOptions::Grouped(groups) => {
385 groups.iter().any(|g| {
386 g.options
387 .iter()
388 .any(|opt| &*opt.value.0 == default_value.as_str())
389 })
390 }
391 _ => false,
392 },
393 _ => false,
394 };
395
396 if is_valid {
397 let initial_value = match &config_option.kind {
398 acp::SessionConfigKind::Select(select) => {
399 Some(select.current_value.clone())
400 }
401 _ => None,
402 };
403 Some((
404 config_option.id.clone(),
405 default_value.clone(),
406 initial_value,
407 ))
408 } else {
409 log::warn!(
410 "`{}` is not a valid value for config option `{}` in {}",
411 default_value,
412 config_option.id.0,
413 name
414 );
415 None
416 }
417 })
418 .collect()
419 };
420
421 for (config_id, default_value, initial_value) in defaults_to_apply {
422 cx.spawn({
423 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
424 let session_id = session_id.clone();
425 let config_id_clone = config_id.clone();
426 let config_opts = config_options.clone();
427 let conn = self.connection.clone();
428 async move |_| {
429 let result = conn
430 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
431 session_id,
432 config_id_clone.clone(),
433 default_value_id,
434 ))
435 .await
436 .log_err();
437
438 if result.is_none() {
439 if let Some(initial) = initial_value {
440 let mut opts = config_opts.borrow_mut();
441 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
442 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
443 select.current_value = initial;
444 }
445 }
446 }
447 }
448 }
449 })
450 .detach();
451
452 let mut opts = config_options.borrow_mut();
453 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
454 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
455 select.current_value = acp::SessionConfigValueId::new(default_value);
456 }
457 }
458 }
459 }
460}
461
462impl Drop for AcpConnection {
463 fn drop(&mut self) {
464 self.child.kill().log_err();
465 }
466}
467
468impl AgentConnection for AcpConnection {
469 fn telemetry_id(&self) -> SharedString {
470 self.telemetry_id.clone()
471 }
472
473 fn new_session(
474 self: Rc<Self>,
475 project: Entity<Project>,
476 cwd: &Path,
477 cx: &mut App,
478 ) -> Task<Result<Entity<AcpThread>>> {
479 let name = self.server_name.clone();
480 let cwd = cwd.to_path_buf();
481 let mcp_servers = mcp_servers_for_project(&project, cx);
482
483 cx.spawn(async move |cx| {
484 let response = self.connection
485 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
486 .await
487 .map_err(map_acp_error)?;
488
489 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
490
491 if let Some(default_mode) = self.default_mode.clone() {
492 if let Some(modes) = modes.as_ref() {
493 let mut modes_ref = modes.borrow_mut();
494 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
495
496 if has_mode {
497 let initial_mode_id = modes_ref.current_mode_id.clone();
498
499 cx.spawn({
500 let default_mode = default_mode.clone();
501 let session_id = response.session_id.clone();
502 let modes = modes.clone();
503 let conn = self.connection.clone();
504 async move |_| {
505 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
506 .await.log_err();
507
508 if result.is_none() {
509 modes.borrow_mut().current_mode_id = initial_mode_id;
510 }
511 }
512 }).detach();
513
514 modes_ref.current_mode_id = default_mode;
515 } else {
516 let available_modes = modes_ref
517 .available_modes
518 .iter()
519 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
520 .collect::<Vec<_>>()
521 .join("\n");
522
523 log::warn!(
524 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
525 );
526 }
527 }
528 }
529
530 if let Some(default_model) = self.default_model.clone() {
531 if let Some(models) = models.as_ref() {
532 let mut models_ref = models.borrow_mut();
533 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
534
535 if has_model {
536 let initial_model_id = models_ref.current_model_id.clone();
537
538 cx.spawn({
539 let default_model = default_model.clone();
540 let session_id = response.session_id.clone();
541 let models = models.clone();
542 let conn = self.connection.clone();
543 async move |_| {
544 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
545 .await.log_err();
546
547 if result.is_none() {
548 models.borrow_mut().current_model_id = initial_model_id;
549 }
550 }
551 }).detach();
552
553 models_ref.current_model_id = default_model;
554 } else {
555 let available_models = models_ref
556 .available_models
557 .iter()
558 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
559 .collect::<Vec<_>>()
560 .join("\n");
561
562 log::warn!(
563 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
564 );
565 }
566 }
567 }
568
569 if let Some(config_opts) = config_options.as_ref() {
570 self.apply_default_config_options(&response.session_id, config_opts, cx);
571 }
572
573 let action_log = cx.new(|_| ActionLog::new(project.clone()));
574 let thread: Entity<AcpThread> = cx.new(|cx| {
575 AcpThread::new(
576 None,
577 self.display_name.clone(),
578 Some(cwd),
579 self.clone(),
580 project,
581 action_log,
582 response.session_id.clone(),
583 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
584 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
585 cx,
586 )
587 });
588
589 self.sessions.borrow_mut().insert(
590 response.session_id,
591 AcpSession {
592 thread: thread.downgrade(),
593 suppress_abort_err: false,
594 session_modes: modes,
595 models,
596 config_options: config_options.map(ConfigOptions::new),
597 },
598 );
599
600 Ok(thread)
601 })
602 }
603
604 fn supports_load_session(&self) -> bool {
605 self.agent_capabilities.load_session
606 }
607
608 fn supports_resume_session(&self) -> bool {
609 self.agent_capabilities
610 .session_capabilities
611 .resume
612 .is_some()
613 }
614
615 fn load_session(
616 self: Rc<Self>,
617 session_id: acp::SessionId,
618 project: Entity<Project>,
619 cwd: &Path,
620 title: Option<SharedString>,
621 cx: &mut App,
622 ) -> Task<Result<Entity<AcpThread>>> {
623 if !self.agent_capabilities.load_session {
624 return Task::ready(Err(anyhow!(LoadError::Other(
625 "Loading sessions is not supported by this agent.".into()
626 ))));
627 }
628
629 let cwd = cwd.to_path_buf();
630 let mcp_servers = mcp_servers_for_project(&project, cx);
631 let action_log = cx.new(|_| ActionLog::new(project.clone()));
632 let title = title.unwrap_or_else(|| self.display_name.clone());
633 let thread: Entity<AcpThread> = cx.new(|cx| {
634 AcpThread::new(
635 None,
636 title,
637 Some(cwd.clone()),
638 self.clone(),
639 project,
640 action_log,
641 session_id.clone(),
642 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
643 cx,
644 )
645 });
646
647 self.sessions.borrow_mut().insert(
648 session_id.clone(),
649 AcpSession {
650 thread: thread.downgrade(),
651 suppress_abort_err: false,
652 session_modes: None,
653 models: None,
654 config_options: None,
655 },
656 );
657
658 cx.spawn(async move |cx| {
659 let response = match self
660 .connection
661 .load_session(
662 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
663 )
664 .await
665 {
666 Ok(response) => response,
667 Err(err) => {
668 self.sessions.borrow_mut().remove(&session_id);
669 return Err(map_acp_error(err));
670 }
671 };
672
673 let (modes, models, config_options) =
674 config_state(response.modes, response.models, response.config_options);
675
676 if let Some(config_opts) = config_options.as_ref() {
677 self.apply_default_config_options(&session_id, config_opts, cx);
678 }
679
680 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
681 session.session_modes = modes;
682 session.models = models;
683 session.config_options = config_options.map(ConfigOptions::new);
684 }
685
686 Ok(thread)
687 })
688 }
689
690 fn resume_session(
691 self: Rc<Self>,
692 session_id: acp::SessionId,
693 project: Entity<Project>,
694 cwd: &Path,
695 title: Option<SharedString>,
696 cx: &mut App,
697 ) -> Task<Result<Entity<AcpThread>>> {
698 if self
699 .agent_capabilities
700 .session_capabilities
701 .resume
702 .is_none()
703 {
704 return Task::ready(Err(anyhow!(LoadError::Other(
705 "Resuming sessions is not supported by this agent.".into()
706 ))));
707 }
708
709 let cwd = cwd.to_path_buf();
710 let mcp_servers = mcp_servers_for_project(&project, cx);
711 let action_log = cx.new(|_| ActionLog::new(project.clone()));
712 let title = title.unwrap_or_else(|| self.display_name.clone());
713 let thread: Entity<AcpThread> = cx.new(|cx| {
714 AcpThread::new(
715 None,
716 title,
717 Some(cwd.clone()),
718 self.clone(),
719 project,
720 action_log,
721 session_id.clone(),
722 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
723 cx,
724 )
725 });
726
727 self.sessions.borrow_mut().insert(
728 session_id.clone(),
729 AcpSession {
730 thread: thread.downgrade(),
731 suppress_abort_err: false,
732 session_modes: None,
733 models: None,
734 config_options: None,
735 },
736 );
737
738 cx.spawn(async move |cx| {
739 let response = match self
740 .connection
741 .resume_session(
742 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
743 .mcp_servers(mcp_servers),
744 )
745 .await
746 {
747 Ok(response) => response,
748 Err(err) => {
749 self.sessions.borrow_mut().remove(&session_id);
750 return Err(map_acp_error(err));
751 }
752 };
753
754 let (modes, models, config_options) =
755 config_state(response.modes, response.models, response.config_options);
756
757 if let Some(config_opts) = config_options.as_ref() {
758 self.apply_default_config_options(&session_id, config_opts, cx);
759 }
760
761 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
762 session.session_modes = modes;
763 session.models = models;
764 session.config_options = config_options.map(ConfigOptions::new);
765 }
766
767 Ok(thread)
768 })
769 }
770
771 fn supports_close_session(&self) -> bool {
772 self.agent_capabilities.session_capabilities.close.is_some()
773 }
774
775 fn close_session(
776 self: Rc<Self>,
777 session_id: &acp::SessionId,
778 cx: &mut App,
779 ) -> Task<Result<()>> {
780 if !self.supports_close_session() {
781 return Task::ready(Err(anyhow!(LoadError::Other(
782 "Closing sessions is not supported by this agent.".into()
783 ))));
784 }
785
786 let conn = self.connection.clone();
787 let session_id = session_id.clone();
788 cx.foreground_executor().spawn(async move {
789 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
790 .await?;
791 self.sessions.borrow_mut().remove(&session_id);
792 Ok(())
793 })
794 }
795
796 fn auth_methods(&self) -> &[acp::AuthMethod] {
797 &self.auth_methods
798 }
799
800 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
801 let conn = self.connection.clone();
802 cx.foreground_executor().spawn(async move {
803 conn.authenticate(acp::AuthenticateRequest::new(method_id))
804 .await?;
805 Ok(())
806 })
807 }
808
809 fn prompt(
810 &self,
811 _id: Option<acp_thread::UserMessageId>,
812 params: acp::PromptRequest,
813 cx: &mut App,
814 ) -> Task<Result<acp::PromptResponse>> {
815 let conn = self.connection.clone();
816 let sessions = self.sessions.clone();
817 let session_id = params.session_id.clone();
818 cx.foreground_executor().spawn(async move {
819 let result = conn.prompt(params).await;
820
821 let mut suppress_abort_err = false;
822
823 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
824 suppress_abort_err = session.suppress_abort_err;
825 session.suppress_abort_err = false;
826 }
827
828 match result {
829 Ok(response) => Ok(response),
830 Err(err) => {
831 if err.code == acp::ErrorCode::AuthRequired {
832 return Err(anyhow!(acp::Error::auth_required()));
833 }
834
835 if err.code != ErrorCode::InternalError {
836 anyhow::bail!(err)
837 }
838
839 let Some(data) = &err.data else {
840 anyhow::bail!(err)
841 };
842
843 // Temporary workaround until the following PR is generally available:
844 // https://github.com/google-gemini/gemini-cli/pull/6656
845
846 #[derive(Deserialize)]
847 #[serde(deny_unknown_fields)]
848 struct ErrorDetails {
849 details: Box<str>,
850 }
851
852 match serde_json::from_value(data.clone()) {
853 Ok(ErrorDetails { details }) => {
854 if suppress_abort_err
855 && (details.contains("This operation was aborted")
856 || details.contains("The user aborted a request"))
857 {
858 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
859 } else {
860 Err(anyhow!(details))
861 }
862 }
863 Err(_) => Err(anyhow!(err)),
864 }
865 }
866 }
867 })
868 }
869
870 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
871 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
872 session.suppress_abort_err = true;
873 }
874 let conn = self.connection.clone();
875 let params = acp::CancelNotification::new(session_id.clone());
876 cx.foreground_executor()
877 .spawn(async move { conn.cancel(params).await })
878 .detach();
879 }
880
881 fn session_modes(
882 &self,
883 session_id: &acp::SessionId,
884 _cx: &App,
885 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
886 let sessions = self.sessions.clone();
887 let sessions_ref = sessions.borrow();
888 let Some(session) = sessions_ref.get(session_id) else {
889 return None;
890 };
891
892 if let Some(modes) = session.session_modes.as_ref() {
893 Some(Rc::new(AcpSessionModes {
894 connection: self.connection.clone(),
895 session_id: session_id.clone(),
896 state: modes.clone(),
897 }) as _)
898 } else {
899 None
900 }
901 }
902
903 fn model_selector(
904 &self,
905 session_id: &acp::SessionId,
906 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
907 let sessions = self.sessions.clone();
908 let sessions_ref = sessions.borrow();
909 let Some(session) = sessions_ref.get(session_id) else {
910 return None;
911 };
912
913 if let Some(models) = session.models.as_ref() {
914 Some(Rc::new(AcpModelSelector::new(
915 session_id.clone(),
916 self.connection.clone(),
917 models.clone(),
918 )) as _)
919 } else {
920 None
921 }
922 }
923
924 fn session_config_options(
925 &self,
926 session_id: &acp::SessionId,
927 _cx: &App,
928 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
929 let sessions = self.sessions.borrow();
930 let session = sessions.get(session_id)?;
931
932 let config_opts = session.config_options.as_ref()?;
933
934 Some(Rc::new(AcpSessionConfigOptions {
935 session_id: session_id.clone(),
936 connection: self.connection.clone(),
937 state: config_opts.config_options.clone(),
938 watch_tx: config_opts.tx.clone(),
939 watch_rx: config_opts.rx.clone(),
940 }) as _)
941 }
942
943 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
944 self.session_list.clone().map(|s| s as _)
945 }
946
947 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
948 self
949 }
950}
951
952fn map_acp_error(err: acp::Error) -> anyhow::Error {
953 if err.code == acp::ErrorCode::AuthRequired {
954 let mut error = AuthRequired::new();
955
956 if err.message != acp::ErrorCode::AuthRequired.to_string() {
957 error = error.with_description(err.message);
958 }
959
960 anyhow!(error)
961 } else {
962 anyhow!(err)
963 }
964}
965
966fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
967 let context_server_store = project.read(cx).context_server_store().read(cx);
968 let is_local = project.read(cx).is_local();
969 context_server_store
970 .configured_server_ids()
971 .iter()
972 .filter_map(|id| {
973 let configuration = context_server_store.configuration_for_server(id)?;
974 match &*configuration {
975 project::context_server_store::ContextServerConfiguration::Custom {
976 command,
977 remote,
978 ..
979 }
980 | project::context_server_store::ContextServerConfiguration::Extension {
981 command,
982 remote,
983 ..
984 } if is_local || *remote => Some(acp::McpServer::Stdio(
985 acp::McpServerStdio::new(id.0.to_string(), &command.path)
986 .args(command.args.clone())
987 .env(if let Some(env) = command.env.as_ref() {
988 env.iter()
989 .map(|(name, value)| acp::EnvVariable::new(name, value))
990 .collect()
991 } else {
992 vec![]
993 }),
994 )),
995 project::context_server_store::ContextServerConfiguration::Http {
996 url,
997 headers,
998 timeout: _,
999 } => Some(acp::McpServer::Http(
1000 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1001 headers
1002 .iter()
1003 .map(|(name, value)| acp::HttpHeader::new(name, value))
1004 .collect(),
1005 ),
1006 )),
1007 _ => None,
1008 }
1009 })
1010 .collect()
1011}
1012
1013fn config_state(
1014 modes: Option<acp::SessionModeState>,
1015 models: Option<acp::SessionModelState>,
1016 config_options: Option<Vec<acp::SessionConfigOption>>,
1017) -> (
1018 Option<Rc<RefCell<acp::SessionModeState>>>,
1019 Option<Rc<RefCell<acp::SessionModelState>>>,
1020 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1021) {
1022 if let Some(opts) = config_options {
1023 return (None, None, Some(Rc::new(RefCell::new(opts))));
1024 }
1025
1026 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1027 let models = models.map(|models| Rc::new(RefCell::new(models)));
1028 (modes, models, None)
1029}
1030
1031struct AcpSessionModes {
1032 session_id: acp::SessionId,
1033 connection: Rc<acp::ClientSideConnection>,
1034 state: Rc<RefCell<acp::SessionModeState>>,
1035}
1036
1037impl acp_thread::AgentSessionModes for AcpSessionModes {
1038 fn current_mode(&self) -> acp::SessionModeId {
1039 self.state.borrow().current_mode_id.clone()
1040 }
1041
1042 fn all_modes(&self) -> Vec<acp::SessionMode> {
1043 self.state.borrow().available_modes.clone()
1044 }
1045
1046 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1047 let connection = self.connection.clone();
1048 let session_id = self.session_id.clone();
1049 let old_mode_id;
1050 {
1051 let mut state = self.state.borrow_mut();
1052 old_mode_id = state.current_mode_id.clone();
1053 state.current_mode_id = mode_id.clone();
1054 };
1055 let state = self.state.clone();
1056 cx.foreground_executor().spawn(async move {
1057 let result = connection
1058 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1059 .await;
1060
1061 if result.is_err() {
1062 state.borrow_mut().current_mode_id = old_mode_id;
1063 }
1064
1065 result?;
1066
1067 Ok(())
1068 })
1069 }
1070}
1071
1072struct AcpModelSelector {
1073 session_id: acp::SessionId,
1074 connection: Rc<acp::ClientSideConnection>,
1075 state: Rc<RefCell<acp::SessionModelState>>,
1076}
1077
1078impl AcpModelSelector {
1079 fn new(
1080 session_id: acp::SessionId,
1081 connection: Rc<acp::ClientSideConnection>,
1082 state: Rc<RefCell<acp::SessionModelState>>,
1083 ) -> Self {
1084 Self {
1085 session_id,
1086 connection,
1087 state,
1088 }
1089 }
1090}
1091
1092impl acp_thread::AgentModelSelector for AcpModelSelector {
1093 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1094 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1095 self.state
1096 .borrow()
1097 .available_models
1098 .clone()
1099 .into_iter()
1100 .map(acp_thread::AgentModelInfo::from)
1101 .collect(),
1102 )))
1103 }
1104
1105 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1106 let connection = self.connection.clone();
1107 let session_id = self.session_id.clone();
1108 let old_model_id;
1109 {
1110 let mut state = self.state.borrow_mut();
1111 old_model_id = state.current_model_id.clone();
1112 state.current_model_id = model_id.clone();
1113 };
1114 let state = self.state.clone();
1115 cx.foreground_executor().spawn(async move {
1116 let result = connection
1117 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1118 .await;
1119
1120 if result.is_err() {
1121 state.borrow_mut().current_model_id = old_model_id;
1122 }
1123
1124 result?;
1125
1126 Ok(())
1127 })
1128 }
1129
1130 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1131 let state = self.state.borrow();
1132 Task::ready(
1133 state
1134 .available_models
1135 .iter()
1136 .find(|m| m.model_id == state.current_model_id)
1137 .cloned()
1138 .map(acp_thread::AgentModelInfo::from)
1139 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1140 )
1141 }
1142}
1143
1144struct AcpSessionConfigOptions {
1145 session_id: acp::SessionId,
1146 connection: Rc<acp::ClientSideConnection>,
1147 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1148 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1149 watch_rx: watch::Receiver<()>,
1150}
1151
1152impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1153 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1154 self.state.borrow().clone()
1155 }
1156
1157 fn set_config_option(
1158 &self,
1159 config_id: acp::SessionConfigId,
1160 value: acp::SessionConfigValueId,
1161 cx: &mut App,
1162 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1163 let connection = self.connection.clone();
1164 let session_id = self.session_id.clone();
1165 let state = self.state.clone();
1166
1167 let watch_tx = self.watch_tx.clone();
1168
1169 cx.foreground_executor().spawn(async move {
1170 let response = connection
1171 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1172 session_id, config_id, value,
1173 ))
1174 .await?;
1175
1176 *state.borrow_mut() = response.config_options.clone();
1177 watch_tx.borrow_mut().send(()).ok();
1178 Ok(response.config_options)
1179 })
1180 }
1181
1182 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1183 Some(self.watch_rx.clone())
1184 }
1185}
1186
1187struct ClientDelegate {
1188 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1189 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1190 cx: AsyncApp,
1191}
1192
1193#[async_trait::async_trait(?Send)]
1194impl acp::Client for ClientDelegate {
1195 async fn request_permission(
1196 &self,
1197 arguments: acp::RequestPermissionRequest,
1198 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1199 let thread;
1200 {
1201 let sessions_ref = self.sessions.borrow();
1202 let session = sessions_ref
1203 .get(&arguments.session_id)
1204 .context("Failed to get session")?;
1205 thread = session.thread.clone();
1206 }
1207
1208 let cx = &mut self.cx.clone();
1209
1210 let task = thread.update(cx, |thread, cx| {
1211 thread.request_tool_call_authorization(
1212 arguments.tool_call,
1213 acp_thread::PermissionOptions::Flat(arguments.options),
1214 cx,
1215 )
1216 })??;
1217
1218 let outcome = task.await;
1219
1220 Ok(acp::RequestPermissionResponse::new(outcome))
1221 }
1222
1223 async fn write_text_file(
1224 &self,
1225 arguments: acp::WriteTextFileRequest,
1226 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1227 let cx = &mut self.cx.clone();
1228 let task = self
1229 .session_thread(&arguments.session_id)?
1230 .update(cx, |thread, cx| {
1231 thread.write_text_file(arguments.path, arguments.content, cx)
1232 })?;
1233
1234 task.await?;
1235
1236 Ok(Default::default())
1237 }
1238
1239 async fn read_text_file(
1240 &self,
1241 arguments: acp::ReadTextFileRequest,
1242 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1243 let task = self.session_thread(&arguments.session_id)?.update(
1244 &mut self.cx.clone(),
1245 |thread, cx| {
1246 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1247 },
1248 )?;
1249
1250 let content = task.await?;
1251
1252 Ok(acp::ReadTextFileResponse::new(content))
1253 }
1254
1255 async fn session_notification(
1256 &self,
1257 notification: acp::SessionNotification,
1258 ) -> Result<(), acp::Error> {
1259 let sessions = self.sessions.borrow();
1260 let session = sessions
1261 .get(¬ification.session_id)
1262 .context("Failed to get session")?;
1263
1264 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1265 current_mode_id,
1266 ..
1267 }) = ¬ification.update
1268 {
1269 if let Some(session_modes) = &session.session_modes {
1270 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1271 }
1272 }
1273
1274 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1275 config_options,
1276 ..
1277 }) = ¬ification.update
1278 {
1279 if let Some(opts) = &session.config_options {
1280 *opts.config_options.borrow_mut() = config_options.clone();
1281 opts.tx.borrow_mut().send(()).ok();
1282 }
1283 }
1284
1285 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1286 && let Some(session_list) = self.session_list.borrow().as_ref()
1287 {
1288 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1289 }
1290
1291 // Clone so we can inspect meta both before and after handing off to the thread
1292 let update_clone = notification.update.clone();
1293
1294 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1295 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1296 if let Some(meta) = &tc.meta {
1297 if let Some(terminal_info) = meta.get("terminal_info") {
1298 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1299 {
1300 let terminal_id = acp::TerminalId::new(id_str);
1301 let cwd = terminal_info
1302 .get("cwd")
1303 .and_then(|v| v.as_str().map(PathBuf::from));
1304
1305 // Create a minimal display-only lower-level terminal and register it.
1306 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1307 let builder = TerminalBuilder::new_display_only(
1308 CursorShape::default(),
1309 AlternateScroll::On,
1310 None,
1311 0,
1312 cx.background_executor(),
1313 thread.project().read(cx).path_style(cx),
1314 )?;
1315 let lower = cx.new(|cx| builder.subscribe(cx));
1316 thread.on_terminal_provider_event(
1317 TerminalProviderEvent::Created {
1318 terminal_id,
1319 label: tc.title.clone(),
1320 cwd,
1321 output_byte_limit: None,
1322 terminal: lower,
1323 },
1324 cx,
1325 );
1326 anyhow::Ok(())
1327 });
1328 }
1329 }
1330 }
1331 }
1332
1333 // Forward the update to the acp_thread as usual.
1334 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1335 thread.handle_session_update(notification.update.clone(), cx)
1336 })??;
1337
1338 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1339 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1340 if let Some(meta) = &tcu.meta {
1341 if let Some(term_out) = meta.get("terminal_output") {
1342 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1343 let terminal_id = acp::TerminalId::new(id_str);
1344 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1345 let data = s.as_bytes().to_vec();
1346 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1347 thread.on_terminal_provider_event(
1348 TerminalProviderEvent::Output { terminal_id, data },
1349 cx,
1350 );
1351 });
1352 }
1353 }
1354 }
1355
1356 // terminal_exit
1357 if let Some(term_exit) = meta.get("terminal_exit") {
1358 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1359 let terminal_id = acp::TerminalId::new(id_str);
1360 let status = acp::TerminalExitStatus::new()
1361 .exit_code(
1362 term_exit
1363 .get("exit_code")
1364 .and_then(|v| v.as_u64())
1365 .map(|i| i as u32),
1366 )
1367 .signal(
1368 term_exit
1369 .get("signal")
1370 .and_then(|v| v.as_str().map(|s| s.to_string())),
1371 );
1372
1373 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1374 thread.on_terminal_provider_event(
1375 TerminalProviderEvent::Exit {
1376 terminal_id,
1377 status,
1378 },
1379 cx,
1380 );
1381 });
1382 }
1383 }
1384 }
1385 }
1386
1387 Ok(())
1388 }
1389
1390 async fn create_terminal(
1391 &self,
1392 args: acp::CreateTerminalRequest,
1393 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1394 let thread = self.session_thread(&args.session_id)?;
1395 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1396
1397 let terminal_entity = acp_thread::create_terminal_entity(
1398 args.command.clone(),
1399 &args.args,
1400 args.env
1401 .into_iter()
1402 .map(|env| (env.name, env.value))
1403 .collect(),
1404 args.cwd.clone(),
1405 &project,
1406 &mut self.cx.clone(),
1407 )
1408 .await?;
1409
1410 // Register with renderer
1411 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1412 thread.register_terminal_created(
1413 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1414 format!("{} {}", args.command, args.args.join(" ")),
1415 args.cwd.clone(),
1416 args.output_byte_limit,
1417 terminal_entity,
1418 cx,
1419 )
1420 })?;
1421 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1422 Ok(acp::CreateTerminalResponse::new(terminal_id))
1423 }
1424
1425 async fn kill_terminal(
1426 &self,
1427 args: acp::KillTerminalRequest,
1428 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1429 self.session_thread(&args.session_id)?
1430 .update(&mut self.cx.clone(), |thread, cx| {
1431 thread.kill_terminal(args.terminal_id, cx)
1432 })??;
1433
1434 Ok(Default::default())
1435 }
1436
1437 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1438 Err(acp::Error::method_not_found())
1439 }
1440
1441 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1442 Err(acp::Error::method_not_found())
1443 }
1444
1445 async fn release_terminal(
1446 &self,
1447 args: acp::ReleaseTerminalRequest,
1448 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1449 self.session_thread(&args.session_id)?
1450 .update(&mut self.cx.clone(), |thread, cx| {
1451 thread.release_terminal(args.terminal_id, cx)
1452 })??;
1453
1454 Ok(Default::default())
1455 }
1456
1457 async fn terminal_output(
1458 &self,
1459 args: acp::TerminalOutputRequest,
1460 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1461 self.session_thread(&args.session_id)?
1462 .read_with(&mut self.cx.clone(), |thread, cx| {
1463 let out = thread
1464 .terminal(args.terminal_id)?
1465 .read(cx)
1466 .current_output(cx);
1467
1468 Ok(out)
1469 })?
1470 }
1471
1472 async fn wait_for_terminal_exit(
1473 &self,
1474 args: acp::WaitForTerminalExitRequest,
1475 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1476 let exit_status = self
1477 .session_thread(&args.session_id)?
1478 .update(&mut self.cx.clone(), |thread, cx| {
1479 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1480 })??
1481 .await;
1482
1483 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1484 }
1485}
1486
1487impl ClientDelegate {
1488 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1489 let sessions = self.sessions.borrow();
1490 sessions
1491 .get(session_id)
1492 .context("Failed to get session")
1493 .map(|session| session.thread.clone())
1494 }
1495}