1use acp_thread::AgentConnection;
2use acp_tools::AcpConnectionRegistry;
3use action_log::ActionLog;
4use agent_client_protocol::{self as acp, Agent as _};
5use anyhow::anyhow;
6use collections::HashMap;
7use futures::AsyncBufReadExt as _;
8use futures::io::BufReader;
9use project::Project;
10use project::agent_server_store::AgentServerCommand;
11use serde::Deserialize;
12use util::ResultExt as _;
13
14use std::path::PathBuf;
15use std::{any::Any, cell::RefCell};
16use std::{path::Path, rc::Rc};
17use thiserror::Error;
18
19use anyhow::{Context as _, Result};
20use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
21
22use acp_thread::{AcpThread, AuthRequired, LoadError};
23
24#[derive(Debug, Error)]
25#[error("Unsupported version")]
26pub struct UnsupportedVersion;
27
28pub struct AcpConnection {
29 server_name: SharedString,
30 connection: Rc<acp::ClientSideConnection>,
31 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
32 auth_methods: Vec<acp::AuthMethod>,
33 agent_capabilities: acp::AgentCapabilities,
34 default_mode: Option<acp::SessionModeId>,
35 root_dir: PathBuf,
36 // NB: Don't move this into the wait_task, since we need to ensure the process is
37 // killed on drop (setting kill_on_drop on the command seems to not always work).
38 child: smol::process::Child,
39 _io_task: Task<Result<()>>,
40 _wait_task: Task<Result<()>>,
41 _stderr_task: Task<Result<()>>,
42}
43
44pub struct AcpSession {
45 thread: WeakEntity<AcpThread>,
46 suppress_abort_err: bool,
47 models: Option<Rc<RefCell<acp::SessionModelState>>>,
48 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
49}
50
51pub async fn connect(
52 server_name: SharedString,
53 command: AgentServerCommand,
54 root_dir: &Path,
55 default_mode: Option<acp::SessionModeId>,
56 is_remote: bool,
57 cx: &mut AsyncApp,
58) -> Result<Rc<dyn AgentConnection>> {
59 let conn = AcpConnection::stdio(
60 server_name,
61 command.clone(),
62 root_dir,
63 default_mode,
64 is_remote,
65 cx,
66 )
67 .await?;
68 Ok(Rc::new(conn) as _)
69}
70
71const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::V1;
72
73impl AcpConnection {
74 pub async fn stdio(
75 server_name: SharedString,
76 command: AgentServerCommand,
77 root_dir: &Path,
78 default_mode: Option<acp::SessionModeId>,
79 is_remote: bool,
80 cx: &mut AsyncApp,
81 ) -> Result<Self> {
82 let mut child = util::command::new_smol_command(command.path);
83 child
84 .args(command.args.iter().map(|arg| arg.as_str()))
85 .envs(command.env.iter().flatten())
86 .stdin(std::process::Stdio::piped())
87 .stdout(std::process::Stdio::piped())
88 .stderr(std::process::Stdio::piped());
89 if !is_remote {
90 child.current_dir(root_dir);
91 }
92 let mut child = child.spawn()?;
93
94 let stdout = child.stdout.take().context("Failed to take stdout")?;
95 let stdin = child.stdin.take().context("Failed to take stdin")?;
96 let stderr = child.stderr.take().context("Failed to take stderr")?;
97 log::trace!("Spawned (pid: {})", child.id());
98
99 let sessions = Rc::new(RefCell::new(HashMap::default()));
100
101 let client = ClientDelegate {
102 sessions: sessions.clone(),
103 cx: cx.clone(),
104 };
105 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
106 let foreground_executor = cx.foreground_executor().clone();
107 move |fut| {
108 foreground_executor.spawn(fut).detach();
109 }
110 });
111
112 let io_task = cx.background_spawn(io_task);
113
114 let stderr_task = cx.background_spawn(async move {
115 let mut stderr = BufReader::new(stderr);
116 let mut line = String::new();
117 while let Ok(n) = stderr.read_line(&mut line).await
118 && n > 0
119 {
120 log::warn!("agent stderr: {}", &line);
121 line.clear();
122 }
123 Ok(())
124 });
125
126 let wait_task = cx.spawn({
127 let sessions = sessions.clone();
128 let status_fut = child.status();
129 async move |cx| {
130 let status = status_fut.await?;
131
132 for session in sessions.borrow().values() {
133 session
134 .thread
135 .update(cx, |thread, cx| {
136 thread.emit_load_error(LoadError::Exited { status }, cx)
137 })
138 .ok();
139 }
140
141 anyhow::Ok(())
142 }
143 });
144
145 let connection = Rc::new(connection);
146
147 cx.update(|cx| {
148 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
149 registry.set_active_connection(server_name.clone(), &connection, cx)
150 });
151 })?;
152
153 let response = connection
154 .initialize(acp::InitializeRequest {
155 protocol_version: acp::VERSION,
156 client_capabilities: acp::ClientCapabilities {
157 fs: acp::FileSystemCapability {
158 read_text_file: true,
159 write_text_file: true,
160 meta: None,
161 },
162 terminal: true,
163 meta: None,
164 },
165 meta: None,
166 })
167 .await?;
168
169 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
170 return Err(UnsupportedVersion.into());
171 }
172
173 Ok(Self {
174 auth_methods: response.auth_methods,
175 root_dir: root_dir.to_owned(),
176 connection,
177 server_name,
178 sessions,
179 agent_capabilities: response.agent_capabilities,
180 default_mode,
181 _io_task: io_task,
182 _wait_task: wait_task,
183 _stderr_task: stderr_task,
184 child,
185 })
186 }
187
188 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
189 &self.agent_capabilities.prompt_capabilities
190 }
191
192 pub fn root_dir(&self) -> &Path {
193 &self.root_dir
194 }
195}
196
197impl Drop for AcpConnection {
198 fn drop(&mut self) {
199 // See the comment on the child field.
200 self.child.kill().log_err();
201 }
202}
203
204impl AgentConnection for AcpConnection {
205 fn new_thread(
206 self: Rc<Self>,
207 project: Entity<Project>,
208 cwd: &Path,
209 cx: &mut App,
210 ) -> Task<Result<Entity<AcpThread>>> {
211 let name = self.server_name.clone();
212 let conn = self.connection.clone();
213 let sessions = self.sessions.clone();
214 let default_mode = self.default_mode.clone();
215 let cwd = cwd.to_path_buf();
216 let context_server_store = project.read(cx).context_server_store().read(cx);
217 let mcp_servers = if project.read(cx).is_local() {
218 context_server_store
219 .configured_server_ids()
220 .iter()
221 .filter_map(|id| {
222 let configuration = context_server_store.configuration_for_server(id)?;
223 let command = configuration.command();
224 Some(acp::McpServer::Stdio {
225 name: id.0.to_string(),
226 command: command.path.clone(),
227 args: command.args.clone(),
228 env: if let Some(env) = command.env.as_ref() {
229 env.iter()
230 .map(|(name, value)| acp::EnvVariable {
231 name: name.clone(),
232 value: value.clone(),
233 meta: None,
234 })
235 .collect()
236 } else {
237 vec![]
238 },
239 })
240 })
241 .collect()
242 } else {
243 // In SSH projects, the external agent is running on the remote
244 // machine, and currently we only run MCP servers on the local
245 // machine. So don't pass any MCP servers to the agent in that case.
246 Vec::new()
247 };
248
249 cx.spawn(async move |cx| {
250 let response = conn
251 .new_session(acp::NewSessionRequest { mcp_servers, cwd, meta: None })
252 .await
253 .map_err(|err| {
254 if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
255 let mut error = AuthRequired::new();
256
257 if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
258 error = error.with_description(err.message);
259 }
260
261 anyhow!(error)
262 } else {
263 anyhow!(err)
264 }
265 })?;
266
267 let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
268 let models = response.models.map(|models| Rc::new(RefCell::new(models)));
269
270 if let Some(default_mode) = default_mode {
271 if let Some(modes) = modes.as_ref() {
272 let mut modes_ref = modes.borrow_mut();
273 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
274
275 if has_mode {
276 let initial_mode_id = modes_ref.current_mode_id.clone();
277
278 cx.spawn({
279 let default_mode = default_mode.clone();
280 let session_id = response.session_id.clone();
281 let modes = modes.clone();
282 async move |_| {
283 let result = conn.set_session_mode(acp::SetSessionModeRequest {
284 session_id,
285 mode_id: default_mode,
286 meta: None,
287 })
288 .await.log_err();
289
290 if result.is_none() {
291 modes.borrow_mut().current_mode_id = initial_mode_id;
292 }
293 }
294 }).detach();
295
296 modes_ref.current_mode_id = default_mode;
297 } else {
298 let available_modes = modes_ref
299 .available_modes
300 .iter()
301 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
302 .collect::<Vec<_>>()
303 .join("\n");
304
305 log::warn!(
306 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
307 );
308 }
309 } else {
310 log::warn!(
311 "`{name}` does not support modes, but `default_mode` was set in settings.",
312 );
313 }
314 }
315
316 let session_id = response.session_id;
317 let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
318 let thread = cx.new(|cx| {
319 AcpThread::new(
320 self.server_name.clone(),
321 self.clone(),
322 project,
323 action_log,
324 session_id.clone(),
325 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
326 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
327 cx,
328 )
329 })?;
330
331
332 let session = AcpSession {
333 thread: thread.downgrade(),
334 suppress_abort_err: false,
335 session_modes: modes,
336 models,
337 };
338 sessions.borrow_mut().insert(session_id, session);
339
340 Ok(thread)
341 })
342 }
343
344 fn auth_methods(&self) -> &[acp::AuthMethod] {
345 &self.auth_methods
346 }
347
348 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
349 let conn = self.connection.clone();
350 cx.foreground_executor().spawn(async move {
351 conn.authenticate(acp::AuthenticateRequest {
352 method_id: method_id.clone(),
353 meta: None,
354 })
355 .await?;
356
357 Ok(())
358 })
359 }
360
361 fn prompt(
362 &self,
363 _id: Option<acp_thread::UserMessageId>,
364 params: acp::PromptRequest,
365 cx: &mut App,
366 ) -> Task<Result<acp::PromptResponse>> {
367 let conn = self.connection.clone();
368 let sessions = self.sessions.clone();
369 let session_id = params.session_id.clone();
370 cx.foreground_executor().spawn(async move {
371 let result = conn.prompt(params).await;
372
373 let mut suppress_abort_err = false;
374
375 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
376 suppress_abort_err = session.suppress_abort_err;
377 session.suppress_abort_err = false;
378 }
379
380 match result {
381 Ok(response) => Ok(response),
382 Err(err) => {
383 if err.code != acp::ErrorCode::INTERNAL_ERROR.code {
384 // Intercept Unauthorized to trigger auth UI instead of retrying
385 if let Some(data) = &err.data {
386 #[derive(Deserialize)]
387 #[serde(deny_unknown_fields)]
388 struct ErrorDetails {
389 details: Box<str>,
390 }
391 if let Ok(ErrorDetails { details }) =
392 serde_json::from_value::<ErrorDetails>(data.clone())
393 {
394 if details.contains("401") || details.contains("Unauthorized") {
395 return Err(anyhow!(acp_thread::AuthRequired::new()));
396 }
397 }
398 }
399 if err.message.contains("401") || err.message.contains("Unauthorized") {
400 return Err(anyhow!(acp::Error::auth_required()));
401 }
402 anyhow::bail!(err)
403 }
404
405 let Some(data) = &err.data else {
406 // INTERNAL_ERROR without data but Unauthorized in the message
407 if err.message.contains("401") || err.message.contains("Unauthorized") {
408 return Err(anyhow!(acp::Error::auth_required()));
409 } else {
410 anyhow::bail!(err)
411 }
412 };
413
414 // Temporary workaround until the following PR is generally available:
415 // https://github.com/google-gemini/gemini-cli/pull/6656
416
417 #[derive(Deserialize)]
418 #[serde(deny_unknown_fields)]
419 struct ErrorDetails {
420 details: Box<str>,
421 }
422
423 match serde_json::from_value(data.clone()) {
424 Ok(ErrorDetails { details }) => {
425 if suppress_abort_err
426 && (details.contains("This operation was aborted")
427 || details.contains("The user aborted a request"))
428 {
429 Ok(acp::PromptResponse {
430 stop_reason: acp::StopReason::Cancelled,
431 meta: None,
432 })
433 } else if details.contains("401") || details.contains("Unauthorized") {
434 Err(anyhow!(acp::Error::auth_required()))
435 } else {
436 Err(anyhow!(details))
437 }
438 }
439 Err(_) => {
440 let msg = err.message.as_str();
441 if msg.contains("401") || msg.contains("Unauthorized") {
442 Err(anyhow!(acp_thread::AuthRequired::new()))
443 } else {
444 Err(anyhow!(err))
445 }
446 }
447 }
448 }
449 }
450 })
451 }
452
453 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
454 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
455 session.suppress_abort_err = true;
456 }
457 let conn = self.connection.clone();
458 let params = acp::CancelNotification {
459 session_id: session_id.clone(),
460 meta: None,
461 };
462 cx.foreground_executor()
463 .spawn(async move { conn.cancel(params).await })
464 .detach();
465 }
466
467 fn session_modes(
468 &self,
469 session_id: &acp::SessionId,
470 _cx: &App,
471 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
472 let sessions = self.sessions.clone();
473 let sessions_ref = sessions.borrow();
474 let Some(session) = sessions_ref.get(session_id) else {
475 return None;
476 };
477
478 if let Some(modes) = session.session_modes.as_ref() {
479 Some(Rc::new(AcpSessionModes {
480 connection: self.connection.clone(),
481 session_id: session_id.clone(),
482 state: modes.clone(),
483 }) as _)
484 } else {
485 None
486 }
487 }
488
489 fn model_selector(
490 &self,
491 session_id: &acp::SessionId,
492 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
493 let sessions = self.sessions.clone();
494 let sessions_ref = sessions.borrow();
495 let Some(session) = sessions_ref.get(session_id) else {
496 return None;
497 };
498
499 if let Some(models) = session.models.as_ref() {
500 Some(Rc::new(AcpModelSelector::new(
501 session_id.clone(),
502 self.connection.clone(),
503 models.clone(),
504 )) as _)
505 } else {
506 None
507 }
508 }
509
510 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
511 self
512 }
513}
514
515struct AcpSessionModes {
516 session_id: acp::SessionId,
517 connection: Rc<acp::ClientSideConnection>,
518 state: Rc<RefCell<acp::SessionModeState>>,
519}
520
521impl acp_thread::AgentSessionModes for AcpSessionModes {
522 fn current_mode(&self) -> acp::SessionModeId {
523 self.state.borrow().current_mode_id.clone()
524 }
525
526 fn all_modes(&self) -> Vec<acp::SessionMode> {
527 self.state.borrow().available_modes.clone()
528 }
529
530 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
531 let connection = self.connection.clone();
532 let session_id = self.session_id.clone();
533 let old_mode_id;
534 {
535 let mut state = self.state.borrow_mut();
536 old_mode_id = state.current_mode_id.clone();
537 state.current_mode_id = mode_id.clone();
538 };
539 let state = self.state.clone();
540 cx.foreground_executor().spawn(async move {
541 let result = connection
542 .set_session_mode(acp::SetSessionModeRequest {
543 session_id,
544 mode_id,
545 meta: None,
546 })
547 .await;
548
549 if result.is_err() {
550 state.borrow_mut().current_mode_id = old_mode_id;
551 }
552
553 result?;
554
555 Ok(())
556 })
557 }
558}
559
560struct AcpModelSelector {
561 session_id: acp::SessionId,
562 connection: Rc<acp::ClientSideConnection>,
563 state: Rc<RefCell<acp::SessionModelState>>,
564}
565
566impl AcpModelSelector {
567 fn new(
568 session_id: acp::SessionId,
569 connection: Rc<acp::ClientSideConnection>,
570 state: Rc<RefCell<acp::SessionModelState>>,
571 ) -> Self {
572 Self {
573 session_id,
574 connection,
575 state,
576 }
577 }
578}
579
580impl acp_thread::AgentModelSelector for AcpModelSelector {
581 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
582 Task::ready(Ok(acp_thread::AgentModelList::Flat(
583 self.state
584 .borrow()
585 .available_models
586 .clone()
587 .into_iter()
588 .map(acp_thread::AgentModelInfo::from)
589 .collect(),
590 )))
591 }
592
593 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
594 let connection = self.connection.clone();
595 let session_id = self.session_id.clone();
596 let old_model_id;
597 {
598 let mut state = self.state.borrow_mut();
599 old_model_id = state.current_model_id.clone();
600 state.current_model_id = model_id.clone();
601 };
602 let state = self.state.clone();
603 cx.foreground_executor().spawn(async move {
604 let result = connection
605 .set_session_model(acp::SetSessionModelRequest {
606 session_id,
607 model_id,
608 meta: None,
609 })
610 .await;
611
612 if result.is_err() {
613 state.borrow_mut().current_model_id = old_model_id;
614 }
615
616 result?;
617
618 Ok(())
619 })
620 }
621
622 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
623 let state = self.state.borrow();
624 Task::ready(
625 state
626 .available_models
627 .iter()
628 .find(|m| m.model_id == state.current_model_id)
629 .cloned()
630 .map(acp_thread::AgentModelInfo::from)
631 .ok_or_else(|| anyhow::anyhow!("Model not found")),
632 )
633 }
634}
635
636struct ClientDelegate {
637 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
638 cx: AsyncApp,
639}
640
641#[async_trait::async_trait(?Send)]
642impl acp::Client for ClientDelegate {
643 async fn request_permission(
644 &self,
645 arguments: acp::RequestPermissionRequest,
646 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
647 let respect_always_allow_setting;
648 let thread;
649 {
650 let sessions_ref = self.sessions.borrow();
651 let session = sessions_ref
652 .get(&arguments.session_id)
653 .context("Failed to get session")?;
654 respect_always_allow_setting = session.session_modes.is_none();
655 thread = session.thread.clone();
656 }
657
658 let cx = &mut self.cx.clone();
659
660 let task = thread.update(cx, |thread, cx| {
661 thread.request_tool_call_authorization(
662 arguments.tool_call,
663 arguments.options,
664 respect_always_allow_setting,
665 cx,
666 )
667 })??;
668
669 let outcome = task.await;
670
671 Ok(acp::RequestPermissionResponse {
672 outcome,
673 meta: None,
674 })
675 }
676
677 async fn write_text_file(
678 &self,
679 arguments: acp::WriteTextFileRequest,
680 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
681 let cx = &mut self.cx.clone();
682 let task = self
683 .session_thread(&arguments.session_id)?
684 .update(cx, |thread, cx| {
685 thread.write_text_file(arguments.path, arguments.content, cx)
686 })?;
687
688 task.await?;
689
690 Ok(Default::default())
691 }
692
693 async fn read_text_file(
694 &self,
695 arguments: acp::ReadTextFileRequest,
696 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
697 let task = self.session_thread(&arguments.session_id)?.update(
698 &mut self.cx.clone(),
699 |thread, cx| {
700 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
701 },
702 )?;
703
704 let content = task.await?;
705
706 Ok(acp::ReadTextFileResponse {
707 content,
708 meta: None,
709 })
710 }
711
712 async fn session_notification(
713 &self,
714 notification: acp::SessionNotification,
715 ) -> Result<(), acp::Error> {
716 let sessions = self.sessions.borrow();
717 let session = sessions
718 .get(¬ification.session_id)
719 .context("Failed to get session")?;
720
721 if let acp::SessionUpdate::CurrentModeUpdate { current_mode_id } = ¬ification.update {
722 if let Some(session_modes) = &session.session_modes {
723 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
724 } else {
725 log::error!(
726 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
727 );
728 }
729 }
730
731 session.thread.update(&mut self.cx.clone(), |thread, cx| {
732 thread.handle_session_update(notification.update, cx)
733 })??;
734
735 Ok(())
736 }
737
738 async fn create_terminal(
739 &self,
740 args: acp::CreateTerminalRequest,
741 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
742 let terminal = self
743 .session_thread(&args.session_id)?
744 .update(&mut self.cx.clone(), |thread, cx| {
745 thread.create_terminal(
746 args.command,
747 args.args,
748 args.env,
749 args.cwd,
750 args.output_byte_limit,
751 cx,
752 )
753 })?
754 .await?;
755 Ok(
756 terminal.read_with(&self.cx, |terminal, _| acp::CreateTerminalResponse {
757 terminal_id: terminal.id().clone(),
758 meta: None,
759 })?,
760 )
761 }
762
763 async fn kill_terminal_command(
764 &self,
765 args: acp::KillTerminalCommandRequest,
766 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
767 self.session_thread(&args.session_id)?
768 .update(&mut self.cx.clone(), |thread, cx| {
769 thread.kill_terminal(args.terminal_id, cx)
770 })??;
771
772 Ok(Default::default())
773 }
774
775 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
776 Err(acp::Error::method_not_found())
777 }
778
779 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
780 Err(acp::Error::method_not_found())
781 }
782
783 async fn release_terminal(
784 &self,
785 args: acp::ReleaseTerminalRequest,
786 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
787 self.session_thread(&args.session_id)?
788 .update(&mut self.cx.clone(), |thread, cx| {
789 thread.release_terminal(args.terminal_id, cx)
790 })??;
791
792 Ok(Default::default())
793 }
794
795 async fn terminal_output(
796 &self,
797 args: acp::TerminalOutputRequest,
798 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
799 self.session_thread(&args.session_id)?
800 .read_with(&mut self.cx.clone(), |thread, cx| {
801 let out = thread
802 .terminal(args.terminal_id)?
803 .read(cx)
804 .current_output(cx);
805
806 Ok(out)
807 })?
808 }
809
810 async fn wait_for_terminal_exit(
811 &self,
812 args: acp::WaitForTerminalExitRequest,
813 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
814 let exit_status = self
815 .session_thread(&args.session_id)?
816 .update(&mut self.cx.clone(), |thread, cx| {
817 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
818 })??
819 .await;
820
821 Ok(acp::WaitForTerminalExitResponse {
822 exit_status,
823 meta: None,
824 })
825 }
826}
827
828impl ClientDelegate {
829 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
830 let sessions = self.sessions.borrow();
831 sessions
832 .get(session_id)
833 .context("Failed to get session")
834 .map(|session| session.thread.clone())
835 }
836}