1use acp_thread::AgentConnection;
2use acp_tools::AcpConnectionRegistry;
3use action_log::ActionLog;
4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
5use anyhow::anyhow;
6use collections::HashMap;
7use futures::AsyncBufReadExt as _;
8use futures::io::BufReader;
9use project::Project;
10use project::agent_server_store::AgentServerCommand;
11use serde::Deserialize;
12use util::ResultExt as _;
13
14use std::path::PathBuf;
15use std::{any::Any, cell::RefCell};
16use std::{path::Path, rc::Rc};
17use thiserror::Error;
18
19use anyhow::{Context as _, Result};
20use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
21
22use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
23use terminal::TerminalBuilder;
24use terminal::terminal_settings::{AlternateScroll, CursorShape};
25
26#[derive(Debug, Error)]
27#[error("Unsupported version")]
28pub struct UnsupportedVersion;
29
30pub struct AcpConnection {
31 server_name: SharedString,
32 telemetry_id: SharedString,
33 connection: Rc<acp::ClientSideConnection>,
34 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
35 auth_methods: Vec<acp::AuthMethod>,
36 agent_capabilities: acp::AgentCapabilities,
37 default_mode: Option<acp::SessionModeId>,
38 default_model: Option<acp::ModelId>,
39 root_dir: PathBuf,
40 // NB: Don't move this into the wait_task, since we need to ensure the process is
41 // killed on drop (setting kill_on_drop on the command seems to not always work).
42 child: smol::process::Child,
43 _io_task: Task<Result<(), acp::Error>>,
44 _wait_task: Task<Result<()>>,
45 _stderr_task: Task<Result<()>>,
46}
47
48pub struct AcpSession {
49 thread: WeakEntity<AcpThread>,
50 suppress_abort_err: bool,
51 models: Option<Rc<RefCell<acp::SessionModelState>>>,
52 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
53}
54
55pub async fn connect(
56 server_name: SharedString,
57 command: AgentServerCommand,
58 root_dir: &Path,
59 default_mode: Option<acp::SessionModeId>,
60 default_model: Option<acp::ModelId>,
61 is_remote: bool,
62 cx: &mut AsyncApp,
63) -> Result<Rc<dyn AgentConnection>> {
64 let conn = AcpConnection::stdio(
65 server_name,
66 command.clone(),
67 root_dir,
68 default_mode,
69 default_model,
70 is_remote,
71 cx,
72 )
73 .await?;
74 Ok(Rc::new(conn) as _)
75}
76
77const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
78
79impl AcpConnection {
80 pub async fn stdio(
81 server_name: SharedString,
82 command: AgentServerCommand,
83 root_dir: &Path,
84 default_mode: Option<acp::SessionModeId>,
85 default_model: Option<acp::ModelId>,
86 is_remote: bool,
87 cx: &mut AsyncApp,
88 ) -> Result<Self> {
89 let mut child = util::command::new_smol_command(&command.path);
90 child
91 .args(command.args.iter().map(|arg| arg.as_str()))
92 .envs(command.env.iter().flatten())
93 .stdin(std::process::Stdio::piped())
94 .stdout(std::process::Stdio::piped())
95 .stderr(std::process::Stdio::piped());
96 if !is_remote {
97 child.current_dir(root_dir);
98 }
99 let mut child = child.spawn()?;
100
101 let stdout = child.stdout.take().context("Failed to take stdout")?;
102 let stdin = child.stdin.take().context("Failed to take stdin")?;
103 let stderr = child.stderr.take().context("Failed to take stderr")?;
104 log::debug!(
105 "Spawning external agent server: {:?}, {:?}",
106 command.path,
107 command.args
108 );
109 log::trace!("Spawned (pid: {})", child.id());
110
111 let sessions = Rc::new(RefCell::new(HashMap::default()));
112
113 let (release_channel, version) = cx.update(|cx| {
114 (
115 release_channel::ReleaseChannel::try_global(cx)
116 .map(|release_channel| release_channel.display_name()),
117 release_channel::AppVersion::global(cx).to_string(),
118 )
119 })?;
120
121 let client = ClientDelegate {
122 sessions: sessions.clone(),
123 cx: cx.clone(),
124 };
125 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
126 let foreground_executor = cx.foreground_executor().clone();
127 move |fut| {
128 foreground_executor.spawn(fut).detach();
129 }
130 });
131
132 let io_task = cx.background_spawn(io_task);
133
134 let stderr_task = cx.background_spawn(async move {
135 let mut stderr = BufReader::new(stderr);
136 let mut line = String::new();
137 while let Ok(n) = stderr.read_line(&mut line).await
138 && n > 0
139 {
140 log::warn!("agent stderr: {}", line.trim());
141 line.clear();
142 }
143 Ok(())
144 });
145
146 let wait_task = cx.spawn({
147 let sessions = sessions.clone();
148 let status_fut = child.status();
149 async move |cx| {
150 let status = status_fut.await?;
151
152 for session in sessions.borrow().values() {
153 session
154 .thread
155 .update(cx, |thread, cx| {
156 thread.emit_load_error(LoadError::Exited { status }, cx)
157 })
158 .ok();
159 }
160
161 anyhow::Ok(())
162 }
163 });
164
165 let connection = Rc::new(connection);
166
167 cx.update(|cx| {
168 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
169 registry.set_active_connection(server_name.clone(), &connection, cx)
170 });
171 })?;
172
173 let response = connection
174 .initialize(
175 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
176 .client_capabilities(
177 acp::ClientCapabilities::new()
178 .fs(acp::FileSystemCapability::new()
179 .read_text_file(true)
180 .write_text_file(true))
181 .terminal(true)
182 // Experimental: Allow for rendering terminal output from the agents
183 .meta(acp::Meta::from_iter([
184 ("terminal_output".into(), true.into()),
185 ("terminal-auth".into(), true.into()),
186 ])),
187 )
188 .client_info(
189 acp::Implementation::new("zed", version)
190 .title(release_channel.map(ToOwned::to_owned)),
191 ),
192 )
193 .await?;
194
195 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
196 return Err(UnsupportedVersion.into());
197 }
198
199 let telemetry_id = response
200 .agent_info
201 // Use the one the agent provides if we have one
202 .map(|info| info.name.into())
203 // Otherwise, just use the name
204 .unwrap_or_else(|| server_name.clone());
205
206 Ok(Self {
207 auth_methods: response.auth_methods,
208 root_dir: root_dir.to_owned(),
209 connection,
210 server_name,
211 telemetry_id,
212 sessions,
213 agent_capabilities: response.agent_capabilities,
214 default_mode,
215 default_model,
216 _io_task: io_task,
217 _wait_task: wait_task,
218 _stderr_task: stderr_task,
219 child,
220 })
221 }
222
223 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
224 &self.agent_capabilities.prompt_capabilities
225 }
226
227 pub fn root_dir(&self) -> &Path {
228 &self.root_dir
229 }
230}
231
232impl Drop for AcpConnection {
233 fn drop(&mut self) {
234 // See the comment on the child field.
235 self.child.kill().log_err();
236 }
237}
238
239impl AgentConnection for AcpConnection {
240 fn telemetry_id(&self) -> SharedString {
241 self.telemetry_id.clone()
242 }
243
244 fn new_thread(
245 self: Rc<Self>,
246 project: Entity<Project>,
247 cwd: &Path,
248 cx: &mut App,
249 ) -> Task<Result<Entity<AcpThread>>> {
250 let name = self.server_name.clone();
251 let conn = self.connection.clone();
252 let sessions = self.sessions.clone();
253 let default_mode = self.default_mode.clone();
254 let default_model = self.default_model.clone();
255 let cwd = cwd.to_path_buf();
256 let context_server_store = project.read(cx).context_server_store().read(cx);
257 let mcp_servers = if project.read(cx).is_local() {
258 context_server_store
259 .configured_server_ids()
260 .iter()
261 .filter_map(|id| {
262 let configuration = context_server_store.configuration_for_server(id)?;
263 match &*configuration {
264 project::context_server_store::ContextServerConfiguration::Custom {
265 command,
266 ..
267 }
268 | project::context_server_store::ContextServerConfiguration::Extension {
269 command,
270 ..
271 } => Some(acp::McpServer::Stdio(
272 acp::McpServerStdio::new(id.0.to_string(), &command.path)
273 .args(command.args.clone())
274 .env(if let Some(env) = command.env.as_ref() {
275 env.iter()
276 .map(|(name, value)| acp::EnvVariable::new(name, value))
277 .collect()
278 } else {
279 vec![]
280 }),
281 )),
282 project::context_server_store::ContextServerConfiguration::Http {
283 url,
284 headers,
285 } => Some(acp::McpServer::Http(
286 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
287 headers
288 .iter()
289 .map(|(name, value)| acp::HttpHeader::new(name, value))
290 .collect(),
291 ),
292 )),
293 }
294 })
295 .collect()
296 } else {
297 // In SSH projects, the external agent is running on the remote
298 // machine, and currently we only run MCP servers on the local
299 // machine. So don't pass any MCP servers to the agent in that case.
300 Vec::new()
301 };
302
303 cx.spawn(async move |cx| {
304 let response = conn
305 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
306 .await
307 .map_err(|err| {
308 if err.code == acp::ErrorCode::AuthRequired {
309 let mut error = AuthRequired::new();
310
311 if err.message != acp::ErrorCode::AuthRequired.to_string() {
312 error = error.with_description(err.message);
313 }
314
315 anyhow!(error)
316 } else {
317 anyhow!(err)
318 }
319 })?;
320
321 let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
322 let models = response.models.map(|models| Rc::new(RefCell::new(models)));
323
324 if let Some(default_mode) = default_mode {
325 if let Some(modes) = modes.as_ref() {
326 let mut modes_ref = modes.borrow_mut();
327 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
328
329 if has_mode {
330 let initial_mode_id = modes_ref.current_mode_id.clone();
331
332 cx.spawn({
333 let default_mode = default_mode.clone();
334 let session_id = response.session_id.clone();
335 let modes = modes.clone();
336 let conn = conn.clone();
337 async move |_| {
338 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
339 .await.log_err();
340
341 if result.is_none() {
342 modes.borrow_mut().current_mode_id = initial_mode_id;
343 }
344 }
345 }).detach();
346
347 modes_ref.current_mode_id = default_mode;
348 } else {
349 let available_modes = modes_ref
350 .available_modes
351 .iter()
352 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
353 .collect::<Vec<_>>()
354 .join("\n");
355
356 log::warn!(
357 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
358 );
359 }
360 } else {
361 log::warn!(
362 "`{name}` does not support modes, but `default_mode` was set in settings.",
363 );
364 }
365 }
366
367 if let Some(default_model) = default_model {
368 if let Some(models) = models.as_ref() {
369 let mut models_ref = models.borrow_mut();
370 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
371
372 if has_model {
373 let initial_model_id = models_ref.current_model_id.clone();
374
375 cx.spawn({
376 let default_model = default_model.clone();
377 let session_id = response.session_id.clone();
378 let models = models.clone();
379 let conn = conn.clone();
380 async move |_| {
381 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
382 .await.log_err();
383
384 if result.is_none() {
385 models.borrow_mut().current_model_id = initial_model_id;
386 }
387 }
388 }).detach();
389
390 models_ref.current_model_id = default_model;
391 } else {
392 let available_models = models_ref
393 .available_models
394 .iter()
395 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
396 .collect::<Vec<_>>()
397 .join("\n");
398
399 log::warn!(
400 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
401 );
402 }
403 } else {
404 log::warn!(
405 "`{name}` does not support model selection, but `default_model` was set in settings.",
406 );
407 }
408 }
409
410 let session_id = response.session_id;
411 let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
412 let thread = cx.new(|cx| {
413 AcpThread::new(
414 self.server_name.clone(),
415 self.clone(),
416 project,
417 action_log,
418 session_id.clone(),
419 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
420 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
421 cx,
422 )
423 })?;
424
425
426 let session = AcpSession {
427 thread: thread.downgrade(),
428 suppress_abort_err: false,
429 session_modes: modes,
430 models,
431 };
432 sessions.borrow_mut().insert(session_id, session);
433
434 Ok(thread)
435 })
436 }
437
438 fn auth_methods(&self) -> &[acp::AuthMethod] {
439 &self.auth_methods
440 }
441
442 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
443 let conn = self.connection.clone();
444 cx.foreground_executor().spawn(async move {
445 conn.authenticate(acp::AuthenticateRequest::new(method_id))
446 .await?;
447 Ok(())
448 })
449 }
450
451 fn prompt(
452 &self,
453 _id: Option<acp_thread::UserMessageId>,
454 params: acp::PromptRequest,
455 cx: &mut App,
456 ) -> Task<Result<acp::PromptResponse>> {
457 let conn = self.connection.clone();
458 let sessions = self.sessions.clone();
459 let session_id = params.session_id.clone();
460 cx.foreground_executor().spawn(async move {
461 let result = conn.prompt(params).await;
462
463 let mut suppress_abort_err = false;
464
465 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
466 suppress_abort_err = session.suppress_abort_err;
467 session.suppress_abort_err = false;
468 }
469
470 match result {
471 Ok(response) => Ok(response),
472 Err(err) => {
473 if err.code == acp::ErrorCode::AuthRequired {
474 return Err(anyhow!(acp::Error::auth_required()));
475 }
476
477 if err.code != ErrorCode::InternalError {
478 anyhow::bail!(err)
479 }
480
481 let Some(data) = &err.data else {
482 anyhow::bail!(err)
483 };
484
485 // Temporary workaround until the following PR is generally available:
486 // https://github.com/google-gemini/gemini-cli/pull/6656
487
488 #[derive(Deserialize)]
489 #[serde(deny_unknown_fields)]
490 struct ErrorDetails {
491 details: Box<str>,
492 }
493
494 match serde_json::from_value(data.clone()) {
495 Ok(ErrorDetails { details }) => {
496 if suppress_abort_err
497 && (details.contains("This operation was aborted")
498 || details.contains("The user aborted a request"))
499 {
500 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
501 } else {
502 Err(anyhow!(details))
503 }
504 }
505 Err(_) => Err(anyhow!(err)),
506 }
507 }
508 }
509 })
510 }
511
512 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
513 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
514 session.suppress_abort_err = true;
515 }
516 let conn = self.connection.clone();
517 let params = acp::CancelNotification::new(session_id.clone());
518 cx.foreground_executor()
519 .spawn(async move { conn.cancel(params).await })
520 .detach();
521 }
522
523 fn session_modes(
524 &self,
525 session_id: &acp::SessionId,
526 _cx: &App,
527 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
528 let sessions = self.sessions.clone();
529 let sessions_ref = sessions.borrow();
530 let Some(session) = sessions_ref.get(session_id) else {
531 return None;
532 };
533
534 if let Some(modes) = session.session_modes.as_ref() {
535 Some(Rc::new(AcpSessionModes {
536 connection: self.connection.clone(),
537 session_id: session_id.clone(),
538 state: modes.clone(),
539 }) as _)
540 } else {
541 None
542 }
543 }
544
545 fn model_selector(
546 &self,
547 session_id: &acp::SessionId,
548 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
549 let sessions = self.sessions.clone();
550 let sessions_ref = sessions.borrow();
551 let Some(session) = sessions_ref.get(session_id) else {
552 return None;
553 };
554
555 if let Some(models) = session.models.as_ref() {
556 Some(Rc::new(AcpModelSelector::new(
557 session_id.clone(),
558 self.connection.clone(),
559 models.clone(),
560 )) as _)
561 } else {
562 None
563 }
564 }
565
566 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
567 self
568 }
569}
570
571struct AcpSessionModes {
572 session_id: acp::SessionId,
573 connection: Rc<acp::ClientSideConnection>,
574 state: Rc<RefCell<acp::SessionModeState>>,
575}
576
577impl acp_thread::AgentSessionModes for AcpSessionModes {
578 fn current_mode(&self) -> acp::SessionModeId {
579 self.state.borrow().current_mode_id.clone()
580 }
581
582 fn all_modes(&self) -> Vec<acp::SessionMode> {
583 self.state.borrow().available_modes.clone()
584 }
585
586 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
587 let connection = self.connection.clone();
588 let session_id = self.session_id.clone();
589 let old_mode_id;
590 {
591 let mut state = self.state.borrow_mut();
592 old_mode_id = state.current_mode_id.clone();
593 state.current_mode_id = mode_id.clone();
594 };
595 let state = self.state.clone();
596 cx.foreground_executor().spawn(async move {
597 let result = connection
598 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
599 .await;
600
601 if result.is_err() {
602 state.borrow_mut().current_mode_id = old_mode_id;
603 }
604
605 result?;
606
607 Ok(())
608 })
609 }
610}
611
612struct AcpModelSelector {
613 session_id: acp::SessionId,
614 connection: Rc<acp::ClientSideConnection>,
615 state: Rc<RefCell<acp::SessionModelState>>,
616}
617
618impl AcpModelSelector {
619 fn new(
620 session_id: acp::SessionId,
621 connection: Rc<acp::ClientSideConnection>,
622 state: Rc<RefCell<acp::SessionModelState>>,
623 ) -> Self {
624 Self {
625 session_id,
626 connection,
627 state,
628 }
629 }
630}
631
632impl acp_thread::AgentModelSelector for AcpModelSelector {
633 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
634 Task::ready(Ok(acp_thread::AgentModelList::Flat(
635 self.state
636 .borrow()
637 .available_models
638 .clone()
639 .into_iter()
640 .map(acp_thread::AgentModelInfo::from)
641 .collect(),
642 )))
643 }
644
645 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
646 let connection = self.connection.clone();
647 let session_id = self.session_id.clone();
648 let old_model_id;
649 {
650 let mut state = self.state.borrow_mut();
651 old_model_id = state.current_model_id.clone();
652 state.current_model_id = model_id.clone();
653 };
654 let state = self.state.clone();
655 cx.foreground_executor().spawn(async move {
656 let result = connection
657 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
658 .await;
659
660 if result.is_err() {
661 state.borrow_mut().current_model_id = old_model_id;
662 }
663
664 result?;
665
666 Ok(())
667 })
668 }
669
670 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
671 let state = self.state.borrow();
672 Task::ready(
673 state
674 .available_models
675 .iter()
676 .find(|m| m.model_id == state.current_model_id)
677 .cloned()
678 .map(acp_thread::AgentModelInfo::from)
679 .ok_or_else(|| anyhow::anyhow!("Model not found")),
680 )
681 }
682}
683
684struct ClientDelegate {
685 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
686 cx: AsyncApp,
687}
688
689#[async_trait::async_trait(?Send)]
690impl acp::Client for ClientDelegate {
691 async fn request_permission(
692 &self,
693 arguments: acp::RequestPermissionRequest,
694 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
695 let respect_always_allow_setting;
696 let thread;
697 {
698 let sessions_ref = self.sessions.borrow();
699 let session = sessions_ref
700 .get(&arguments.session_id)
701 .context("Failed to get session")?;
702 respect_always_allow_setting = session.session_modes.is_none();
703 thread = session.thread.clone();
704 }
705
706 let cx = &mut self.cx.clone();
707
708 let task = thread.update(cx, |thread, cx| {
709 thread.request_tool_call_authorization(
710 arguments.tool_call,
711 arguments.options,
712 respect_always_allow_setting,
713 cx,
714 )
715 })??;
716
717 let outcome = task.await;
718
719 Ok(acp::RequestPermissionResponse::new(outcome))
720 }
721
722 async fn write_text_file(
723 &self,
724 arguments: acp::WriteTextFileRequest,
725 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
726 let cx = &mut self.cx.clone();
727 let task = self
728 .session_thread(&arguments.session_id)?
729 .update(cx, |thread, cx| {
730 thread.write_text_file(arguments.path, arguments.content, cx)
731 })?;
732
733 task.await?;
734
735 Ok(Default::default())
736 }
737
738 async fn read_text_file(
739 &self,
740 arguments: acp::ReadTextFileRequest,
741 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
742 let task = self.session_thread(&arguments.session_id)?.update(
743 &mut self.cx.clone(),
744 |thread, cx| {
745 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
746 },
747 )?;
748
749 let content = task.await?;
750
751 Ok(acp::ReadTextFileResponse::new(content))
752 }
753
754 async fn session_notification(
755 &self,
756 notification: acp::SessionNotification,
757 ) -> Result<(), acp::Error> {
758 let sessions = self.sessions.borrow();
759 let session = sessions
760 .get(¬ification.session_id)
761 .context("Failed to get session")?;
762
763 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
764 current_mode_id,
765 ..
766 }) = ¬ification.update
767 {
768 if let Some(session_modes) = &session.session_modes {
769 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
770 } else {
771 log::error!(
772 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
773 );
774 }
775 }
776
777 // Clone so we can inspect meta both before and after handing off to the thread
778 let update_clone = notification.update.clone();
779
780 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
781 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
782 if let Some(meta) = &tc.meta {
783 if let Some(terminal_info) = meta.get("terminal_info") {
784 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
785 {
786 let terminal_id = acp::TerminalId::new(id_str);
787 let cwd = terminal_info
788 .get("cwd")
789 .and_then(|v| v.as_str().map(PathBuf::from));
790
791 // Create a minimal display-only lower-level terminal and register it.
792 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
793 let builder = TerminalBuilder::new_display_only(
794 CursorShape::default(),
795 AlternateScroll::On,
796 None,
797 0,
798 )?;
799 let lower = cx.new(|cx| builder.subscribe(cx));
800 thread.on_terminal_provider_event(
801 TerminalProviderEvent::Created {
802 terminal_id,
803 label: tc.title.clone(),
804 cwd,
805 output_byte_limit: None,
806 terminal: lower,
807 },
808 cx,
809 );
810 anyhow::Ok(())
811 });
812 }
813 }
814 }
815 }
816
817 // Forward the update to the acp_thread as usual.
818 session.thread.update(&mut self.cx.clone(), |thread, cx| {
819 thread.handle_session_update(notification.update.clone(), cx)
820 })??;
821
822 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
823 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
824 if let Some(meta) = &tcu.meta {
825 if let Some(term_out) = meta.get("terminal_output") {
826 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
827 let terminal_id = acp::TerminalId::new(id_str);
828 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
829 let data = s.as_bytes().to_vec();
830 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
831 thread.on_terminal_provider_event(
832 TerminalProviderEvent::Output { terminal_id, data },
833 cx,
834 );
835 });
836 }
837 }
838 }
839
840 // terminal_exit
841 if let Some(term_exit) = meta.get("terminal_exit") {
842 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
843 let terminal_id = acp::TerminalId::new(id_str);
844 let status = acp::TerminalExitStatus::new()
845 .exit_code(
846 term_exit
847 .get("exit_code")
848 .and_then(|v| v.as_u64())
849 .map(|i| i as u32),
850 )
851 .signal(
852 term_exit
853 .get("signal")
854 .and_then(|v| v.as_str().map(|s| s.to_string())),
855 );
856
857 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
858 thread.on_terminal_provider_event(
859 TerminalProviderEvent::Exit {
860 terminal_id,
861 status,
862 },
863 cx,
864 );
865 });
866 }
867 }
868 }
869 }
870
871 Ok(())
872 }
873
874 async fn create_terminal(
875 &self,
876 args: acp::CreateTerminalRequest,
877 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
878 let thread = self.session_thread(&args.session_id)?;
879 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
880
881 let terminal_entity = acp_thread::create_terminal_entity(
882 args.command.clone(),
883 &args.args,
884 args.env
885 .into_iter()
886 .map(|env| (env.name, env.value))
887 .collect(),
888 args.cwd.clone(),
889 &project,
890 &mut self.cx.clone(),
891 )
892 .await?;
893
894 // Register with renderer
895 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
896 thread.register_terminal_created(
897 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
898 format!("{} {}", args.command, args.args.join(" ")),
899 args.cwd.clone(),
900 args.output_byte_limit,
901 terminal_entity,
902 cx,
903 )
904 })?;
905 let terminal_id =
906 terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
907 Ok(acp::CreateTerminalResponse::new(terminal_id))
908 }
909
910 async fn kill_terminal_command(
911 &self,
912 args: acp::KillTerminalCommandRequest,
913 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
914 self.session_thread(&args.session_id)?
915 .update(&mut self.cx.clone(), |thread, cx| {
916 thread.kill_terminal(args.terminal_id, cx)
917 })??;
918
919 Ok(Default::default())
920 }
921
922 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
923 Err(acp::Error::method_not_found())
924 }
925
926 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
927 Err(acp::Error::method_not_found())
928 }
929
930 async fn release_terminal(
931 &self,
932 args: acp::ReleaseTerminalRequest,
933 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
934 self.session_thread(&args.session_id)?
935 .update(&mut self.cx.clone(), |thread, cx| {
936 thread.release_terminal(args.terminal_id, cx)
937 })??;
938
939 Ok(Default::default())
940 }
941
942 async fn terminal_output(
943 &self,
944 args: acp::TerminalOutputRequest,
945 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
946 self.session_thread(&args.session_id)?
947 .read_with(&mut self.cx.clone(), |thread, cx| {
948 let out = thread
949 .terminal(args.terminal_id)?
950 .read(cx)
951 .current_output(cx);
952
953 Ok(out)
954 })?
955 }
956
957 async fn wait_for_terminal_exit(
958 &self,
959 args: acp::WaitForTerminalExitRequest,
960 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
961 let exit_status = self
962 .session_thread(&args.session_id)?
963 .update(&mut self.cx.clone(), |thread, cx| {
964 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
965 })??
966 .await;
967
968 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
969 }
970}
971
972impl ClientDelegate {
973 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
974 let sessions = self.sessions.borrow();
975 sessions
976 .get(session_id)
977 .context("Failed to get session")
978 .map(|session| session.thread.clone())
979 }
980}