1use acp_thread::AgentConnection;
2use acp_tools::AcpConnectionRegistry;
3use action_log::ActionLog;
4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
5use anyhow::anyhow;
6use collections::HashMap;
7use futures::AsyncBufReadExt as _;
8use futures::io::BufReader;
9use project::Project;
10use project::agent_server_store::AgentServerCommand;
11use serde::Deserialize;
12use util::ResultExt as _;
13
14use std::path::PathBuf;
15use std::{any::Any, cell::RefCell};
16use std::{path::Path, rc::Rc};
17use thiserror::Error;
18
19use anyhow::{Context as _, Result};
20use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
21
22use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
23use terminal::TerminalBuilder;
24use terminal::terminal_settings::{AlternateScroll, CursorShape};
25
/// Error returned by [`AcpConnection::stdio`] when the agent's `initialize`
/// response reports a protocol version below [`MINIMUM_SUPPORTED_VERSION`].
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
29
/// A connection to an external agent server that was spawned as a child
/// process and is spoken to over its piped stdin/stdout.
pub struct AcpConnection {
    /// Name this connection was created with; registered with the
    /// `AcpConnectionRegistry` and handed to every thread it creates.
    server_name: SharedString,
    /// Value returned from `AgentConnection::telemetry_id`.
    telemetry_id: &'static str,
    /// Client side of the protocol connection to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Per-session state, shared with the `ClientDelegate` that services
    /// requests coming from the agent.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods taken from the `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities taken from the `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode requested for each new session when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Model requested for each new session when the agent offers it.
    default_model: Option<acp::ModelId>,
    /// Directory passed to `stdio`; used as the child's working directory
    /// unless the connection is remote.
    root_dir: PathBuf,
    // NB: Don't move this into the wait_task, since we need to ensure the process is
    // killed on drop (setting kill_on_drop on the command seems to not always work).
    child: smol::process::Child,
    /// Handle to the background task driving the protocol I/O.
    _io_task: Task<Result<(), acp::Error>>,
    /// Handle to the task that waits for the child to exit and then reports
    /// `LoadError::Exited` to every session's thread.
    _wait_task: Task<Result<()>>,
    /// Handle to the background task that forwards the child's stderr lines
    /// to the log.
    _stderr_task: Task<Result<()>>,
}
47
/// State tracked for a single agent session.
pub struct AcpSession {
    /// The thread displaying this session; weak so the session map does not
    /// keep the thread alive.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel` and consumed (reset to `false`) by `prompt`, which uses
    /// it to turn an "aborted" internal error into a `Cancelled` stop reason.
    suppress_abort_err: bool,
    /// Model state from the `new_session` response, if the agent provided one.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state from the `new_session` response, if the agent provided one.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
}
54
55pub async fn connect(
56 server_name: SharedString,
57 telemetry_id: &'static str,
58 command: AgentServerCommand,
59 root_dir: &Path,
60 default_mode: Option<acp::SessionModeId>,
61 default_model: Option<acp::ModelId>,
62 is_remote: bool,
63 cx: &mut AsyncApp,
64) -> Result<Rc<dyn AgentConnection>> {
65 let conn = AcpConnection::stdio(
66 server_name,
67 telemetry_id,
68 command.clone(),
69 root_dir,
70 default_mode,
71 default_model,
72 is_remote,
73 cx,
74 )
75 .await?;
76 Ok(Rc::new(conn) as _)
77}
78
/// Lowest protocol version an agent may report in its `initialize` response;
/// anything lower is rejected with `UnsupportedVersion`.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
80
81impl AcpConnection {
82 pub async fn stdio(
83 server_name: SharedString,
84 telemetry_id: &'static str,
85 command: AgentServerCommand,
86 root_dir: &Path,
87 default_mode: Option<acp::SessionModeId>,
88 default_model: Option<acp::ModelId>,
89 is_remote: bool,
90 cx: &mut AsyncApp,
91 ) -> Result<Self> {
92 let mut child = util::command::new_smol_command(&command.path);
93 child
94 .args(command.args.iter().map(|arg| arg.as_str()))
95 .envs(command.env.iter().flatten())
96 .stdin(std::process::Stdio::piped())
97 .stdout(std::process::Stdio::piped())
98 .stderr(std::process::Stdio::piped());
99 if !is_remote {
100 child.current_dir(root_dir);
101 }
102 let mut child = child.spawn()?;
103
104 let stdout = child.stdout.take().context("Failed to take stdout")?;
105 let stdin = child.stdin.take().context("Failed to take stdin")?;
106 let stderr = child.stderr.take().context("Failed to take stderr")?;
107 log::debug!(
108 "Spawning external agent server: {:?}, {:?}",
109 command.path,
110 command.args
111 );
112 log::trace!("Spawned (pid: {})", child.id());
113
114 let sessions = Rc::new(RefCell::new(HashMap::default()));
115
116 let (release_channel, version) = cx.update(|cx| {
117 (
118 release_channel::ReleaseChannel::try_global(cx)
119 .map(|release_channel| release_channel.display_name()),
120 release_channel::AppVersion::global(cx).to_string(),
121 )
122 })?;
123
124 let client = ClientDelegate {
125 sessions: sessions.clone(),
126 cx: cx.clone(),
127 };
128 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
129 let foreground_executor = cx.foreground_executor().clone();
130 move |fut| {
131 foreground_executor.spawn(fut).detach();
132 }
133 });
134
135 let io_task = cx.background_spawn(io_task);
136
137 let stderr_task = cx.background_spawn(async move {
138 let mut stderr = BufReader::new(stderr);
139 let mut line = String::new();
140 while let Ok(n) = stderr.read_line(&mut line).await
141 && n > 0
142 {
143 log::warn!("agent stderr: {}", line.trim());
144 line.clear();
145 }
146 Ok(())
147 });
148
149 let wait_task = cx.spawn({
150 let sessions = sessions.clone();
151 let status_fut = child.status();
152 async move |cx| {
153 let status = status_fut.await?;
154
155 for session in sessions.borrow().values() {
156 session
157 .thread
158 .update(cx, |thread, cx| {
159 thread.emit_load_error(LoadError::Exited { status }, cx)
160 })
161 .ok();
162 }
163
164 anyhow::Ok(())
165 }
166 });
167
168 let connection = Rc::new(connection);
169
170 cx.update(|cx| {
171 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
172 registry.set_active_connection(server_name.clone(), &connection, cx)
173 });
174 })?;
175
176 let mut client_info = acp::Implementation::new("zed", version);
177 if let Some(release_channel) = release_channel {
178 client_info = client_info.title(release_channel);
179 }
180 let response = connection
181 .initialize(
182 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
183 .client_capabilities(
184 acp::ClientCapabilities::new()
185 .fs(acp::FileSystemCapability::new()
186 .read_text_file(true)
187 .write_text_file(true))
188 .terminal(true)
189 // Experimental: Allow for rendering terminal output from the agents
190 .meta(acp::Meta::from_iter([
191 ("terminal_output".into(), true.into()),
192 ("terminal-auth".into(), true.into()),
193 ])),
194 )
195 .client_info(client_info),
196 )
197 .await?;
198
199 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
200 return Err(UnsupportedVersion.into());
201 }
202
203 Ok(Self {
204 auth_methods: response.auth_methods,
205 root_dir: root_dir.to_owned(),
206 connection,
207 server_name,
208 telemetry_id,
209 sessions,
210 agent_capabilities: response.agent_capabilities,
211 default_mode,
212 default_model,
213 _io_task: io_task,
214 _wait_task: wait_task,
215 _stderr_task: stderr_task,
216 child,
217 })
218 }
219
220 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
221 &self.agent_capabilities.prompt_capabilities
222 }
223
224 pub fn root_dir(&self) -> &Path {
225 &self.root_dir
226 }
227}
228
impl Drop for AcpConnection {
    /// Kills the agent server process when the connection goes away; a failed
    /// kill is handed to `log_err` rather than propagated.
    fn drop(&mut self) {
        // See the comment on the child field.
        self.child.kill().log_err();
    }
}
235
236impl AgentConnection for AcpConnection {
237 fn telemetry_id(&self) -> &'static str {
238 self.telemetry_id
239 }
240
241 fn new_thread(
242 self: Rc<Self>,
243 project: Entity<Project>,
244 cwd: &Path,
245 cx: &mut App,
246 ) -> Task<Result<Entity<AcpThread>>> {
247 let name = self.server_name.clone();
248 let conn = self.connection.clone();
249 let sessions = self.sessions.clone();
250 let default_mode = self.default_mode.clone();
251 let default_model = self.default_model.clone();
252 let cwd = cwd.to_path_buf();
253 let context_server_store = project.read(cx).context_server_store().read(cx);
254 let mcp_servers = if project.read(cx).is_local() {
255 context_server_store
256 .configured_server_ids()
257 .iter()
258 .filter_map(|id| {
259 let configuration = context_server_store.configuration_for_server(id)?;
260 match &*configuration {
261 project::context_server_store::ContextServerConfiguration::Custom {
262 command,
263 ..
264 }
265 | project::context_server_store::ContextServerConfiguration::Extension {
266 command,
267 ..
268 } => Some(acp::McpServer::Stdio(
269 acp::McpServerStdio::new(id.0.to_string(), &command.path)
270 .args(command.args.clone())
271 .env(if let Some(env) = command.env.as_ref() {
272 env.iter()
273 .map(|(name, value)| acp::EnvVariable::new(name, value))
274 .collect()
275 } else {
276 vec![]
277 }),
278 )),
279 project::context_server_store::ContextServerConfiguration::Http {
280 url,
281 headers,
282 } => Some(acp::McpServer::Http(
283 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
284 headers
285 .iter()
286 .map(|(name, value)| acp::HttpHeader::new(name, value))
287 .collect(),
288 ),
289 )),
290 }
291 })
292 .collect()
293 } else {
294 // In SSH projects, the external agent is running on the remote
295 // machine, and currently we only run MCP servers on the local
296 // machine. So don't pass any MCP servers to the agent in that case.
297 Vec::new()
298 };
299
300 cx.spawn(async move |cx| {
301 let response = conn
302 .new_session(acp::NewSessionRequest::new(cwd).mcp_servers(mcp_servers))
303 .await
304 .map_err(|err| {
305 if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
306 let mut error = AuthRequired::new();
307
308 if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
309 error = error.with_description(err.message);
310 }
311
312 anyhow!(error)
313 } else {
314 anyhow!(err)
315 }
316 })?;
317
318 let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
319 let models = response.models.map(|models| Rc::new(RefCell::new(models)));
320
321 if let Some(default_mode) = default_mode {
322 if let Some(modes) = modes.as_ref() {
323 let mut modes_ref = modes.borrow_mut();
324 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
325
326 if has_mode {
327 let initial_mode_id = modes_ref.current_mode_id.clone();
328
329 cx.spawn({
330 let default_mode = default_mode.clone();
331 let session_id = response.session_id.clone();
332 let modes = modes.clone();
333 let conn = conn.clone();
334 async move |_| {
335 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
336 .await.log_err();
337
338 if result.is_none() {
339 modes.borrow_mut().current_mode_id = initial_mode_id;
340 }
341 }
342 }).detach();
343
344 modes_ref.current_mode_id = default_mode;
345 } else {
346 let available_modes = modes_ref
347 .available_modes
348 .iter()
349 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
350 .collect::<Vec<_>>()
351 .join("\n");
352
353 log::warn!(
354 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
355 );
356 }
357 } else {
358 log::warn!(
359 "`{name}` does not support modes, but `default_mode` was set in settings.",
360 );
361 }
362 }
363
364 if let Some(default_model) = default_model {
365 if let Some(models) = models.as_ref() {
366 let mut models_ref = models.borrow_mut();
367 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
368
369 if has_model {
370 let initial_model_id = models_ref.current_model_id.clone();
371
372 cx.spawn({
373 let default_model = default_model.clone();
374 let session_id = response.session_id.clone();
375 let models = models.clone();
376 let conn = conn.clone();
377 async move |_| {
378 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
379 .await.log_err();
380
381 if result.is_none() {
382 models.borrow_mut().current_model_id = initial_model_id;
383 }
384 }
385 }).detach();
386
387 models_ref.current_model_id = default_model;
388 } else {
389 let available_models = models_ref
390 .available_models
391 .iter()
392 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
393 .collect::<Vec<_>>()
394 .join("\n");
395
396 log::warn!(
397 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
398 );
399 }
400 } else {
401 log::warn!(
402 "`{name}` does not support model selection, but `default_model` was set in settings.",
403 );
404 }
405 }
406
407 let session_id = response.session_id;
408 let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
409 let thread = cx.new(|cx| {
410 AcpThread::new(
411 self.server_name.clone(),
412 self.clone(),
413 project,
414 action_log,
415 session_id.clone(),
416 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
417 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
418 cx,
419 )
420 })?;
421
422
423 let session = AcpSession {
424 thread: thread.downgrade(),
425 suppress_abort_err: false,
426 session_modes: modes,
427 models,
428 };
429 sessions.borrow_mut().insert(session_id, session);
430
431 Ok(thread)
432 })
433 }
434
435 fn auth_methods(&self) -> &[acp::AuthMethod] {
436 &self.auth_methods
437 }
438
439 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
440 let conn = self.connection.clone();
441 cx.foreground_executor().spawn(async move {
442 conn.authenticate(acp::AuthenticateRequest::new(method_id))
443 .await?;
444 Ok(())
445 })
446 }
447
448 fn prompt(
449 &self,
450 _id: Option<acp_thread::UserMessageId>,
451 params: acp::PromptRequest,
452 cx: &mut App,
453 ) -> Task<Result<acp::PromptResponse>> {
454 let conn = self.connection.clone();
455 let sessions = self.sessions.clone();
456 let session_id = params.session_id.clone();
457 cx.foreground_executor().spawn(async move {
458 let result = conn.prompt(params).await;
459
460 let mut suppress_abort_err = false;
461
462 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
463 suppress_abort_err = session.suppress_abort_err;
464 session.suppress_abort_err = false;
465 }
466
467 match result {
468 Ok(response) => Ok(response),
469 Err(err) => {
470 if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
471 return Err(anyhow!(acp::Error::auth_required()));
472 }
473
474 if err.code != ErrorCode::INTERNAL_ERROR.code {
475 anyhow::bail!(err)
476 }
477
478 let Some(data) = &err.data else {
479 anyhow::bail!(err)
480 };
481
482 // Temporary workaround until the following PR is generally available:
483 // https://github.com/google-gemini/gemini-cli/pull/6656
484
485 #[derive(Deserialize)]
486 #[serde(deny_unknown_fields)]
487 struct ErrorDetails {
488 details: Box<str>,
489 }
490
491 match serde_json::from_value(data.clone()) {
492 Ok(ErrorDetails { details }) => {
493 if suppress_abort_err
494 && (details.contains("This operation was aborted")
495 || details.contains("The user aborted a request"))
496 {
497 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
498 } else {
499 Err(anyhow!(details))
500 }
501 }
502 Err(_) => Err(anyhow!(err)),
503 }
504 }
505 }
506 })
507 }
508
509 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
510 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
511 session.suppress_abort_err = true;
512 }
513 let conn = self.connection.clone();
514 let params = acp::CancelNotification::new(session_id.clone());
515 cx.foreground_executor()
516 .spawn(async move { conn.cancel(params).await })
517 .detach();
518 }
519
520 fn session_modes(
521 &self,
522 session_id: &acp::SessionId,
523 _cx: &App,
524 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
525 let sessions = self.sessions.clone();
526 let sessions_ref = sessions.borrow();
527 let Some(session) = sessions_ref.get(session_id) else {
528 return None;
529 };
530
531 if let Some(modes) = session.session_modes.as_ref() {
532 Some(Rc::new(AcpSessionModes {
533 connection: self.connection.clone(),
534 session_id: session_id.clone(),
535 state: modes.clone(),
536 }) as _)
537 } else {
538 None
539 }
540 }
541
542 fn model_selector(
543 &self,
544 session_id: &acp::SessionId,
545 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
546 let sessions = self.sessions.clone();
547 let sessions_ref = sessions.borrow();
548 let Some(session) = sessions_ref.get(session_id) else {
549 return None;
550 };
551
552 if let Some(models) = session.models.as_ref() {
553 Some(Rc::new(AcpModelSelector::new(
554 session_id.clone(),
555 self.connection.clone(),
556 models.clone(),
557 )) as _)
558 } else {
559 None
560 }
561 }
562
563 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
564 self
565 }
566}
567
/// Handle for reading and changing the mode of one session.
struct AcpSessionModes {
    /// Session whose mode this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_mode` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the session's `AcpSession` entry.
    state: Rc<RefCell<acp::SessionModeState>>,
}
573
574impl acp_thread::AgentSessionModes for AcpSessionModes {
575 fn current_mode(&self) -> acp::SessionModeId {
576 self.state.borrow().current_mode_id.clone()
577 }
578
579 fn all_modes(&self) -> Vec<acp::SessionMode> {
580 self.state.borrow().available_modes.clone()
581 }
582
583 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
584 let connection = self.connection.clone();
585 let session_id = self.session_id.clone();
586 let old_mode_id;
587 {
588 let mut state = self.state.borrow_mut();
589 old_mode_id = state.current_mode_id.clone();
590 state.current_mode_id = mode_id.clone();
591 };
592 let state = self.state.clone();
593 cx.foreground_executor().spawn(async move {
594 let result = connection
595 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
596 .await;
597
598 if result.is_err() {
599 state.borrow_mut().current_mode_id = old_mode_id;
600 }
601
602 result?;
603
604 Ok(())
605 })
606 }
607}
608
/// Handle for listing and selecting the model of one session.
struct AcpModelSelector {
    /// Session whose model this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send `set_session_model` requests.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state shared with the session's `AcpSession` entry.
    state: Rc<RefCell<acp::SessionModelState>>,
}
614
impl AcpModelSelector {
    /// Builds a selector for `session_id` that talks to the agent over
    /// `connection` and reads/writes the given shared model `state`.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
628
629impl acp_thread::AgentModelSelector for AcpModelSelector {
630 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
631 Task::ready(Ok(acp_thread::AgentModelList::Flat(
632 self.state
633 .borrow()
634 .available_models
635 .clone()
636 .into_iter()
637 .map(acp_thread::AgentModelInfo::from)
638 .collect(),
639 )))
640 }
641
642 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
643 let connection = self.connection.clone();
644 let session_id = self.session_id.clone();
645 let old_model_id;
646 {
647 let mut state = self.state.borrow_mut();
648 old_model_id = state.current_model_id.clone();
649 state.current_model_id = model_id.clone();
650 };
651 let state = self.state.clone();
652 cx.foreground_executor().spawn(async move {
653 let result = connection
654 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
655 .await;
656
657 if result.is_err() {
658 state.borrow_mut().current_model_id = old_model_id;
659 }
660
661 result?;
662
663 Ok(())
664 })
665 }
666
667 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
668 let state = self.state.borrow();
669 Task::ready(
670 state
671 .available_models
672 .iter()
673 .find(|m| m.model_id == state.current_model_id)
674 .cloned()
675 .map(acp_thread::AgentModelInfo::from)
676 .ok_or_else(|| anyhow::anyhow!("Model not found")),
677 )
678 }
679}
680
/// Client-side handler for requests and notifications sent by the agent.
struct ClientDelegate {
    /// Session map shared with the owning `AcpConnection`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App context, cloned for each call that needs to update a thread.
    cx: AsyncApp,
}
685
686#[async_trait::async_trait(?Send)]
687impl acp::Client for ClientDelegate {
688 async fn request_permission(
689 &self,
690 arguments: acp::RequestPermissionRequest,
691 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
692 let respect_always_allow_setting;
693 let thread;
694 {
695 let sessions_ref = self.sessions.borrow();
696 let session = sessions_ref
697 .get(&arguments.session_id)
698 .context("Failed to get session")?;
699 respect_always_allow_setting = session.session_modes.is_none();
700 thread = session.thread.clone();
701 }
702
703 let cx = &mut self.cx.clone();
704
705 let task = thread.update(cx, |thread, cx| {
706 thread.request_tool_call_authorization(
707 arguments.tool_call,
708 arguments.options,
709 respect_always_allow_setting,
710 cx,
711 )
712 })??;
713
714 let outcome = task.await;
715
716 Ok(acp::RequestPermissionResponse::new(outcome))
717 }
718
719 async fn write_text_file(
720 &self,
721 arguments: acp::WriteTextFileRequest,
722 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
723 let cx = &mut self.cx.clone();
724 let task = self
725 .session_thread(&arguments.session_id)?
726 .update(cx, |thread, cx| {
727 thread.write_text_file(arguments.path, arguments.content, cx)
728 })?;
729
730 task.await?;
731
732 Ok(Default::default())
733 }
734
735 async fn read_text_file(
736 &self,
737 arguments: acp::ReadTextFileRequest,
738 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
739 let task = self.session_thread(&arguments.session_id)?.update(
740 &mut self.cx.clone(),
741 |thread, cx| {
742 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
743 },
744 )?;
745
746 let content = task.await?;
747
748 Ok(acp::ReadTextFileResponse::new(content))
749 }
750
751 async fn session_notification(
752 &self,
753 notification: acp::SessionNotification,
754 ) -> Result<(), acp::Error> {
755 let sessions = self.sessions.borrow();
756 let session = sessions
757 .get(¬ification.session_id)
758 .context("Failed to get session")?;
759
760 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
761 current_mode_id,
762 ..
763 }) = ¬ification.update
764 {
765 if let Some(session_modes) = &session.session_modes {
766 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
767 } else {
768 log::error!(
769 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
770 );
771 }
772 }
773
774 // Clone so we can inspect meta both before and after handing off to the thread
775 let update_clone = notification.update.clone();
776
777 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
778 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
779 if let Some(meta) = &tc.meta {
780 if let Some(terminal_info) = meta.get("terminal_info") {
781 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
782 {
783 let terminal_id = acp::TerminalId::new(id_str);
784 let cwd = terminal_info
785 .get("cwd")
786 .and_then(|v| v.as_str().map(PathBuf::from));
787
788 // Create a minimal display-only lower-level terminal and register it.
789 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
790 let builder = TerminalBuilder::new_display_only(
791 CursorShape::default(),
792 AlternateScroll::On,
793 None,
794 0,
795 )?;
796 let lower = cx.new(|cx| builder.subscribe(cx));
797 thread.on_terminal_provider_event(
798 TerminalProviderEvent::Created {
799 terminal_id,
800 label: tc.title.clone(),
801 cwd,
802 output_byte_limit: None,
803 terminal: lower,
804 },
805 cx,
806 );
807 anyhow::Ok(())
808 });
809 }
810 }
811 }
812 }
813
814 // Forward the update to the acp_thread as usual.
815 session.thread.update(&mut self.cx.clone(), |thread, cx| {
816 thread.handle_session_update(notification.update.clone(), cx)
817 })??;
818
819 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
820 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
821 if let Some(meta) = &tcu.meta {
822 if let Some(term_out) = meta.get("terminal_output") {
823 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
824 let terminal_id = acp::TerminalId::new(id_str);
825 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
826 let data = s.as_bytes().to_vec();
827 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
828 thread.on_terminal_provider_event(
829 TerminalProviderEvent::Output { terminal_id, data },
830 cx,
831 );
832 });
833 }
834 }
835 }
836
837 // terminal_exit
838 if let Some(term_exit) = meta.get("terminal_exit") {
839 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
840 let terminal_id = acp::TerminalId::new(id_str);
841 let mut status = acp::TerminalExitStatus::new();
842 if let Some(code) = term_exit.get("exit_code").and_then(|v| v.as_u64()) {
843 status = status.exit_code(code as u32)
844 }
845 if let Some(signal) = term_exit.get("signal").and_then(|v| v.as_str()) {
846 status = status.signal(signal);
847 }
848
849 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
850 thread.on_terminal_provider_event(
851 TerminalProviderEvent::Exit {
852 terminal_id,
853 status,
854 },
855 cx,
856 );
857 });
858 }
859 }
860 }
861 }
862
863 Ok(())
864 }
865
866 async fn create_terminal(
867 &self,
868 args: acp::CreateTerminalRequest,
869 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
870 let thread = self.session_thread(&args.session_id)?;
871 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
872
873 let terminal_entity = acp_thread::create_terminal_entity(
874 args.command.clone(),
875 &args.args,
876 args.env
877 .into_iter()
878 .map(|env| (env.name, env.value))
879 .collect(),
880 args.cwd.clone(),
881 &project,
882 &mut self.cx.clone(),
883 )
884 .await?;
885
886 // Register with renderer
887 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
888 thread.register_terminal_created(
889 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
890 format!("{} {}", args.command, args.args.join(" ")),
891 args.cwd.clone(),
892 args.output_byte_limit,
893 terminal_entity,
894 cx,
895 )
896 })?;
897 let terminal_id =
898 terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
899 Ok(acp::CreateTerminalResponse::new(terminal_id))
900 }
901
902 async fn kill_terminal_command(
903 &self,
904 args: acp::KillTerminalCommandRequest,
905 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
906 self.session_thread(&args.session_id)?
907 .update(&mut self.cx.clone(), |thread, cx| {
908 thread.kill_terminal(args.terminal_id, cx)
909 })??;
910
911 Ok(Default::default())
912 }
913
914 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
915 Err(acp::Error::method_not_found())
916 }
917
918 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
919 Err(acp::Error::method_not_found())
920 }
921
922 async fn release_terminal(
923 &self,
924 args: acp::ReleaseTerminalRequest,
925 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
926 self.session_thread(&args.session_id)?
927 .update(&mut self.cx.clone(), |thread, cx| {
928 thread.release_terminal(args.terminal_id, cx)
929 })??;
930
931 Ok(Default::default())
932 }
933
934 async fn terminal_output(
935 &self,
936 args: acp::TerminalOutputRequest,
937 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
938 self.session_thread(&args.session_id)?
939 .read_with(&mut self.cx.clone(), |thread, cx| {
940 let out = thread
941 .terminal(args.terminal_id)?
942 .read(cx)
943 .current_output(cx);
944
945 Ok(out)
946 })?
947 }
948
949 async fn wait_for_terminal_exit(
950 &self,
951 args: acp::WaitForTerminalExitRequest,
952 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
953 let exit_status = self
954 .session_thread(&args.session_id)?
955 .update(&mut self.cx.clone(), |thread, cx| {
956 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
957 })??
958 .await;
959
960 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
961 }
962}
963
964impl ClientDelegate {
965 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
966 let sessions = self.sessions.borrow();
967 sessions
968 .get(session_id)
969 .context("Failed to get session")
970 .map(|session| session.thread.clone())
971 }
972}