1use acp_thread::AgentConnection;
2use acp_tools::AcpConnectionRegistry;
3use action_log::ActionLog;
4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
5use anyhow::anyhow;
6use collections::HashMap;
7use futures::AsyncBufReadExt as _;
8use futures::io::BufReader;
9use project::Project;
10use project::agent_server_store::AgentServerCommand;
11use serde::Deserialize;
12use settings::{Settings as _, SettingsLocation};
13use task::Shell;
14use util::{ResultExt as _, get_default_system_shell_preferring_bash};
15
16use std::path::PathBuf;
17use std::{any::Any, cell::RefCell};
18use std::{path::Path, rc::Rc};
19use thiserror::Error;
20
21use anyhow::{Context as _, Result};
22use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
23
24use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
25use terminal::TerminalBuilder;
26use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
27
/// Error returned by `AcpConnection::stdio` when the agent server answers the
/// `initialize` request with a protocol version lower than
/// `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
31
/// A connection to an external agent server process that speaks ACP over its
/// stdin/stdout.
pub struct AcpConnection {
    /// Name of the agent server; registered with the connection registry and
    /// passed to every thread created through this connection.
    server_name: SharedString,
    /// Client side of the ACP connection used to send requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions created through this connection, keyed by session id. Shared
    /// with the `ClientDelegate` and with the task that waits for process exit.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods reported by the agent in its `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities reported by the agent in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode to switch new sessions to, when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Directory the connection was created for (the process's working
    /// directory when the agent is not remote).
    root_dir: PathBuf,
    // NB: Don't move this into the wait_task, since we need to ensure the process is
    // killed on drop (setting kill_on_drop on the command seems to not always work).
    child: smol::process::Child,
    /// Background task driving the connection's I/O; held so it lives as long
    /// as the connection.
    _io_task: Task<Result<()>>,
    /// Task that waits for the process to exit and notifies session threads.
    _wait_task: Task<Result<()>>,
    /// Background task forwarding the process's stderr to the log.
    _stderr_task: Task<Result<()>>,
}
47
/// Per-session state tracked by an `AcpConnection`.
pub struct AcpSession {
    /// The thread that displays this session; weak so the session map does not
    /// keep the thread alive.
    thread: WeakEntity<AcpThread>,
    /// Set by `cancel` and consumed (reset to `false`) when the next `prompt`
    /// call completes; lets abort-style errors be reported as a cancellation.
    suppress_abort_err: bool,
    /// Model state returned by the agent when the session was created, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state returned by the agent when the session was created, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
}
54
55pub async fn connect(
56 server_name: SharedString,
57 command: AgentServerCommand,
58 root_dir: &Path,
59 default_mode: Option<acp::SessionModeId>,
60 is_remote: bool,
61 cx: &mut AsyncApp,
62) -> Result<Rc<dyn AgentConnection>> {
63 let conn = AcpConnection::stdio(
64 server_name,
65 command.clone(),
66 root_dir,
67 default_mode,
68 is_remote,
69 cx,
70 )
71 .await?;
72 Ok(Rc::new(conn) as _)
73}
74
/// Oldest protocol version accepted from an agent; `AcpConnection::stdio`
/// rejects anything lower with `UnsupportedVersion`.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::V1;
76
77impl AcpConnection {
78 pub async fn stdio(
79 server_name: SharedString,
80 command: AgentServerCommand,
81 root_dir: &Path,
82 default_mode: Option<acp::SessionModeId>,
83 is_remote: bool,
84 cx: &mut AsyncApp,
85 ) -> Result<Self> {
86 let mut child = util::command::new_smol_command(&command.path);
87 child
88 .args(command.args.iter().map(|arg| arg.as_str()))
89 .envs(command.env.iter().flatten())
90 .stdin(std::process::Stdio::piped())
91 .stdout(std::process::Stdio::piped())
92 .stderr(std::process::Stdio::piped());
93 if !is_remote {
94 child.current_dir(root_dir);
95 }
96 let mut child = child.spawn()?;
97
98 let stdout = child.stdout.take().context("Failed to take stdout")?;
99 let stdin = child.stdin.take().context("Failed to take stdin")?;
100 let stderr = child.stderr.take().context("Failed to take stderr")?;
101 log::info!(
102 "Spawning external agent server: {:?}, {:?}",
103 command.path,
104 command.args
105 );
106 log::trace!("Spawned (pid: {})", child.id());
107
108 let sessions = Rc::new(RefCell::new(HashMap::default()));
109
110 let client = ClientDelegate {
111 sessions: sessions.clone(),
112 cx: cx.clone(),
113 };
114 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
115 let foreground_executor = cx.foreground_executor().clone();
116 move |fut| {
117 foreground_executor.spawn(fut).detach();
118 }
119 });
120
121 let io_task = cx.background_spawn(io_task);
122
123 let stderr_task = cx.background_spawn(async move {
124 let mut stderr = BufReader::new(stderr);
125 let mut line = String::new();
126 while let Ok(n) = stderr.read_line(&mut line).await
127 && n > 0
128 {
129 log::warn!("agent stderr: {}", &line);
130 line.clear();
131 }
132 Ok(())
133 });
134
135 let wait_task = cx.spawn({
136 let sessions = sessions.clone();
137 let status_fut = child.status();
138 async move |cx| {
139 let status = status_fut.await?;
140
141 for session in sessions.borrow().values() {
142 session
143 .thread
144 .update(cx, |thread, cx| {
145 thread.emit_load_error(LoadError::Exited { status }, cx)
146 })
147 .ok();
148 }
149
150 anyhow::Ok(())
151 }
152 });
153
154 let connection = Rc::new(connection);
155
156 cx.update(|cx| {
157 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
158 registry.set_active_connection(server_name.clone(), &connection, cx)
159 });
160 })?;
161
162 let response = connection
163 .initialize(acp::InitializeRequest {
164 protocol_version: acp::VERSION,
165 client_capabilities: acp::ClientCapabilities {
166 fs: acp::FileSystemCapability {
167 read_text_file: true,
168 write_text_file: true,
169 meta: None,
170 },
171 terminal: true,
172 meta: Some(serde_json::json!({
173 // Experimental: Allow for rendering terminal output from the agents
174 "terminal_output": true,
175 })),
176 },
177 meta: None,
178 })
179 .await?;
180
181 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
182 return Err(UnsupportedVersion.into());
183 }
184
185 Ok(Self {
186 auth_methods: response.auth_methods,
187 root_dir: root_dir.to_owned(),
188 connection,
189 server_name,
190 sessions,
191 agent_capabilities: response.agent_capabilities,
192 default_mode,
193 _io_task: io_task,
194 _wait_task: wait_task,
195 _stderr_task: stderr_task,
196 child,
197 })
198 }
199
200 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
201 &self.agent_capabilities.prompt_capabilities
202 }
203
204 pub fn root_dir(&self) -> &Path {
205 &self.root_dir
206 }
207}
208
impl Drop for AcpConnection {
    /// Kills the agent server process when the connection is dropped. A
    /// failure to kill is logged rather than propagated.
    fn drop(&mut self) {
        // See the comment on the child field.
        self.child.kill().log_err();
    }
}
215
216impl AgentConnection for AcpConnection {
    /// Creates a new agent session rooted at `cwd` and returns the `AcpThread`
    /// entity that represents it.
    ///
    /// For local projects, the project's configured MCP servers are passed to
    /// the agent as stdio servers. If a `default_mode` is configured and the
    /// agent offers it, the session is switched to that mode in a detached
    /// task. The new session is recorded in `self.sessions`.
    ///
    /// # Errors
    ///
    /// The returned task fails if the `new_session` request fails (an
    /// auth-required error code is turned into an `AuthRequired` error) or if
    /// creating the entities fails.
    fn new_thread(
        self: Rc<Self>,
        project: Entity<Project>,
        cwd: &Path,
        cx: &mut App,
    ) -> Task<Result<Entity<AcpThread>>> {
        let name = self.server_name.clone();
        let conn = self.connection.clone();
        let sessions = self.sessions.clone();
        let default_mode = self.default_mode.clone();
        let cwd = cwd.to_path_buf();
        let context_server_store = project.read(cx).context_server_store().read(cx);
        let mcp_servers = if project.read(cx).is_local() {
            context_server_store
                .configured_server_ids()
                .iter()
                .filter_map(|id| {
                    // Server ids without a configuration are skipped.
                    let configuration = context_server_store.configuration_for_server(id)?;
                    let command = configuration.command();
                    Some(acp::McpServer::Stdio {
                        name: id.0.to_string(),
                        command: command.path.clone(),
                        args: command.args.clone(),
                        env: if let Some(env) = command.env.as_ref() {
                            env.iter()
                                .map(|(name, value)| acp::EnvVariable {
                                    name: name.clone(),
                                    value: value.clone(),
                                    meta: None,
                                })
                                .collect()
                        } else {
                            vec![]
                        },
                    })
                })
                .collect()
        } else {
            // In SSH projects, the external agent is running on the remote
            // machine, and currently we only run MCP servers on the local
            // machine. So don't pass any MCP servers to the agent in that case.
            Vec::new()
        };

        cx.spawn(async move |cx| {
            let response = conn
                .new_session(acp::NewSessionRequest { mcp_servers, cwd, meta: None })
                .await
                .map_err(|err| {
                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
                        let mut error = AuthRequired::new();

                        // Only attach a description when the agent sent a
                        // message other than the stock auth-required one.
                        if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
                            error = error.with_description(err.message);
                        }

                        anyhow!(error)
                    } else {
                        anyhow!(err)
                    }
                })?;

            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
            let models = response.models.map(|models| Rc::new(RefCell::new(models)));

            if let Some(default_mode) = default_mode {
                if let Some(modes) = modes.as_ref() {
                    let mut modes_ref = modes.borrow_mut();
                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);

                    if has_mode {
                        // Remember the agent's own initial mode so it can be
                        // restored if switching fails.
                        let initial_mode_id = modes_ref.current_mode_id.clone();

                        cx.spawn({
                            let default_mode = default_mode.clone();
                            let session_id = response.session_id.clone();
                            let modes = modes.clone();
                            async move |_| {
                                let result = conn.set_session_mode(acp::SetSessionModeRequest {
                                    session_id,
                                    mode_id: default_mode,
                                    meta: None,
                                })
                                .await.log_err();

                                if result.is_none() {
                                    modes.borrow_mut().current_mode_id = initial_mode_id;
                                }
                            }
                        }).detach();

                        // Record the new mode immediately; the detached task
                        // above reverts it if the request fails.
                        modes_ref.current_mode_id = default_mode;
                    } else {
                        let available_modes = modes_ref
                            .available_modes
                            .iter()
                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
                            .collect::<Vec<_>>()
                            .join("\n");

                        log::warn!(
                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
                        );
                    }
                } else {
                    log::warn!(
                        "`{name}` does not support modes, but `default_mode` was set in settings.",
                    );
                }
            }

            let session_id = response.session_id;
            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
            let thread = cx.new(|cx| {
                AcpThread::new(
                    self.server_name.clone(),
                    self.clone(),
                    project,
                    action_log,
                    session_id.clone(),
                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
                    cx,
                )
            })?;


            // Register the session so requests and notifications from the
            // agent can be routed to this thread.
            let session = AcpSession {
                thread: thread.downgrade(),
                suppress_abort_err: false,
                session_modes: modes,
                models,
            };
            sessions.borrow_mut().insert(session_id, session);

            Ok(thread)
        })
    }
355
    /// Authentication methods the agent reported in its `initialize` response.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
359
360 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
361 let conn = self.connection.clone();
362 cx.foreground_executor().spawn(async move {
363 conn.authenticate(acp::AuthenticateRequest {
364 method_id: method_id.clone(),
365 meta: None,
366 })
367 .await?;
368
369 Ok(())
370 })
371 }
372
373 fn prompt(
374 &self,
375 _id: Option<acp_thread::UserMessageId>,
376 params: acp::PromptRequest,
377 cx: &mut App,
378 ) -> Task<Result<acp::PromptResponse>> {
379 let conn = self.connection.clone();
380 let sessions = self.sessions.clone();
381 let session_id = params.session_id.clone();
382 cx.foreground_executor().spawn(async move {
383 let result = conn.prompt(params).await;
384
385 let mut suppress_abort_err = false;
386
387 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
388 suppress_abort_err = session.suppress_abort_err;
389 session.suppress_abort_err = false;
390 }
391
392 match result {
393 Ok(response) => Ok(response),
394 Err(err) => {
395 if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
396 return Err(anyhow!(acp::Error::auth_required()));
397 }
398
399 if err.code != ErrorCode::INTERNAL_ERROR.code {
400 anyhow::bail!(err)
401 }
402
403 let Some(data) = &err.data else {
404 anyhow::bail!(err)
405 };
406
407 // Temporary workaround until the following PR is generally available:
408 // https://github.com/google-gemini/gemini-cli/pull/6656
409
410 #[derive(Deserialize)]
411 #[serde(deny_unknown_fields)]
412 struct ErrorDetails {
413 details: Box<str>,
414 }
415
416 match serde_json::from_value(data.clone()) {
417 Ok(ErrorDetails { details }) => {
418 if suppress_abort_err
419 && (details.contains("This operation was aborted")
420 || details.contains("The user aborted a request"))
421 {
422 Ok(acp::PromptResponse {
423 stop_reason: acp::StopReason::Cancelled,
424 meta: None,
425 })
426 } else {
427 Err(anyhow!(details))
428 }
429 }
430 Err(_) => Err(anyhow!(err)),
431 }
432 }
433 }
434 })
435 }
436
437 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
438 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
439 session.suppress_abort_err = true;
440 }
441 let conn = self.connection.clone();
442 let params = acp::CancelNotification {
443 session_id: session_id.clone(),
444 meta: None,
445 };
446 cx.foreground_executor()
447 .spawn(async move { conn.cancel(params).await })
448 .detach();
449 }
450
451 fn session_modes(
452 &self,
453 session_id: &acp::SessionId,
454 _cx: &App,
455 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
456 let sessions = self.sessions.clone();
457 let sessions_ref = sessions.borrow();
458 let Some(session) = sessions_ref.get(session_id) else {
459 return None;
460 };
461
462 if let Some(modes) = session.session_modes.as_ref() {
463 Some(Rc::new(AcpSessionModes {
464 connection: self.connection.clone(),
465 session_id: session_id.clone(),
466 state: modes.clone(),
467 }) as _)
468 } else {
469 None
470 }
471 }
472
473 fn model_selector(
474 &self,
475 session_id: &acp::SessionId,
476 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
477 let sessions = self.sessions.clone();
478 let sessions_ref = sessions.borrow();
479 let Some(session) = sessions_ref.get(session_id) else {
480 return None;
481 };
482
483 if let Some(models) = session.models.as_ref() {
484 Some(Rc::new(AcpModelSelector::new(
485 session_id.clone(),
486 self.connection.clone(),
487 models.clone(),
488 )) as _)
489 } else {
490 None
491 }
492 }
493
    /// Converts the connection into `Rc<dyn Any>` so callers can downcast it
    /// back to the concrete type.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
497}
498
/// Handle for reading and changing the mode of a single session.
struct AcpSessionModes {
    /// Session whose mode this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send mode-change requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state shared with the session entry it was created from.
    state: Rc<RefCell<acp::SessionModeState>>,
}
504
505impl acp_thread::AgentSessionModes for AcpSessionModes {
506 fn current_mode(&self) -> acp::SessionModeId {
507 self.state.borrow().current_mode_id.clone()
508 }
509
510 fn all_modes(&self) -> Vec<acp::SessionMode> {
511 self.state.borrow().available_modes.clone()
512 }
513
514 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
515 let connection = self.connection.clone();
516 let session_id = self.session_id.clone();
517 let old_mode_id;
518 {
519 let mut state = self.state.borrow_mut();
520 old_mode_id = state.current_mode_id.clone();
521 state.current_mode_id = mode_id.clone();
522 };
523 let state = self.state.clone();
524 cx.foreground_executor().spawn(async move {
525 let result = connection
526 .set_session_mode(acp::SetSessionModeRequest {
527 session_id,
528 mode_id,
529 meta: None,
530 })
531 .await;
532
533 if result.is_err() {
534 state.borrow_mut().current_mode_id = old_mode_id;
535 }
536
537 result?;
538
539 Ok(())
540 })
541 }
542}
543
/// Handle for listing and selecting the model of a single session.
struct AcpModelSelector {
    /// Session whose model this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send model-change requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state shared with the session entry it was created from.
    state: Rc<RefCell<acp::SessionModelState>>,
}
549
impl AcpModelSelector {
    /// Builds a selector for `session_id` that talks to the agent through
    /// `connection` and reads/writes the shared model `state`.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
563
564impl acp_thread::AgentModelSelector for AcpModelSelector {
565 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
566 Task::ready(Ok(acp_thread::AgentModelList::Flat(
567 self.state
568 .borrow()
569 .available_models
570 .clone()
571 .into_iter()
572 .map(acp_thread::AgentModelInfo::from)
573 .collect(),
574 )))
575 }
576
577 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
578 let connection = self.connection.clone();
579 let session_id = self.session_id.clone();
580 let old_model_id;
581 {
582 let mut state = self.state.borrow_mut();
583 old_model_id = state.current_model_id.clone();
584 state.current_model_id = model_id.clone();
585 };
586 let state = self.state.clone();
587 cx.foreground_executor().spawn(async move {
588 let result = connection
589 .set_session_model(acp::SetSessionModelRequest {
590 session_id,
591 model_id,
592 meta: None,
593 })
594 .await;
595
596 if result.is_err() {
597 state.borrow_mut().current_model_id = old_model_id;
598 }
599
600 result?;
601
602 Ok(())
603 })
604 }
605
606 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
607 let state = self.state.borrow();
608 Task::ready(
609 state
610 .available_models
611 .iter()
612 .find(|m| m.model_id == state.current_model_id)
613 .cloned()
614 .map(acp_thread::AgentModelInfo::from)
615 .ok_or_else(|| anyhow::anyhow!("Model not found")),
616 )
617 }
618}
619
/// Client-side handler for requests and notifications sent by the agent.
struct ClientDelegate {
    /// Session map shared with the owning `AcpConnection`; used to find the
    /// thread a request belongs to.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App handle cloned for each call that needs to update entities.
    cx: AsyncApp,
}
624
625#[async_trait::async_trait(?Send)]
626impl acp::Client for ClientDelegate {
627 async fn request_permission(
628 &self,
629 arguments: acp::RequestPermissionRequest,
630 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
631 let respect_always_allow_setting;
632 let thread;
633 {
634 let sessions_ref = self.sessions.borrow();
635 let session = sessions_ref
636 .get(&arguments.session_id)
637 .context("Failed to get session")?;
638 respect_always_allow_setting = session.session_modes.is_none();
639 thread = session.thread.clone();
640 }
641
642 let cx = &mut self.cx.clone();
643
644 let task = thread.update(cx, |thread, cx| {
645 thread.request_tool_call_authorization(
646 arguments.tool_call,
647 arguments.options,
648 respect_always_allow_setting,
649 cx,
650 )
651 })??;
652
653 let outcome = task.await;
654
655 Ok(acp::RequestPermissionResponse {
656 outcome,
657 meta: None,
658 })
659 }
660
661 async fn write_text_file(
662 &self,
663 arguments: acp::WriteTextFileRequest,
664 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
665 let cx = &mut self.cx.clone();
666 let task = self
667 .session_thread(&arguments.session_id)?
668 .update(cx, |thread, cx| {
669 thread.write_text_file(arguments.path, arguments.content, cx)
670 })?;
671
672 task.await?;
673
674 Ok(Default::default())
675 }
676
677 async fn read_text_file(
678 &self,
679 arguments: acp::ReadTextFileRequest,
680 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
681 let task = self.session_thread(&arguments.session_id)?.update(
682 &mut self.cx.clone(),
683 |thread, cx| {
684 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
685 },
686 )?;
687
688 let content = task.await?;
689
690 Ok(acp::ReadTextFileResponse {
691 content,
692 meta: None,
693 })
694 }
695
696 async fn session_notification(
697 &self,
698 notification: acp::SessionNotification,
699 ) -> Result<(), acp::Error> {
700 let sessions = self.sessions.borrow();
701 let session = sessions
702 .get(¬ification.session_id)
703 .context("Failed to get session")?;
704
705 if let acp::SessionUpdate::CurrentModeUpdate { current_mode_id } = ¬ification.update {
706 if let Some(session_modes) = &session.session_modes {
707 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
708 } else {
709 log::error!(
710 "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
711 );
712 }
713 }
714
715 // Clone so we can inspect meta both before and after handing off to the thread
716 let update_clone = notification.update.clone();
717
718 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
719 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
720 if let Some(meta) = &tc.meta {
721 if let Some(terminal_info) = meta.get("terminal_info") {
722 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
723 {
724 let terminal_id = acp::TerminalId(id_str.into());
725 let cwd = terminal_info
726 .get("cwd")
727 .and_then(|v| v.as_str().map(PathBuf::from));
728
729 // Create a minimal display-only lower-level terminal and register it.
730 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
731 let builder = TerminalBuilder::new_display_only(
732 CursorShape::default(),
733 AlternateScroll::On,
734 None,
735 0,
736 )?;
737 let lower = cx.new(|cx| builder.subscribe(cx));
738 thread.on_terminal_provider_event(
739 TerminalProviderEvent::Created {
740 terminal_id: terminal_id.clone(),
741 label: tc.title.clone(),
742 cwd,
743 output_byte_limit: None,
744 terminal: lower,
745 },
746 cx,
747 );
748 anyhow::Ok(())
749 });
750 }
751 }
752 }
753 }
754
755 // Forward the update to the acp_thread as usual.
756 session.thread.update(&mut self.cx.clone(), |thread, cx| {
757 thread.handle_session_update(notification.update.clone(), cx)
758 })??;
759
760 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
761 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
762 if let Some(meta) = &tcu.meta {
763 if let Some(term_out) = meta.get("terminal_output") {
764 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
765 let terminal_id = acp::TerminalId(id_str.into());
766 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
767 let data = s.as_bytes().to_vec();
768 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
769 thread.on_terminal_provider_event(
770 TerminalProviderEvent::Output {
771 terminal_id: terminal_id.clone(),
772 data,
773 },
774 cx,
775 );
776 });
777 }
778 }
779 }
780
781 // terminal_exit
782 if let Some(term_exit) = meta.get("terminal_exit") {
783 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
784 let terminal_id = acp::TerminalId(id_str.into());
785 let status = acp::TerminalExitStatus {
786 exit_code: term_exit
787 .get("exit_code")
788 .and_then(|v| v.as_u64())
789 .map(|i| i as u32),
790 signal: term_exit
791 .get("signal")
792 .and_then(|v| v.as_str().map(|s| s.to_string())),
793 meta: None,
794 };
795 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
796 thread.on_terminal_provider_event(
797 TerminalProviderEvent::Exit {
798 terminal_id: terminal_id.clone(),
799 status,
800 },
801 cx,
802 );
803 });
804 }
805 }
806 }
807 }
808
809 Ok(())
810 }
811
    /// Creates a terminal running `args.command` for the agent and registers
    /// it with the session's thread.
    ///
    /// The command is wrapped in a shell invocation (the remote client's
    /// default shell when one is available, otherwise the local default shell
    /// preferring bash) with stdin redirected to `/dev/null`. Returns the id
    /// of the registered terminal.
    ///
    /// # Errors
    ///
    /// Fails if the session is unknown, if the thread or project can no longer
    /// be accessed, or if the terminal task cannot be created.
    async fn create_terminal(
        &self,
        args: acp::CreateTerminalRequest,
    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
        let thread = self.session_thread(&args.session_id)?;
        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;

        // Start from the directory's environment when a cwd was given,
        // otherwise from an empty environment.
        let mut env = if let Some(dir) = &args.cwd {
            project
                .update(&mut self.cx.clone(), |project, cx| {
                    let worktree = project.find_worktree(dir.as_path(), cx);
                    // Look up the terminal shell setting scoped to the
                    // worktree containing `dir`, if there is one.
                    let shell = TerminalSettings::get(
                        worktree.as_ref().map(|(worktree, path)| SettingsLocation {
                            worktree_id: worktree.read(cx).id(),
                            path: &path,
                        }),
                        cx,
                    )
                    .shell
                    .clone();
                    project.directory_environment(&shell, dir.clone().into(), cx)
                })?
                .await
                .unwrap_or_default()
        } else {
            Default::default()
        };
        // Disables paging for `git` and hopefully other commands
        env.insert("PAGER".into(), "".into());
        // Variables supplied by the agent are inserted last, so they override
        // anything set above.
        for var in args.env {
            env.insert(var.name, var.value);
        }

        // Use remote shell or default system shell, as appropriate
        let shell = project
            .update(&mut self.cx.clone(), |project, cx| {
                project
                    .remote_client()
                    .and_then(|r| r.read(cx).default_system_shell())
                    .map(Shell::Program)
            })?
            .unwrap_or_else(|| Shell::Program(get_default_system_shell_preferring_bash()));
        // Falls back to the compile-time platform if the project cannot be read.
        let is_windows = project
            .read_with(&self.cx, |project, cx| project.path_style(cx).is_windows())
            .unwrap_or(cfg!(windows));
        let (task_command, task_args) = task::ShellBuilder::new(&shell, is_windows)
            .redirect_stdin_to_dev_null()
            .build(Some(args.command.clone()), &args.args);

        let terminal_entity = project
            .update(&mut self.cx.clone(), |project, cx| {
                project.create_terminal_task(
                    task::SpawnInTerminal {
                        command: Some(task_command),
                        args: task_args,
                        cwd: args.cwd.clone(),
                        env,
                        ..Default::default()
                    },
                    cx,
                )
            })?
            .await?;

        // Register with renderer
        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
            thread.register_terminal_created(
                // A fresh random id is generated for every terminal.
                acp::TerminalId(uuid::Uuid::new_v4().to_string().into()),
                // Label: the original command followed by its arguments.
                format!("{} {}", args.command, args.args.join(" ")),
                args.cwd.clone(),
                args.output_byte_limit,
                terminal_entity,
                cx,
            )
        })?;
        let terminal_id =
            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
        Ok(acp::CreateTerminalResponse {
            terminal_id,
            meta: None,
        })
    }
894
895 async fn kill_terminal_command(
896 &self,
897 args: acp::KillTerminalCommandRequest,
898 ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
899 self.session_thread(&args.session_id)?
900 .update(&mut self.cx.clone(), |thread, cx| {
901 thread.kill_terminal(args.terminal_id, cx)
902 })??;
903
904 Ok(Default::default())
905 }
906
    /// Extension methods are not supported; always answers "method not found".
    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
        Err(acp::Error::method_not_found())
    }
910
    /// Extension notifications are not supported; always answers "method not
    /// found".
    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
        Err(acp::Error::method_not_found())
    }
914
915 async fn release_terminal(
916 &self,
917 args: acp::ReleaseTerminalRequest,
918 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
919 self.session_thread(&args.session_id)?
920 .update(&mut self.cx.clone(), |thread, cx| {
921 thread.release_terminal(args.terminal_id, cx)
922 })??;
923
924 Ok(Default::default())
925 }
926
927 async fn terminal_output(
928 &self,
929 args: acp::TerminalOutputRequest,
930 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
931 self.session_thread(&args.session_id)?
932 .read_with(&mut self.cx.clone(), |thread, cx| {
933 let out = thread
934 .terminal(args.terminal_id)?
935 .read(cx)
936 .current_output(cx);
937
938 Ok(out)
939 })?
940 }
941
942 async fn wait_for_terminal_exit(
943 &self,
944 args: acp::WaitForTerminalExitRequest,
945 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
946 let exit_status = self
947 .session_thread(&args.session_id)?
948 .update(&mut self.cx.clone(), |thread, cx| {
949 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
950 })??
951 .await;
952
953 Ok(acp::WaitForTerminalExitResponse {
954 exit_status,
955 meta: None,
956 })
957 }
958}
959
960impl ClientDelegate {
961 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
962 let sessions = self.sessions.borrow();
963 sessions
964 .get(session_id)
965 .context("Failed to get session")
966 .map(|session| session.thread.clone())
967 }
968}