1use acp_thread::AgentConnection;
2use acp_tools::AcpConnectionRegistry;
3use action_log::ActionLog;
4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
5use anyhow::anyhow;
6use collections::HashMap;
7use futures::AsyncBufReadExt as _;
8use futures::io::BufReader;
9use project::Project;
10use project::agent_server_store::AgentServerCommand;
11use serde::Deserialize;
12use settings::Settings as _;
13use task::ShellBuilder;
14#[cfg(windows)]
15use task::ShellKind;
16use util::ResultExt as _;
17
18use std::path::PathBuf;
19use std::{any::Any, cell::RefCell};
20use std::{path::Path, rc::Rc};
21use thiserror::Error;
22
23use anyhow::{Context as _, Result};
24use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
25
26use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
27use terminal::TerminalBuilder;
28use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
29
/// Error produced by [`AcpConnection::stdio`] when the protocol version reported by
/// the agent is lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
33
/// A connection to an external agent server process, created by
/// [`AcpConnection::stdio`].
///
/// The struct owns the child process, the tasks servicing its pipes, and the table
/// of sessions that were created through this connection.
pub struct AcpConnection {
    /// Name the connection was created with; passed to every `AcpThread` it creates.
    server_name: SharedString,
    /// Name reported by the agent in its initialize response, falling back to
    /// `server_name` when the agent did not report one.
    telemetry_id: SharedString,
    /// Client side of the protocol connection used to send requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions created through this connection, keyed by session id. The same map
    /// is shared with the `ClientDelegate` that handles requests coming from the agent.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods taken from the initialize response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities taken from the initialize response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode that `new_thread` tries to select on every new session, if set.
    default_mode: Option<acp::SessionModeId>,
    /// Model that `new_thread` tries to select on every new session, if set.
    default_model: Option<acp::ModelId>,
    /// Root directory the connection was created for.
    root_dir: PathBuf,
    // NB: Don't move this into the wait_task, since we need to ensure the process is
    // killed on drop (setting kill_on_drop on the command seems to not always work).
    child: smol::process::Child,
    // The tasks below are stored but never read.
    // NOTE(review): presumably they are kept here so that they are not dropped
    // (and thereby cancelled) while the connection is alive — confirm against gpui.
    /// Task driving the protocol I/O over the child's stdin/stdout.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that waits for the child to exit and reports the exit to every session thread.
    _wait_task: Task<Result<()>>,
    /// Task that forwards the child's stderr to the log.
    _stderr_task: Task<Result<()>>,
}
51
/// Per-session bookkeeping stored in [`AcpConnection`]'s session table.
pub struct AcpSession {
    /// Weak handle to the thread that displays this session.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel`; read and reset by `prompt` when a prompt request finishes, so
    /// that an "aborted" error caused by a cancellation is reported as a cancelled stop.
    suppress_abort_err: bool,
    /// Model state returned by the agent when the session was created, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state returned by the agent when the session was created, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
}
58
59pub async fn connect(
60 server_name: SharedString,
61 command: AgentServerCommand,
62 root_dir: &Path,
63 default_mode: Option<acp::SessionModeId>,
64 default_model: Option<acp::ModelId>,
65 is_remote: bool,
66 cx: &mut AsyncApp,
67) -> Result<Rc<dyn AgentConnection>> {
68 let conn = AcpConnection::stdio(
69 server_name,
70 command.clone(),
71 root_dir,
72 default_mode,
73 default_model,
74 is_remote,
75 cx,
76 )
77 .await?;
78 Ok(Rc::new(conn) as _)
79}
80
/// Oldest protocol version accepted from an agent; `AcpConnection::stdio` fails with
/// `UnsupportedVersion` when the agent reports a lower one.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
82
83impl AcpConnection {
84 pub async fn stdio(
85 server_name: SharedString,
86 command: AgentServerCommand,
87 root_dir: &Path,
88 default_mode: Option<acp::SessionModeId>,
89 default_model: Option<acp::ModelId>,
90 is_remote: bool,
91 cx: &mut AsyncApp,
92 ) -> Result<Self> {
93 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone())?;
94 let builder = ShellBuilder::new(&shell, cfg!(windows));
95 #[cfg(windows)]
96 let kind = builder.kind();
97 let (cmd, args) = builder.build(Some(command.path.display().to_string()), &command.args);
98
99 let mut child = util::command::new_smol_command(cmd);
100 #[cfg(windows)]
101 if kind == ShellKind::Cmd {
102 use smol::process::windows::CommandExt;
103 for arg in args {
104 child.raw_arg(arg);
105 }
106 } else {
107 child.args(args);
108 }
109 #[cfg(not(windows))]
110 child.args(args);
111
112 child
113 .envs(command.env.iter().flatten())
114 .stdin(std::process::Stdio::piped())
115 .stdout(std::process::Stdio::piped())
116 .stderr(std::process::Stdio::piped());
117 if !is_remote {
118 child.current_dir(root_dir);
119 }
120 let mut child = child.spawn()?;
121
122 let stdout = child.stdout.take().context("Failed to take stdout")?;
123 let stdin = child.stdin.take().context("Failed to take stdin")?;
124 let stderr = child.stderr.take().context("Failed to take stderr")?;
125 log::debug!(
126 "Spawning external agent server: {:?}, {:?}",
127 command.path,
128 command.args
129 );
130 log::trace!("Spawned (pid: {})", child.id());
131
132 let sessions = Rc::new(RefCell::new(HashMap::default()));
133
134 let (release_channel, version) = cx.update(|cx| {
135 (
136 release_channel::ReleaseChannel::try_global(cx)
137 .map(|release_channel| release_channel.display_name()),
138 release_channel::AppVersion::global(cx).to_string(),
139 )
140 })?;
141
142 let client = ClientDelegate {
143 sessions: sessions.clone(),
144 cx: cx.clone(),
145 };
146 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
147 let foreground_executor = cx.foreground_executor().clone();
148 move |fut| {
149 foreground_executor.spawn(fut).detach();
150 }
151 });
152
153 let io_task = cx.background_spawn(io_task);
154
155 let stderr_task = cx.background_spawn(async move {
156 let mut stderr = BufReader::new(stderr);
157 let mut line = String::new();
158 while let Ok(n) = stderr.read_line(&mut line).await
159 && n > 0
160 {
161 log::warn!("agent stderr: {}", line.trim());
162 line.clear();
163 }
164 Ok(())
165 });
166
167 let wait_task = cx.spawn({
168 let sessions = sessions.clone();
169 let status_fut = child.status();
170 async move |cx| {
171 let status = status_fut.await?;
172
173 for session in sessions.borrow().values() {
174 session
175 .thread
176 .update(cx, |thread, cx| {
177 thread.emit_load_error(LoadError::Exited { status }, cx)
178 })
179 .ok();
180 }
181
182 anyhow::Ok(())
183 }
184 });
185
186 let connection = Rc::new(connection);
187
188 cx.update(|cx| {
189 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
190 registry.set_active_connection(server_name.clone(), &connection, cx)
191 });
192 })?;
193
194 let response = connection
195 .initialize(
196 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
197 .client_capabilities(
198 acp::ClientCapabilities::new()
199 .fs(acp::FileSystemCapability::new()
200 .read_text_file(true)
201 .write_text_file(true))
202 .terminal(true)
203 // Experimental: Allow for rendering terminal output from the agents
204 .meta(acp::Meta::from_iter([
205 ("terminal_output".into(), true.into()),
206 ("terminal-auth".into(), true.into()),
207 ])),
208 )
209 .client_info(
210 acp::Implementation::new("zed", version)
211 .title(release_channel.map(ToOwned::to_owned)),
212 ),
213 )
214 .await?;
215
216 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
217 return Err(UnsupportedVersion.into());
218 }
219
220 let telemetry_id = response
221 .agent_info
222 // Use the one the agent provides if we have one
223 .map(|info| info.name.into())
224 // Otherwise, just use the name
225 .unwrap_or_else(|| server_name.clone());
226
227 Ok(Self {
228 auth_methods: response.auth_methods,
229 root_dir: root_dir.to_owned(),
230 connection,
231 server_name,
232 telemetry_id,
233 sessions,
234 agent_capabilities: response.agent_capabilities,
235 default_mode,
236 default_model,
237 _io_task: io_task,
238 _wait_task: wait_task,
239 _stderr_task: stderr_task,
240 child,
241 })
242 }
243
244 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
245 &self.agent_capabilities.prompt_capabilities
246 }
247
248 pub fn root_dir(&self) -> &Path {
249 &self.root_dir
250 }
251}
252
/// Kills the agent process when the connection is dropped.
impl Drop for AcpConnection {
    fn drop(&mut self) {
        // See the comment on the child field.
        // A failure to kill is only logged (`log_err`), never propagated.
        self.child.kill().log_err();
    }
}
259
260impl AgentConnection for AcpConnection {
261 fn telemetry_id(&self) -> SharedString {
262 self.telemetry_id.clone()
263 }
264
265 fn new_thread(
266 self: Rc<Self>,
267 project: Entity<Project>,
268 cwd: &Path,
269 cx: &mut App,
270 ) -> Task<Result<Entity<AcpThread>>> {
271 let name = self.server_name.clone();
272 let conn = self.connection.clone();
273 let sessions = self.sessions.clone();
274 let default_mode = self.default_mode.clone();
275 let default_model = self.default_model.clone();
276 let cwd = cwd.to_path_buf();
277 let context_server_store = project.read(cx).context_server_store().read(cx);
278 let mcp_servers = if project.read(cx).is_local() {
279 context_server_store
280 .configured_server_ids()
281 .iter()
282 .filter_map(|id| {
283 let configuration = context_server_store.configuration_for_server(id)?;
284 match &*configuration {
285 project::context_server_store::ContextServerConfiguration::Custom {
286 command,
287 ..
288 }
289 | project::context_server_store::ContextServerConfiguration::Extension {
290 command,
291 ..
292 } => Some(acp::McpServer::Stdio(
293 acp::McpServerStdio::new(id.0.to_string(), &command.path)
294 .args(command.args.clone())
295 .env(if let Some(env) = command.env.as_ref() {
296 env.iter()
297 .map(|(name, value)| acp::EnvVariable::new(name, value))
298 .collect()
299 } else {
300 vec![]
301 }),
302 )),
303 project::context_server_store::ContextServerConfiguration::Http {
304 url,
305 headers,
306 } => Some(acp::McpServer::Http(
307 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
308 headers
309 .iter()
310 .map(|(name, value)| acp::HttpHeader::new(name, value))
311 .collect(),
312 ),
313 )),
314 }
315 })
316 .collect()
317 } else {
318 // In SSH projects, the external agent is running on the remote
319 // machine, and currently we only run MCP servers on the local
320 // machine. So don't pass any MCP servers to the agent in that case.
321 Vec::new()
322 };
323
324 cx.spawn(async move |cx| {
325 let response = conn
326 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
327 .await
328 .map_err(|err| {
329 if err.code == acp::ErrorCode::AuthRequired {
330 let mut error = AuthRequired::new();
331
332 if err.message != acp::ErrorCode::AuthRequired.to_string() {
333 error = error.with_description(err.message);
334 }
335
336 anyhow!(error)
337 } else {
338 anyhow!(err)
339 }
340 })?;
341
342 let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
343 let models = response.models.map(|models| Rc::new(RefCell::new(models)));
344
345 if let Some(default_mode) = default_mode {
346 if let Some(modes) = modes.as_ref() {
347 let mut modes_ref = modes.borrow_mut();
348 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
349
350 if has_mode {
351 let initial_mode_id = modes_ref.current_mode_id.clone();
352
353 cx.spawn({
354 let default_mode = default_mode.clone();
355 let session_id = response.session_id.clone();
356 let modes = modes.clone();
357 let conn = conn.clone();
358 async move |_| {
359 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
360 .await.log_err();
361
362 if result.is_none() {
363 modes.borrow_mut().current_mode_id = initial_mode_id;
364 }
365 }
366 }).detach();
367
368 modes_ref.current_mode_id = default_mode;
369 } else {
370 let available_modes = modes_ref
371 .available_modes
372 .iter()
373 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
374 .collect::<Vec<_>>()
375 .join("\n");
376
377 log::warn!(
378 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
379 );
380 }
381 } else {
382 log::warn!(
383 "`{name}` does not support modes, but `default_mode` was set in settings.",
384 );
385 }
386 }
387
388 if let Some(default_model) = default_model {
389 if let Some(models) = models.as_ref() {
390 let mut models_ref = models.borrow_mut();
391 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
392
393 if has_model {
394 let initial_model_id = models_ref.current_model_id.clone();
395
396 cx.spawn({
397 let default_model = default_model.clone();
398 let session_id = response.session_id.clone();
399 let models = models.clone();
400 let conn = conn.clone();
401 async move |_| {
402 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
403 .await.log_err();
404
405 if result.is_none() {
406 models.borrow_mut().current_model_id = initial_model_id;
407 }
408 }
409 }).detach();
410
411 models_ref.current_model_id = default_model;
412 } else {
413 let available_models = models_ref
414 .available_models
415 .iter()
416 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
417 .collect::<Vec<_>>()
418 .join("\n");
419
420 log::warn!(
421 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
422 );
423 }
424 } else {
425 log::warn!(
426 "`{name}` does not support model selection, but `default_model` was set in settings.",
427 );
428 }
429 }
430
431 let session_id = response.session_id;
432 let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
433 let thread = cx.new(|cx| {
434 AcpThread::new(
435 self.server_name.clone(),
436 self.clone(),
437 project,
438 action_log,
439 session_id.clone(),
440 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
441 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
442 cx,
443 )
444 })?;
445
446
447 let session = AcpSession {
448 thread: thread.downgrade(),
449 suppress_abort_err: false,
450 session_modes: modes,
451 models,
452 };
453 sessions.borrow_mut().insert(session_id, session);
454
455 Ok(thread)
456 })
457 }
458
459 fn auth_methods(&self) -> &[acp::AuthMethod] {
460 &self.auth_methods
461 }
462
463 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
464 let conn = self.connection.clone();
465 cx.foreground_executor().spawn(async move {
466 conn.authenticate(acp::AuthenticateRequest::new(method_id))
467 .await?;
468 Ok(())
469 })
470 }
471
472 fn prompt(
473 &self,
474 _id: Option<acp_thread::UserMessageId>,
475 params: acp::PromptRequest,
476 cx: &mut App,
477 ) -> Task<Result<acp::PromptResponse>> {
478 let conn = self.connection.clone();
479 let sessions = self.sessions.clone();
480 let session_id = params.session_id.clone();
481 cx.foreground_executor().spawn(async move {
482 let result = conn.prompt(params).await;
483
484 let mut suppress_abort_err = false;
485
486 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
487 suppress_abort_err = session.suppress_abort_err;
488 session.suppress_abort_err = false;
489 }
490
491 match result {
492 Ok(response) => Ok(response),
493 Err(err) => {
494 if err.code == acp::ErrorCode::AuthRequired {
495 return Err(anyhow!(acp::Error::auth_required()));
496 }
497
498 if err.code != ErrorCode::InternalError {
499 anyhow::bail!(err)
500 }
501
502 let Some(data) = &err.data else {
503 anyhow::bail!(err)
504 };
505
506 // Temporary workaround until the following PR is generally available:
507 // https://github.com/google-gemini/gemini-cli/pull/6656
508
509 #[derive(Deserialize)]
510 #[serde(deny_unknown_fields)]
511 struct ErrorDetails {
512 details: Box<str>,
513 }
514
515 match serde_json::from_value(data.clone()) {
516 Ok(ErrorDetails { details }) => {
517 if suppress_abort_err
518 && (details.contains("This operation was aborted")
519 || details.contains("The user aborted a request"))
520 {
521 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
522 } else {
523 Err(anyhow!(details))
524 }
525 }
526 Err(_) => Err(anyhow!(err)),
527 }
528 }
529 }
530 })
531 }
532
533 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
534 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
535 session.suppress_abort_err = true;
536 }
537 let conn = self.connection.clone();
538 let params = acp::CancelNotification::new(session_id.clone());
539 cx.foreground_executor()
540 .spawn(async move { conn.cancel(params).await })
541 .detach();
542 }
543
544 fn session_modes(
545 &self,
546 session_id: &acp::SessionId,
547 _cx: &App,
548 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
549 let sessions = self.sessions.clone();
550 let sessions_ref = sessions.borrow();
551 let Some(session) = sessions_ref.get(session_id) else {
552 return None;
553 };
554
555 if let Some(modes) = session.session_modes.as_ref() {
556 Some(Rc::new(AcpSessionModes {
557 connection: self.connection.clone(),
558 session_id: session_id.clone(),
559 state: modes.clone(),
560 }) as _)
561 } else {
562 None
563 }
564 }
565
566 fn model_selector(
567 &self,
568 session_id: &acp::SessionId,
569 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
570 let sessions = self.sessions.clone();
571 let sessions_ref = sessions.borrow();
572 let Some(session) = sessions_ref.get(session_id) else {
573 return None;
574 };
575
576 if let Some(models) = session.models.as_ref() {
577 Some(Rc::new(AcpModelSelector::new(
578 session_id.clone(),
579 self.connection.clone(),
580 models.clone(),
581 )) as _)
582 } else {
583 None
584 }
585 }
586
587 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
588 self
589 }
590}
591
/// Accessor for one session's mode state, handed out by
/// `AcpConnection::session_modes`.
struct AcpSessionModes {
    /// Session whose mode is read and changed.
    session_id: acp::SessionId,
    /// Connection used to send mode changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the session entry in the connection's session table.
    state: Rc<RefCell<acp::SessionModeState>>,
}
597
598impl acp_thread::AgentSessionModes for AcpSessionModes {
599 fn current_mode(&self) -> acp::SessionModeId {
600 self.state.borrow().current_mode_id.clone()
601 }
602
603 fn all_modes(&self) -> Vec<acp::SessionMode> {
604 self.state.borrow().available_modes.clone()
605 }
606
607 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
608 let connection = self.connection.clone();
609 let session_id = self.session_id.clone();
610 let old_mode_id;
611 {
612 let mut state = self.state.borrow_mut();
613 old_mode_id = state.current_mode_id.clone();
614 state.current_mode_id = mode_id.clone();
615 };
616 let state = self.state.clone();
617 cx.foreground_executor().spawn(async move {
618 let result = connection
619 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
620 .await;
621
622 if result.is_err() {
623 state.borrow_mut().current_mode_id = old_mode_id;
624 }
625
626 result?;
627
628 Ok(())
629 })
630 }
631}
632
/// Accessor for one session's model state, handed out by
/// `AcpConnection::model_selector`.
struct AcpModelSelector {
    /// Session whose model is read and changed.
    session_id: acp::SessionId,
    /// Connection used to send model changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state shared with the session entry in the connection's session table.
    state: Rc<RefCell<acp::SessionModelState>>,
}
638
impl AcpModelSelector {
    /// Builds a selector for `session_id` from the given connection and shared model
    /// state; the arguments are stored as is.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
652
653impl acp_thread::AgentModelSelector for AcpModelSelector {
654 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
655 Task::ready(Ok(acp_thread::AgentModelList::Flat(
656 self.state
657 .borrow()
658 .available_models
659 .clone()
660 .into_iter()
661 .map(acp_thread::AgentModelInfo::from)
662 .collect(),
663 )))
664 }
665
666 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
667 let connection = self.connection.clone();
668 let session_id = self.session_id.clone();
669 let old_model_id;
670 {
671 let mut state = self.state.borrow_mut();
672 old_model_id = state.current_model_id.clone();
673 state.current_model_id = model_id.clone();
674 };
675 let state = self.state.clone();
676 cx.foreground_executor().spawn(async move {
677 let result = connection
678 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
679 .await;
680
681 if result.is_err() {
682 state.borrow_mut().current_model_id = old_model_id;
683 }
684
685 result?;
686
687 Ok(())
688 })
689 }
690
691 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
692 let state = self.state.borrow();
693 Task::ready(
694 state
695 .available_models
696 .iter()
697 .find(|m| m.model_id == state.current_model_id)
698 .cloned()
699 .map(acp_thread::AgentModelInfo::from)
700 .ok_or_else(|| anyhow::anyhow!("Model not found")),
701 )
702 }
703}
704
/// Handles the requests and notifications that the agent sends to the client.
struct ClientDelegate {
    /// Session table shared with the `AcpConnection` that created this delegate.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App context; cloned whenever a handler needs mutable access to it.
    cx: AsyncApp,
}
709
710#[async_trait::async_trait(?Send)]
711impl acp::Client for ClientDelegate {
712 async fn request_permission(
713 &self,
714 arguments: acp::RequestPermissionRequest,
715 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
716 let respect_always_allow_setting;
717 let thread;
718 {
719 let sessions_ref = self.sessions.borrow();
720 let session = sessions_ref
721 .get(&arguments.session_id)
722 .context("Failed to get session")?;
723 respect_always_allow_setting = session.session_modes.is_none();
724 thread = session.thread.clone();
725 }
726
727 let cx = &mut self.cx.clone();
728
729 let task = thread.update(cx, |thread, cx| {
730 thread.request_tool_call_authorization(
731 arguments.tool_call,
732 arguments.options,
733 respect_always_allow_setting,
734 cx,
735 )
736 })??;
737
738 let outcome = task.await;
739
740 Ok(acp::RequestPermissionResponse::new(outcome))
741 }
742
743 async fn write_text_file(
744 &self,
745 arguments: acp::WriteTextFileRequest,
746 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
747 let cx = &mut self.cx.clone();
748 let task = self
749 .session_thread(&arguments.session_id)?
750 .update(cx, |thread, cx| {
751 thread.write_text_file(arguments.path, arguments.content, cx)
752 })?;
753
754 task.await?;
755
756 Ok(Default::default())
757 }
758
759 async fn read_text_file(
760 &self,
761 arguments: acp::ReadTextFileRequest,
762 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
763 let task = self.session_thread(&arguments.session_id)?.update(
764 &mut self.cx.clone(),
765 |thread, cx| {
766 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
767 },
768 )?;
769
770 let content = task.await?;
771
772 Ok(acp::ReadTextFileResponse::new(content))
773 }
774
775 async fn session_notification(
776 &self,
777 notification: acp::SessionNotification,
778 ) -> Result<(), acp::Error> {
779 let sessions = self.sessions.borrow();
780 let session = sessions
781 .get(¬ification.session_id)
782 .context("Failed to get session")?;
783
784 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
785 current_mode_id,
786 ..
787 }) = ¬ification.update
788 {
789 if let Some(session_modes) = &session.session_modes {
790 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
791 } else {
792 log::error!(
793 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
794 );
795 }
796 }
797
798 // Clone so we can inspect meta both before and after handing off to the thread
799 let update_clone = notification.update.clone();
800
801 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
802 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
803 if let Some(meta) = &tc.meta {
804 if let Some(terminal_info) = meta.get("terminal_info") {
805 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
806 {
807 let terminal_id = acp::TerminalId::new(id_str);
808 let cwd = terminal_info
809 .get("cwd")
810 .and_then(|v| v.as_str().map(PathBuf::from));
811
812 // Create a minimal display-only lower-level terminal and register it.
813 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
814 let builder = TerminalBuilder::new_display_only(
815 CursorShape::default(),
816 AlternateScroll::On,
817 None,
818 0,
819 )?;
820 let lower = cx.new(|cx| builder.subscribe(cx));
821 thread.on_terminal_provider_event(
822 TerminalProviderEvent::Created {
823 terminal_id,
824 label: tc.title.clone(),
825 cwd,
826 output_byte_limit: None,
827 terminal: lower,
828 },
829 cx,
830 );
831 anyhow::Ok(())
832 });
833 }
834 }
835 }
836 }
837
838 // Forward the update to the acp_thread as usual.
839 session.thread.update(&mut self.cx.clone(), |thread, cx| {
840 thread.handle_session_update(notification.update.clone(), cx)
841 })??;
842
843 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
844 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
845 if let Some(meta) = &tcu.meta {
846 if let Some(term_out) = meta.get("terminal_output") {
847 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
848 let terminal_id = acp::TerminalId::new(id_str);
849 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
850 let data = s.as_bytes().to_vec();
851 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
852 thread.on_terminal_provider_event(
853 TerminalProviderEvent::Output { terminal_id, data },
854 cx,
855 );
856 });
857 }
858 }
859 }
860
861 // terminal_exit
862 if let Some(term_exit) = meta.get("terminal_exit") {
863 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
864 let terminal_id = acp::TerminalId::new(id_str);
865 let status = acp::TerminalExitStatus::new()
866 .exit_code(
867 term_exit
868 .get("exit_code")
869 .and_then(|v| v.as_u64())
870 .map(|i| i as u32),
871 )
872 .signal(
873 term_exit
874 .get("signal")
875 .and_then(|v| v.as_str().map(|s| s.to_string())),
876 );
877
878 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
879 thread.on_terminal_provider_event(
880 TerminalProviderEvent::Exit {
881 terminal_id,
882 status,
883 },
884 cx,
885 );
886 });
887 }
888 }
889 }
890 }
891
892 Ok(())
893 }
894
895 async fn create_terminal(
896 &self,
897 args: acp::CreateTerminalRequest,
898 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
899 let thread = self.session_thread(&args.session_id)?;
900 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
901
902 let terminal_entity = acp_thread::create_terminal_entity(
903 args.command.clone(),
904 &args.args,
905 args.env
906 .into_iter()
907 .map(|env| (env.name, env.value))
908 .collect(),
909 args.cwd.clone(),
910 &project,
911 &mut self.cx.clone(),
912 )
913 .await?;
914
915 // Register with renderer
916 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
917 thread.register_terminal_created(
918 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
919 format!("{} {}", args.command, args.args.join(" ")),
920 args.cwd.clone(),
921 args.output_byte_limit,
922 terminal_entity,
923 cx,
924 )
925 })?;
926 let terminal_id =
927 terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
928 Ok(acp::CreateTerminalResponse::new(terminal_id))
929 }
930
931 async fn kill_terminal_command(
932 &self,
933 args: acp::KillTerminalCommandRequest,
934 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
935 self.session_thread(&args.session_id)?
936 .update(&mut self.cx.clone(), |thread, cx| {
937 thread.kill_terminal(args.terminal_id, cx)
938 })??;
939
940 Ok(Default::default())
941 }
942
943 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
944 Err(acp::Error::method_not_found())
945 }
946
947 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
948 Err(acp::Error::method_not_found())
949 }
950
951 async fn release_terminal(
952 &self,
953 args: acp::ReleaseTerminalRequest,
954 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
955 self.session_thread(&args.session_id)?
956 .update(&mut self.cx.clone(), |thread, cx| {
957 thread.release_terminal(args.terminal_id, cx)
958 })??;
959
960 Ok(Default::default())
961 }
962
963 async fn terminal_output(
964 &self,
965 args: acp::TerminalOutputRequest,
966 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
967 self.session_thread(&args.session_id)?
968 .read_with(&mut self.cx.clone(), |thread, cx| {
969 let out = thread
970 .terminal(args.terminal_id)?
971 .read(cx)
972 .current_output(cx);
973
974 Ok(out)
975 })?
976 }
977
978 async fn wait_for_terminal_exit(
979 &self,
980 args: acp::WaitForTerminalExitRequest,
981 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
982 let exit_status = self
983 .session_thread(&args.session_id)?
984 .update(&mut self.cx.clone(), |thread, cx| {
985 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
986 })??
987 .await;
988
989 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
990 }
991}
992
993impl ClientDelegate {
994 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
995 let sessions = self.sessions.borrow();
996 sessions
997 .get(session_id)
998 .context("Failed to get session")
999 .map(|session| session.thread.clone())
1000 }
1001}