1use acp_thread::AgentConnection;
2use acp_tools::AcpConnectionRegistry;
3use action_log::ActionLog;
4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
5use anyhow::anyhow;
6use collections::HashMap;
7use futures::AsyncBufReadExt as _;
8use futures::io::BufReader;
9use project::Project;
10use project::agent_server_store::AgentServerCommand;
11use serde::Deserialize;
12use util::ResultExt as _;
13
14use std::path::PathBuf;
15use std::{any::Any, cell::RefCell};
16use std::{path::Path, rc::Rc};
17use thiserror::Error;
18
19use anyhow::{Context as _, Result};
20use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
21
22use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
23use terminal::TerminalBuilder;
24use terminal::terminal_settings::{AlternateScroll, CursorShape};
25
/// Error returned by `AcpConnection::stdio` when the agent's `initialize` response reports a
/// protocol version lower than `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
29
/// A connection to an external agent process that speaks ACP over its stdin/stdout.
///
/// Created by [`AcpConnection::stdio`]; the child process is killed when this value is dropped.
pub struct AcpConnection {
    /// Display name of the agent server; used when registering the connection and when
    /// creating threads.
    server_name: SharedString,
    /// Value returned from `AgentConnection::telemetry_id`.
    telemetry_id: &'static str,
    /// Client side of the ACP connection, wired to the child's stdin/stdout.
    connection: Rc<acp::ClientSideConnection>,
    /// Per-session state keyed by session id; shared with the `ClientDelegate` and the task
    /// that waits for the child to exit.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods taken from the agent's `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities taken from the agent's `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode that new sessions are switched to when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Root directory this connection was created with.
    root_dir: PathBuf,
    // NB: Don't move this into the wait_task, since we need to ensure the process is
    // killed on drop (setting kill_on_drop on the command seems to not always work).
    child: smol::process::Child,
    /// Background task driving the connection's I/O; held so it lives as long as the connection.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that awaits the child's exit status and reports it to every session's thread.
    _wait_task: Task<Result<()>>,
    /// Background task that forwards the child's stderr lines to the log.
    _stderr_task: Task<Result<()>>,
}
46
/// State tracked for one ACP session created through `AcpConnection::new_thread`.
pub struct AcpSession {
    /// Weak handle to the thread that displays this session.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel` and consumed (reset to `false`) by `prompt`; while set, an "aborted"
    /// internal error from the agent is reported as a cancelled prompt instead of an error.
    suppress_abort_err: bool,
    /// Model state from the `new_session` response, when the agent provided one.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state from the `new_session` response, when the agent provided one.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
}
53
54pub async fn connect(
55 server_name: SharedString,
56 telemetry_id: &'static str,
57 command: AgentServerCommand,
58 root_dir: &Path,
59 default_mode: Option<acp::SessionModeId>,
60 is_remote: bool,
61 cx: &mut AsyncApp,
62) -> Result<Rc<dyn AgentConnection>> {
63 let conn = AcpConnection::stdio(
64 server_name,
65 telemetry_id,
66 command.clone(),
67 root_dir,
68 default_mode,
69 is_remote,
70 cx,
71 )
72 .await?;
73 Ok(Rc::new(conn) as _)
74}
75
/// Lowest protocol version accepted from an agent's `initialize` response; anything lower makes
/// `AcpConnection::stdio` fail with `UnsupportedVersion`.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::V1;
77
78impl AcpConnection {
79 pub async fn stdio(
80 server_name: SharedString,
81 telemetry_id: &'static str,
82 command: AgentServerCommand,
83 root_dir: &Path,
84 default_mode: Option<acp::SessionModeId>,
85 is_remote: bool,
86 cx: &mut AsyncApp,
87 ) -> Result<Self> {
88 let mut child = util::command::new_smol_command(&command.path);
89 child
90 .args(command.args.iter().map(|arg| arg.as_str()))
91 .envs(command.env.iter().flatten())
92 .stdin(std::process::Stdio::piped())
93 .stdout(std::process::Stdio::piped())
94 .stderr(std::process::Stdio::piped());
95 if !is_remote {
96 child.current_dir(root_dir);
97 }
98 let mut child = child.spawn()?;
99
100 let stdout = child.stdout.take().context("Failed to take stdout")?;
101 let stdin = child.stdin.take().context("Failed to take stdin")?;
102 let stderr = child.stderr.take().context("Failed to take stderr")?;
103 log::debug!(
104 "Spawning external agent server: {:?}, {:?}",
105 command.path,
106 command.args
107 );
108 log::trace!("Spawned (pid: {})", child.id());
109
110 let sessions = Rc::new(RefCell::new(HashMap::default()));
111
112 let (release_channel, version) = cx.update(|cx| {
113 (
114 release_channel::ReleaseChannel::try_global(cx)
115 .map(|release_channel| release_channel.display_name()),
116 release_channel::AppVersion::global(cx).to_string(),
117 )
118 })?;
119
120 let client = ClientDelegate {
121 sessions: sessions.clone(),
122 cx: cx.clone(),
123 };
124 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
125 let foreground_executor = cx.foreground_executor().clone();
126 move |fut| {
127 foreground_executor.spawn(fut).detach();
128 }
129 });
130
131 let io_task = cx.background_spawn(io_task);
132
133 let stderr_task = cx.background_spawn(async move {
134 let mut stderr = BufReader::new(stderr);
135 let mut line = String::new();
136 while let Ok(n) = stderr.read_line(&mut line).await
137 && n > 0
138 {
139 log::warn!("agent stderr: {}", line.trim());
140 line.clear();
141 }
142 Ok(())
143 });
144
145 let wait_task = cx.spawn({
146 let sessions = sessions.clone();
147 let status_fut = child.status();
148 async move |cx| {
149 let status = status_fut.await?;
150
151 for session in sessions.borrow().values() {
152 session
153 .thread
154 .update(cx, |thread, cx| {
155 thread.emit_load_error(LoadError::Exited { status }, cx)
156 })
157 .ok();
158 }
159
160 anyhow::Ok(())
161 }
162 });
163
164 let connection = Rc::new(connection);
165
166 cx.update(|cx| {
167 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
168 registry.set_active_connection(server_name.clone(), &connection, cx)
169 });
170 })?;
171
172 let response = connection
173 .initialize(acp::InitializeRequest {
174 protocol_version: acp::VERSION,
175 client_capabilities: acp::ClientCapabilities {
176 fs: acp::FileSystemCapability {
177 read_text_file: true,
178 write_text_file: true,
179 meta: None,
180 },
181 terminal: true,
182 meta: Some(serde_json::json!({
183 // Experimental: Allow for rendering terminal output from the agents
184 "terminal_output": true,
185 "terminal-auth": true,
186 })),
187 },
188 client_info: Some(acp::Implementation {
189 name: "zed".to_owned(),
190 title: release_channel.map(|c| c.to_owned()),
191 version,
192 }),
193 meta: None,
194 })
195 .await?;
196
197 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
198 return Err(UnsupportedVersion.into());
199 }
200
201 Ok(Self {
202 auth_methods: response.auth_methods,
203 root_dir: root_dir.to_owned(),
204 connection,
205 server_name,
206 telemetry_id,
207 sessions,
208 agent_capabilities: response.agent_capabilities,
209 default_mode,
210 _io_task: io_task,
211 _wait_task: wait_task,
212 _stderr_task: stderr_task,
213 child,
214 })
215 }
216
217 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
218 &self.agent_capabilities.prompt_capabilities
219 }
220
221 pub fn root_dir(&self) -> &Path {
222 &self.root_dir
223 }
224}
225
226impl Drop for AcpConnection {
227 fn drop(&mut self) {
228 // See the comment on the child field.
229 self.child.kill().log_err();
230 }
231}
232
233impl AgentConnection for AcpConnection {
234 fn telemetry_id(&self) -> &'static str {
235 self.telemetry_id
236 }
237
    /// Creates a new agent session rooted at `cwd` and wraps it in an `AcpThread`.
    ///
    /// For local projects the configured context (MCP) servers are passed to the agent; for
    /// non-local projects none are. If a `default_mode` is configured and the agent offers
    /// it, the session is switched to that mode in a detached task.
    ///
    /// The returned task fails with `AuthRequired` when the agent answers `new_session` with
    /// its auth-required error code, and with the agent's error otherwise.
    fn new_thread(
        self: Rc<Self>,
        project: Entity<Project>,
        cwd: &Path,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        let name = self.server_name.clone();
        let conn = self.connection.clone();
        let sessions = self.sessions.clone();
        let default_mode = self.default_mode.clone();
        let cwd = cwd.to_path_buf();
        let context_server_store = project.read(cx).context_server_store().read(cx);
        // Translate each configured context server into the ACP `McpServer` description.
        let mcp_servers =
            if project.read(cx).is_local() {
                context_server_store
                    .configured_server_ids()
                    .iter()
                    .filter_map(|id| {
                        // Servers without a configuration are skipped.
                        let configuration = context_server_store.configuration_for_server(id)?;
                        match &*configuration {
                        project::context_server_store::ContextServerConfiguration::Custom {
                            command,
                            ..
                        }
                        | project::context_server_store::ContextServerConfiguration::Extension {
                            command,
                            ..
                        } => Some(acp::McpServer::Stdio {
                            name: id.0.to_string(),
                            command: command.path.clone(),
                            args: command.args.clone(),
                            env: if let Some(env) = command.env.as_ref() {
                                env.iter()
                                    .map(|(name, value)| acp::EnvVariable {
                                        name: name.clone(),
                                        value: value.clone(),
                                        meta: None,
                                    })
                                    .collect()
                            } else {
                                vec![]
                            },
                        }),
                        project::context_server_store::ContextServerConfiguration::Http {
                            url,
                            headers,
                        } => Some(acp::McpServer::Http {
                            name: id.0.to_string(),
                            url: url.to_string(),
                            headers: headers.iter().map(|(name, value)| acp::HttpHeader {
                                name: name.clone(),
                                value: value.clone(),
                                meta: None,
                            }).collect(),
                        }),
                    }
                    })
                    .collect()
            } else {
                // In SSH projects, the external agent is running on the remote
                // machine, and currently we only run MCP servers on the local
                // machine. So don't pass any MCP servers to the agent in that case.
                Vec::new()
            };

        cx.spawn(async move |cx| {
            let response = conn
                .new_session(acp::NewSessionRequest { mcp_servers, cwd, meta: None })
                .await
                .map_err(|err| {
                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
                        let mut error = AuthRequired::new();

                        // Only attach a description when the agent sent something other
                        // than the stock auth-required message.
                        if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
                            error = error.with_description(err.message);
                        }

                        anyhow!(error)
                    } else {
                        anyhow!(err)
                    }
                })?;

            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
            let models = response.models.map(|models| Rc::new(RefCell::new(models)));

            if let Some(default_mode) = default_mode {
                if let Some(modes) = modes.as_ref() {
                    let mut modes_ref = modes.borrow_mut();
                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);

                    if has_mode {
                        let initial_mode_id = modes_ref.current_mode_id.clone();

                        // Ask the agent to switch modes in the background; if the request
                        // fails, roll the local state back to the mode the agent reported.
                        cx.spawn({
                            let default_mode = default_mode.clone();
                            let session_id = response.session_id.clone();
                            let modes = modes.clone();
                            async move |_| {
                                let result = conn.set_session_mode(acp::SetSessionModeRequest {
                                    session_id,
                                    mode_id: default_mode,
                                    meta: None,
                                })
                                .await.log_err();

                                if result.is_none() {
                                    modes.borrow_mut().current_mode_id = initial_mode_id;
                                }
                            }
                        }).detach();

                        // Optimistically record the default mode as current right away.
                        modes_ref.current_mode_id = default_mode;
                    } else {
                        let available_modes = modes_ref
                            .available_modes
                            .iter()
                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
                        );
                    }
                } else {
                    log::warn!(
                        "`{name}` does not support modes, but `default_mode` was set in settings.",
                    );
                }
            }

            let session_id = response.session_id;
            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
            let thread = cx.new(|cx| {
                AcpThread::new(
                    self.server_name.clone(),
                    self.clone(),
                    project,
                    action_log,
                    session_id.clone(),
                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                    cx,
                )
            })?;


            // Register the session so client callbacks and `prompt`/`cancel` can find it.
            let session = AcpSession {
                thread: thread.downgrade(),
                suppress_abort_err: false,
                session_modes: modes,
                models,
            };
            sessions.borrow_mut().insert(session_id, session);

            Ok(thread)
        })
    }
397
398 fn auth_methods(&self) -> &[acp::AuthMethod] {
399 &self.auth_methods
400 }
401
402 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
403 let conn = self.connection.clone();
404 cx.foreground_executor().spawn(async move {
405 conn.authenticate(acp::AuthenticateRequest {
406 method_id: method_id.clone(),
407 meta: None,
408 })
409 .await?;
410
411 Ok(())
412 })
413 }
414
415 fn prompt(
416 &self,
417 _id: Option<acp_thread::UserMessageId>,
418 params: acp::PromptRequest,
419 cx: &mut App,
420 ) -> Task<Result<acp::PromptResponse>> {
421 let conn = self.connection.clone();
422 let sessions = self.sessions.clone();
423 let session_id = params.session_id.clone();
424 cx.foreground_executor().spawn(async move {
425 let result = conn.prompt(params).await;
426
427 let mut suppress_abort_err = false;
428
429 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
430 suppress_abort_err = session.suppress_abort_err;
431 session.suppress_abort_err = false;
432 }
433
434 match result {
435 Ok(response) => Ok(response),
436 Err(err) => {
437 if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
438 return Err(anyhow!(acp::Error::auth_required()));
439 }
440
441 if err.code != ErrorCode::INTERNAL_ERROR.code {
442 anyhow::bail!(err)
443 }
444
445 let Some(data) = &err.data else {
446 anyhow::bail!(err)
447 };
448
449 // Temporary workaround until the following PR is generally available:
450 // https://github.com/google-gemini/gemini-cli/pull/6656
451
452 #[derive(Deserialize)]
453 #[serde(deny_unknown_fields)]
454 struct ErrorDetails {
455 details: Box<str>,
456 }
457
458 match serde_json::from_value(data.clone()) {
459 Ok(ErrorDetails { details }) => {
460 if suppress_abort_err
461 && (details.contains("This operation was aborted")
462 || details.contains("The user aborted a request"))
463 {
464 Ok(acp::PromptResponse {
465 stop_reason: acp::StopReason::Cancelled,
466 meta: None,
467 })
468 } else {
469 Err(anyhow!(details))
470 }
471 }
472 Err(_) => Err(anyhow!(err)),
473 }
474 }
475 }
476 })
477 }
478
479 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
480 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
481 session.suppress_abort_err = true;
482 }
483 let conn = self.connection.clone();
484 let params = acp::CancelNotification {
485 session_id: session_id.clone(),
486 meta: None,
487 };
488 cx.foreground_executor()
489 .spawn(async move { conn.cancel(params).await })
490 .detach();
491 }
492
493 fn session_modes(
494 &self,
495 session_id: &acp::SessionId,
496 _cx: &App,
497 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
498 let sessions = self.sessions.clone();
499 let sessions_ref = sessions.borrow();
500 let Some(session) = sessions_ref.get(session_id) else {
501 return None;
502 };
503
504 if let Some(modes) = session.session_modes.as_ref() {
505 Some(Rc::new(AcpSessionModes {
506 connection: self.connection.clone(),
507 session_id: session_id.clone(),
508 state: modes.clone(),
509 }) as _)
510 } else {
511 None
512 }
513 }
514
515 fn model_selector(
516 &self,
517 session_id: &acp::SessionId,
518 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
519 let sessions = self.sessions.clone();
520 let sessions_ref = sessions.borrow();
521 let Some(session) = sessions_ref.get(session_id) else {
522 return None;
523 };
524
525 if let Some(models) = session.models.as_ref() {
526 Some(Rc::new(AcpModelSelector::new(
527 session_id.clone(),
528 self.connection.clone(),
529 models.clone(),
530 )) as _)
531 } else {
532 None
533 }
534 }
535
536 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
537 self
538 }
539}
540
/// Handle for reading and changing the mode of a single session.
struct AcpSessionModes {
    /// Session whose mode is changed by `set_mode`.
    session_id: acp::SessionId,
    /// Connection used to send the mode change to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the session entry in `AcpConnection::sessions`.
    state: Rc<RefCell<acp::SessionModeState>>,
}
546
547impl acp_thread::AgentSessionModes for AcpSessionModes {
548 fn current_mode(&self) -> acp::SessionModeId {
549 self.state.borrow().current_mode_id.clone()
550 }
551
552 fn all_modes(&self) -> Vec<acp::SessionMode> {
553 self.state.borrow().available_modes.clone()
554 }
555
556 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
557 let connection = self.connection.clone();
558 let session_id = self.session_id.clone();
559 let old_mode_id;
560 {
561 let mut state = self.state.borrow_mut();
562 old_mode_id = state.current_mode_id.clone();
563 state.current_mode_id = mode_id.clone();
564 };
565 let state = self.state.clone();
566 cx.foreground_executor().spawn(async move {
567 let result = connection
568 .set_session_mode(acp::SetSessionModeRequest {
569 session_id,
570 mode_id,
571 meta: None,
572 })
573 .await;
574
575 if result.is_err() {
576 state.borrow_mut().current_mode_id = old_mode_id;
577 }
578
579 result?;
580
581 Ok(())
582 })
583 }
584}
585
/// Handle for listing and changing the model of a single session.
struct AcpModelSelector {
    /// Session whose model is changed by `select_model`.
    session_id: acp::SessionId,
    /// Connection used to send the model change to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state shared with the session entry in `AcpConnection::sessions`.
    state: Rc<RefCell<acp::SessionModelState>>,
}
591
592impl AcpModelSelector {
593 fn new(
594 session_id: acp::SessionId,
595 connection: Rc<acp::ClientSideConnection>,
596 state: Rc<RefCell<acp::SessionModelState>>,
597 ) -> Self {
598 Self {
599 session_id,
600 connection,
601 state,
602 }
603 }
604}
605
606impl acp_thread::AgentModelSelector for AcpModelSelector {
607 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
608 Task::ready(Ok(acp_thread::AgentModelList::Flat(
609 self.state
610 .borrow()
611 .available_models
612 .clone()
613 .into_iter()
614 .map(acp_thread::AgentModelInfo::from)
615 .collect(),
616 )))
617 }
618
619 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
620 let connection = self.connection.clone();
621 let session_id = self.session_id.clone();
622 let old_model_id;
623 {
624 let mut state = self.state.borrow_mut();
625 old_model_id = state.current_model_id.clone();
626 state.current_model_id = model_id.clone();
627 };
628 let state = self.state.clone();
629 cx.foreground_executor().spawn(async move {
630 let result = connection
631 .set_session_model(acp::SetSessionModelRequest {
632 session_id,
633 model_id,
634 meta: None,
635 })
636 .await;
637
638 if result.is_err() {
639 state.borrow_mut().current_model_id = old_model_id;
640 }
641
642 result?;
643
644 Ok(())
645 })
646 }
647
648 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
649 let state = self.state.borrow();
650 Task::ready(
651 state
652 .available_models
653 .iter()
654 .find(|m| m.model_id == state.current_model_id)
655 .cloned()
656 .map(acp_thread::AgentModelInfo::from)
657 .ok_or_else(|| anyhow::anyhow!("Model not found")),
658 )
659 }
660}
661
/// Client-side handler for requests and notifications coming from the agent.
struct ClientDelegate {
    /// Session table shared with the owning `AcpConnection`.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App context cloned for each call that needs to touch entities.
    cx: AsyncApp,
}
666
667#[async_trait::async_trait(?Send)]
668impl acp::Client for ClientDelegate {
669 async fn request_permission(
670 &self,
671 arguments: acp::RequestPermissionRequest,
672 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
673 let respect_always_allow_setting;
674 let thread;
675 {
676 let sessions_ref = self.sessions.borrow();
677 let session = sessions_ref
678 .get(&arguments.session_id)
679 .context("Failed to get session")?;
680 respect_always_allow_setting = session.session_modes.is_none();
681 thread = session.thread.clone();
682 }
683
684 let cx = &mut self.cx.clone();
685
686 let task = thread.update(cx, |thread, cx| {
687 thread.request_tool_call_authorization(
688 arguments.tool_call,
689 arguments.options,
690 respect_always_allow_setting,
691 cx,
692 )
693 })??;
694
695 let outcome = task.await;
696
697 Ok(acp::RequestPermissionResponse {
698 outcome,
699 meta: None,
700 })
701 }
702
703 async fn write_text_file(
704 &self,
705 arguments: acp::WriteTextFileRequest,
706 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
707 let cx = &mut self.cx.clone();
708 let task = self
709 .session_thread(&arguments.session_id)?
710 .update(cx, |thread, cx| {
711 thread.write_text_file(arguments.path, arguments.content, cx)
712 })?;
713
714 task.await?;
715
716 Ok(Default::default())
717 }
718
719 async fn read_text_file(
720 &self,
721 arguments: acp::ReadTextFileRequest,
722 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
723 let task = self.session_thread(&arguments.session_id)?.update(
724 &mut self.cx.clone(),
725 |thread, cx| {
726 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
727 },
728 )?;
729
730 let content = task.await?;
731
732 Ok(acp::ReadTextFileResponse {
733 content,
734 meta: None,
735 })
736 }
737
738 async fn session_notification(
739 &self,
740 notification: acp::SessionNotification,
741 ) -> Result<(), acp::Error> {
742 let sessions = self.sessions.borrow();
743 let session = sessions
744 .get(¬ification.session_id)
745 .context("Failed to get session")?;
746
747 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
748 current_mode_id,
749 ..
750 }) = ¬ification.update
751 {
752 if let Some(session_modes) = &session.session_modes {
753 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
754 } else {
755 log::error!(
756 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
757 );
758 }
759 }
760
761 // Clone so we can inspect meta both before and after handing off to the thread
762 let update_clone = notification.update.clone();
763
764 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
765 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
766 if let Some(meta) = &tc.meta {
767 if let Some(terminal_info) = meta.get("terminal_info") {
768 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
769 {
770 let terminal_id = acp::TerminalId(id_str.into());
771 let cwd = terminal_info
772 .get("cwd")
773 .and_then(|v| v.as_str().map(PathBuf::from));
774
775 // Create a minimal display-only lower-level terminal and register it.
776 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
777 let builder = TerminalBuilder::new_display_only(
778 CursorShape::default(),
779 AlternateScroll::On,
780 None,
781 0,
782 )?;
783 let lower = cx.new(|cx| builder.subscribe(cx));
784 thread.on_terminal_provider_event(
785 TerminalProviderEvent::Created {
786 terminal_id: terminal_id.clone(),
787 label: tc.title.clone(),
788 cwd,
789 output_byte_limit: None,
790 terminal: lower,
791 },
792 cx,
793 );
794 anyhow::Ok(())
795 });
796 }
797 }
798 }
799 }
800
801 // Forward the update to the acp_thread as usual.
802 session.thread.update(&mut self.cx.clone(), |thread, cx| {
803 thread.handle_session_update(notification.update.clone(), cx)
804 })??;
805
806 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
807 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
808 if let Some(meta) = &tcu.meta {
809 if let Some(term_out) = meta.get("terminal_output") {
810 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
811 let terminal_id = acp::TerminalId(id_str.into());
812 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
813 let data = s.as_bytes().to_vec();
814 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
815 thread.on_terminal_provider_event(
816 TerminalProviderEvent::Output {
817 terminal_id: terminal_id.clone(),
818 data,
819 },
820 cx,
821 );
822 });
823 }
824 }
825 }
826
827 // terminal_exit
828 if let Some(term_exit) = meta.get("terminal_exit") {
829 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
830 let terminal_id = acp::TerminalId(id_str.into());
831 let status = acp::TerminalExitStatus {
832 exit_code: term_exit
833 .get("exit_code")
834 .and_then(|v| v.as_u64())
835 .map(|i| i as u32),
836 signal: term_exit
837 .get("signal")
838 .and_then(|v| v.as_str().map(|s| s.to_string())),
839 meta: None,
840 };
841 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
842 thread.on_terminal_provider_event(
843 TerminalProviderEvent::Exit {
844 terminal_id: terminal_id.clone(),
845 status,
846 },
847 cx,
848 );
849 });
850 }
851 }
852 }
853 }
854
855 Ok(())
856 }
857
858 async fn create_terminal(
859 &self,
860 args: acp::CreateTerminalRequest,
861 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
862 let thread = self.session_thread(&args.session_id)?;
863 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
864
865 let terminal_entity = acp_thread::create_terminal_entity(
866 args.command.clone(),
867 &args.args,
868 args.env
869 .into_iter()
870 .map(|env| (env.name, env.value))
871 .collect(),
872 args.cwd.clone(),
873 &project,
874 &mut self.cx.clone(),
875 )
876 .await?;
877
878 // Register with renderer
879 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
880 thread.register_terminal_created(
881 acp::TerminalId(uuid::Uuid::new_v4().to_string().into()),
882 format!("{} {}", args.command, args.args.join(" ")),
883 args.cwd.clone(),
884 args.output_byte_limit,
885 terminal_entity,
886 cx,
887 )
888 })?;
889 let terminal_id =
890 terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
891 Ok(acp::CreateTerminalResponse {
892 terminal_id,
893 meta: None,
894 })
895 }
896
897 async fn kill_terminal_command(
898 &self,
899 args: acp::KillTerminalCommandRequest,
900 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
901 self.session_thread(&args.session_id)?
902 .update(&mut self.cx.clone(), |thread, cx| {
903 thread.kill_terminal(args.terminal_id, cx)
904 })??;
905
906 Ok(Default::default())
907 }
908
909 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
910 Err(acp::Error::method_not_found())
911 }
912
913 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
914 Err(acp::Error::method_not_found())
915 }
916
917 async fn release_terminal(
918 &self,
919 args: acp::ReleaseTerminalRequest,
920 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
921 self.session_thread(&args.session_id)?
922 .update(&mut self.cx.clone(), |thread, cx| {
923 thread.release_terminal(args.terminal_id, cx)
924 })??;
925
926 Ok(Default::default())
927 }
928
929 async fn terminal_output(
930 &self,
931 args: acp::TerminalOutputRequest,
932 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
933 self.session_thread(&args.session_id)?
934 .read_with(&mut self.cx.clone(), |thread, cx| {
935 let out = thread
936 .terminal(args.terminal_id)?
937 .read(cx)
938 .current_output(cx);
939
940 Ok(out)
941 })?
942 }
943
944 async fn wait_for_terminal_exit(
945 &self,
946 args: acp::WaitForTerminalExitRequest,
947 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
948 let exit_status = self
949 .session_thread(&args.session_id)?
950 .update(&mut self.cx.clone(), |thread, cx| {
951 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
952 })??
953 .await;
954
955 Ok(acp::WaitForTerminalExitResponse {
956 exit_status,
957 meta: None,
958 })
959 }
960}
961
962impl ClientDelegate {
963 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
964 let sessions = self.sessions.borrow();
965 sessions
966 .get(session_id)
967 .context("Failed to get session")
968 .map(|session| session.thread.clone())
969 }
970}