acp.rs

  1use acp_thread::AgentConnection;
  2use acp_tools::AcpConnectionRegistry;
  3use action_log::ActionLog;
  4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
  5use anyhow::anyhow;
  6use collections::HashMap;
  7use futures::AsyncBufReadExt as _;
  8use futures::io::BufReader;
  9use project::Project;
 10use project::agent_server_store::AgentServerCommand;
 11use serde::Deserialize;
 12use settings::{Settings as _, SettingsLocation};
 13use task::Shell;
 14use util::{ResultExt as _, get_default_system_shell_preferring_bash};
 15
 16use std::path::PathBuf;
 17use std::{any::Any, cell::RefCell};
 18use std::{path::Path, rc::Rc};
 19use thiserror::Error;
 20
 21use anyhow::{Context as _, Result};
 22use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
 23
 24use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
 25use terminal::TerminalBuilder;
 26use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
 27
/// Error returned by [`AcpConnection::stdio`] when the agent answers the
/// `initialize` request with a protocol version below
/// `MINIMUM_SUPPORTED_VERSION`.
#[derive(Debug, Error)]
#[error("Unsupported version")]
pub struct UnsupportedVersion;
 31
/// A connection to an external agent server that runs as a child process and
/// is spoken to over that process's stdin/stdout.
pub struct AcpConnection {
    /// Name of the agent server; used to register the connection and to name
    /// the threads it creates.
    server_name: SharedString,
    /// Client side of the protocol connection, used to send requests to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Sessions created through this connection, keyed by session id. The map
    /// is shared with the client delegate and the exit-watching task.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// Authentication methods the agent reported in its `initialize` response.
    auth_methods: Vec<acp::AuthMethod>,
    /// Capabilities the agent reported in its `initialize` response.
    agent_capabilities: acp::AgentCapabilities,
    /// Mode that new sessions are switched to when the agent offers it.
    default_mode: Option<acp::SessionModeId>,
    /// Root directory this connection was created for.
    root_dir: PathBuf,
    // NB: Don't move this into the wait_task, since we need to ensure the process is
    // killed on drop (setting kill_on_drop on the command seems to not always work).
    child: smol::process::Child,
    /// Task driving the protocol I/O; held so it lives as long as the connection.
    _io_task: Task<Result<(), acp::Error>>,
    /// Task that awaits the child's exit status and reports it to every
    /// session's thread.
    _wait_task: Task<Result<()>>,
    /// Task that forwards the child's stderr lines to the log.
    _stderr_task: Task<Result<()>>,
}
 47
/// State tracked for a single agent session.
pub struct AcpSession {
    /// Thread that receives this session's updates; held weakly.
    thread: WeakEntity<AcpThread>,
    /// Set when the session is cancelled so that the next prompt result can
    /// report an abort error as a cancellation; reset once a prompt finishes.
    suppress_abort_err: bool,
    /// Model state the agent returned when the session was created, if any.
    models: Option<Rc<RefCell<acp::SessionModelState>>>,
    /// Mode state the agent returned when the session was created, if any.
    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
}
 54
 55pub async fn connect(
 56    server_name: SharedString,
 57    command: AgentServerCommand,
 58    root_dir: &Path,
 59    default_mode: Option<acp::SessionModeId>,
 60    is_remote: bool,
 61    cx: &mut AsyncApp,
 62) -> Result<Rc<dyn AgentConnection>> {
 63    let conn = AcpConnection::stdio(
 64        server_name,
 65        command.clone(),
 66        root_dir,
 67        default_mode,
 68        is_remote,
 69        cx,
 70    )
 71    .await?;
 72    Ok(Rc::new(conn) as _)
 73}
 74
/// Oldest protocol version accepted from an agent; `AcpConnection::stdio`
/// fails with `UnsupportedVersion` when the agent reports anything lower.
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::V1;
 76
 77impl AcpConnection {
 78    pub async fn stdio(
 79        server_name: SharedString,
 80        command: AgentServerCommand,
 81        root_dir: &Path,
 82        default_mode: Option<acp::SessionModeId>,
 83        is_remote: bool,
 84        cx: &mut AsyncApp,
 85    ) -> Result<Self> {
 86        let mut child = util::command::new_smol_command(&command.path);
 87        child
 88            .args(command.args.iter().map(|arg| arg.as_str()))
 89            .envs(command.env.iter().flatten())
 90            .stdin(std::process::Stdio::piped())
 91            .stdout(std::process::Stdio::piped())
 92            .stderr(std::process::Stdio::piped());
 93        if !is_remote {
 94            child.current_dir(root_dir);
 95        }
 96        let mut child = child.spawn()?;
 97
 98        let stdout = child.stdout.take().context("Failed to take stdout")?;
 99        let stdin = child.stdin.take().context("Failed to take stdin")?;
100        let stderr = child.stderr.take().context("Failed to take stderr")?;
101        log::debug!(
102            "Spawning external agent server: {:?}, {:?}",
103            command.path,
104            command.args
105        );
106        log::trace!("Spawned (pid: {})", child.id());
107
108        let sessions = Rc::new(RefCell::new(HashMap::default()));
109
110        let (release_channel, version) = cx.update(|cx| {
111            (
112                release_channel::ReleaseChannel::try_global(cx)
113                    .map(|release_channel| release_channel.display_name()),
114                release_channel::AppVersion::global(cx).to_string(),
115            )
116        })?;
117
118        let client = ClientDelegate {
119            sessions: sessions.clone(),
120            cx: cx.clone(),
121        };
122        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
123            let foreground_executor = cx.foreground_executor().clone();
124            move |fut| {
125                foreground_executor.spawn(fut).detach();
126            }
127        });
128
129        let io_task = cx.background_spawn(io_task);
130
131        let stderr_task = cx.background_spawn(async move {
132            let mut stderr = BufReader::new(stderr);
133            let mut line = String::new();
134            while let Ok(n) = stderr.read_line(&mut line).await
135                && n > 0
136            {
137                log::warn!("agent stderr: {}", &line);
138                line.clear();
139            }
140            Ok(())
141        });
142
143        let wait_task = cx.spawn({
144            let sessions = sessions.clone();
145            let status_fut = child.status();
146            async move |cx| {
147                let status = status_fut.await?;
148
149                for session in sessions.borrow().values() {
150                    session
151                        .thread
152                        .update(cx, |thread, cx| {
153                            thread.emit_load_error(LoadError::Exited { status }, cx)
154                        })
155                        .ok();
156                }
157
158                anyhow::Ok(())
159            }
160        });
161
162        let connection = Rc::new(connection);
163
164        cx.update(|cx| {
165            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
166                registry.set_active_connection(server_name.clone(), &connection, cx)
167            });
168        })?;
169
170        let response = connection
171            .initialize(acp::InitializeRequest {
172                protocol_version: acp::VERSION,
173                client_capabilities: acp::ClientCapabilities {
174                    fs: acp::FileSystemCapability {
175                        read_text_file: true,
176                        write_text_file: true,
177                        meta: None,
178                    },
179                    terminal: true,
180                    meta: Some(serde_json::json!({
181                        // Experimental: Allow for rendering terminal output from the agents
182                        "terminal_output": true,
183                    })),
184                },
185                client_info: Some(acp::Implementation {
186                    name: "zed".to_owned(),
187                    title: release_channel.map(|c| c.to_owned()),
188                    version,
189                }),
190                meta: None,
191            })
192            .await?;
193
194        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
195            return Err(UnsupportedVersion.into());
196        }
197
198        Ok(Self {
199            auth_methods: response.auth_methods,
200            root_dir: root_dir.to_owned(),
201            connection,
202            server_name,
203            sessions,
204            agent_capabilities: response.agent_capabilities,
205            default_mode,
206            _io_task: io_task,
207            _wait_task: wait_task,
208            _stderr_task: stderr_task,
209            child,
210        })
211    }
212
    /// Returns the prompt capabilities stored in the agent capabilities that
    /// were reported in the `initialize` response.
    pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
        &self.agent_capabilities.prompt_capabilities
    }
216
    /// Returns the root directory this connection was created with.
    pub fn root_dir(&self) -> &Path {
        &self.root_dir
    }
220}
221
/// Kills the agent's child process when the connection is dropped.
impl Drop for AcpConnection {
    fn drop(&mut self) {
        // See the comment on the child field.
        // `drop` cannot return an error, so a failed kill is handed to `log_err`.
        self.child.kill().log_err();
    }
}
228
229impl AgentConnection for AcpConnection {
230    fn new_thread(
231        self: Rc<Self>,
232        project: Entity<Project>,
233        cwd: &Path,
234        cx: &mut App,
235    ) -> Task<Result<Entity<AcpThread>>> {
236        let name = self.server_name.clone();
237        let conn = self.connection.clone();
238        let sessions = self.sessions.clone();
239        let default_mode = self.default_mode.clone();
240        let cwd = cwd.to_path_buf();
241        let context_server_store = project.read(cx).context_server_store().read(cx);
242        let mcp_servers = if project.read(cx).is_local() {
243            context_server_store
244                .configured_server_ids()
245                .iter()
246                .filter_map(|id| {
247                    let configuration = context_server_store.configuration_for_server(id)?;
248                    let command = configuration.command();
249                    Some(acp::McpServer::Stdio {
250                        name: id.0.to_string(),
251                        command: command.path.clone(),
252                        args: command.args.clone(),
253                        env: if let Some(env) = command.env.as_ref() {
254                            env.iter()
255                                .map(|(name, value)| acp::EnvVariable {
256                                    name: name.clone(),
257                                    value: value.clone(),
258                                    meta: None,
259                                })
260                                .collect()
261                        } else {
262                            vec![]
263                        },
264                    })
265                })
266                .collect()
267        } else {
268            // In SSH projects, the external agent is running on the remote
269            // machine, and currently we only run MCP servers on the local
270            // machine. So don't pass any MCP servers to the agent in that case.
271            Vec::new()
272        };
273
274        cx.spawn(async move |cx| {
275            let response = conn
276                .new_session(acp::NewSessionRequest { mcp_servers, cwd, meta: None })
277                .await
278                .map_err(|err| {
279                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
280                        let mut error = AuthRequired::new();
281
282                        if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
283                            error = error.with_description(err.message);
284                        }
285
286                        anyhow!(error)
287                    } else {
288                        anyhow!(err)
289                    }
290                })?;
291
292            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
293            let models = response.models.map(|models| Rc::new(RefCell::new(models)));
294
295            if let Some(default_mode) = default_mode {
296                if let Some(modes) = modes.as_ref() {
297                    let mut modes_ref = modes.borrow_mut();
298                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
299
300                    if has_mode {
301                        let initial_mode_id = modes_ref.current_mode_id.clone();
302
303                        cx.spawn({
304                            let default_mode = default_mode.clone();
305                            let session_id = response.session_id.clone();
306                            let modes = modes.clone();
307                            async move |_| {
308                                let result = conn.set_session_mode(acp::SetSessionModeRequest {
309                                    session_id,
310                                    mode_id: default_mode,
311                                    meta: None,
312                                })
313                                .await.log_err();
314
315                                if result.is_none() {
316                                    modes.borrow_mut().current_mode_id = initial_mode_id;
317                                }
318                            }
319                        }).detach();
320
321                        modes_ref.current_mode_id = default_mode;
322                    } else {
323                        let available_modes = modes_ref
324                            .available_modes
325                            .iter()
326                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
327                            .collect::<Vec<_>>()
328                            .join("\n");
329
330                        log::warn!(
331                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
332                        );
333                    }
334                } else {
335                    log::warn!(
336                        "`{name}` does not support modes, but `default_mode` was set in settings.",
337                    );
338                }
339            }
340
341            let session_id = response.session_id;
342            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
343            let thread = cx.new(|cx| {
344                AcpThread::new(
345                    self.server_name.clone(),
346                    self.clone(),
347                    project,
348                    action_log,
349                    session_id.clone(),
350                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
351                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
352                    cx,
353                )
354            })?;
355
356
357            let session = AcpSession {
358                thread: thread.downgrade(),
359                suppress_abort_err: false,
360                session_modes: modes,
361                models,
362            };
363            sessions.borrow_mut().insert(session_id, session);
364
365            Ok(thread)
366        })
367    }
368
    /// Returns the authentication methods the agent reported in its
    /// `initialize` response.
    fn auth_methods(&self) -> &[acp::AuthMethod] {
        &self.auth_methods
    }
372
373    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
374        let conn = self.connection.clone();
375        cx.foreground_executor().spawn(async move {
376            conn.authenticate(acp::AuthenticateRequest {
377                method_id: method_id.clone(),
378                meta: None,
379            })
380            .await?;
381
382            Ok(())
383        })
384    }
385
386    fn prompt(
387        &self,
388        _id: Option<acp_thread::UserMessageId>,
389        params: acp::PromptRequest,
390        cx: &mut App,
391    ) -> Task<Result<acp::PromptResponse>> {
392        let conn = self.connection.clone();
393        let sessions = self.sessions.clone();
394        let session_id = params.session_id.clone();
395        cx.foreground_executor().spawn(async move {
396            let result = conn.prompt(params).await;
397
398            let mut suppress_abort_err = false;
399
400            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
401                suppress_abort_err = session.suppress_abort_err;
402                session.suppress_abort_err = false;
403            }
404
405            match result {
406                Ok(response) => Ok(response),
407                Err(err) => {
408                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
409                        return Err(anyhow!(acp::Error::auth_required()));
410                    }
411
412                    if err.code != ErrorCode::INTERNAL_ERROR.code {
413                        anyhow::bail!(err)
414                    }
415
416                    let Some(data) = &err.data else {
417                        anyhow::bail!(err)
418                    };
419
420                    // Temporary workaround until the following PR is generally available:
421                    // https://github.com/google-gemini/gemini-cli/pull/6656
422
423                    #[derive(Deserialize)]
424                    #[serde(deny_unknown_fields)]
425                    struct ErrorDetails {
426                        details: Box<str>,
427                    }
428
429                    match serde_json::from_value(data.clone()) {
430                        Ok(ErrorDetails { details }) => {
431                            if suppress_abort_err
432                                && (details.contains("This operation was aborted")
433                                    || details.contains("The user aborted a request"))
434                            {
435                                Ok(acp::PromptResponse {
436                                    stop_reason: acp::StopReason::Cancelled,
437                                    meta: None,
438                                })
439                            } else {
440                                Err(anyhow!(details))
441                            }
442                        }
443                        Err(_) => Err(anyhow!(err)),
444                    }
445                }
446            }
447        })
448    }
449
450    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
451        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
452            session.suppress_abort_err = true;
453        }
454        let conn = self.connection.clone();
455        let params = acp::CancelNotification {
456            session_id: session_id.clone(),
457            meta: None,
458        };
459        cx.foreground_executor()
460            .spawn(async move { conn.cancel(params).await })
461            .detach();
462    }
463
464    fn session_modes(
465        &self,
466        session_id: &acp::SessionId,
467        _cx: &App,
468    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
469        let sessions = self.sessions.clone();
470        let sessions_ref = sessions.borrow();
471        let Some(session) = sessions_ref.get(session_id) else {
472            return None;
473        };
474
475        if let Some(modes) = session.session_modes.as_ref() {
476            Some(Rc::new(AcpSessionModes {
477                connection: self.connection.clone(),
478                session_id: session_id.clone(),
479                state: modes.clone(),
480            }) as _)
481        } else {
482            None
483        }
484    }
485
486    fn model_selector(
487        &self,
488        session_id: &acp::SessionId,
489    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
490        let sessions = self.sessions.clone();
491        let sessions_ref = sessions.borrow();
492        let Some(session) = sessions_ref.get(session_id) else {
493            return None;
494        };
495
496        if let Some(models) = session.models.as_ref() {
497            Some(Rc::new(AcpModelSelector::new(
498                session_id.clone(),
499                self.connection.clone(),
500                models.clone(),
501            )) as _)
502        } else {
503            None
504        }
505    }
506
    /// Converts the connection into an `Rc<dyn Any>`.
    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
        self
    }
510}
511
/// Handle for reading and changing the mode of a single session.
struct AcpSessionModes {
    /// Session whose mode this handle controls.
    session_id: acp::SessionId,
    /// Connection used to send mode changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Mode state, shared with the session's entry in the connection's
    /// session map.
    state: Rc<RefCell<acp::SessionModeState>>,
}
517
518impl acp_thread::AgentSessionModes for AcpSessionModes {
519    fn current_mode(&self) -> acp::SessionModeId {
520        self.state.borrow().current_mode_id.clone()
521    }
522
523    fn all_modes(&self) -> Vec<acp::SessionMode> {
524        self.state.borrow().available_modes.clone()
525    }
526
527    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
528        let connection = self.connection.clone();
529        let session_id = self.session_id.clone();
530        let old_mode_id;
531        {
532            let mut state = self.state.borrow_mut();
533            old_mode_id = state.current_mode_id.clone();
534            state.current_mode_id = mode_id.clone();
535        };
536        let state = self.state.clone();
537        cx.foreground_executor().spawn(async move {
538            let result = connection
539                .set_session_mode(acp::SetSessionModeRequest {
540                    session_id,
541                    mode_id,
542                    meta: None,
543                })
544                .await;
545
546            if result.is_err() {
547                state.borrow_mut().current_mode_id = old_mode_id;
548            }
549
550            result?;
551
552            Ok(())
553        })
554    }
555}
556
/// Handle for listing and selecting the model of a single session.
struct AcpModelSelector {
    /// Session whose model this selector controls.
    session_id: acp::SessionId,
    /// Connection used to send model changes to the agent.
    connection: Rc<acp::ClientSideConnection>,
    /// Model state, shared with the session's entry in the connection's
    /// session map.
    state: Rc<RefCell<acp::SessionModelState>>,
}
562
impl AcpModelSelector {
    /// Builds a selector for `session_id` that sends changes over
    /// `connection` and records them in `state`.
    fn new(
        session_id: acp::SessionId,
        connection: Rc<acp::ClientSideConnection>,
        state: Rc<RefCell<acp::SessionModelState>>,
    ) -> Self {
        Self {
            session_id,
            connection,
            state,
        }
    }
}
576
577impl acp_thread::AgentModelSelector for AcpModelSelector {
578    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
579        Task::ready(Ok(acp_thread::AgentModelList::Flat(
580            self.state
581                .borrow()
582                .available_models
583                .clone()
584                .into_iter()
585                .map(acp_thread::AgentModelInfo::from)
586                .collect(),
587        )))
588    }
589
590    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
591        let connection = self.connection.clone();
592        let session_id = self.session_id.clone();
593        let old_model_id;
594        {
595            let mut state = self.state.borrow_mut();
596            old_model_id = state.current_model_id.clone();
597            state.current_model_id = model_id.clone();
598        };
599        let state = self.state.clone();
600        cx.foreground_executor().spawn(async move {
601            let result = connection
602                .set_session_model(acp::SetSessionModelRequest {
603                    session_id,
604                    model_id,
605                    meta: None,
606                })
607                .await;
608
609            if result.is_err() {
610                state.borrow_mut().current_model_id = old_model_id;
611            }
612
613            result?;
614
615            Ok(())
616        })
617    }
618
619    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
620        let state = self.state.borrow();
621        Task::ready(
622            state
623                .available_models
624                .iter()
625                .find(|m| m.model_id == state.current_model_id)
626                .cloned()
627                .map(acp_thread::AgentModelInfo::from)
628                .ok_or_else(|| anyhow::anyhow!("Model not found")),
629        )
630    }
631}
632
/// Receives the requests and notifications the agent sends to the client and
/// routes them to the thread of the session they name.
struct ClientDelegate {
    /// Session map shared with the owning connection.
    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
    /// App context used to update threads from protocol callbacks.
    cx: AsyncApp,
}
637
638#[async_trait::async_trait(?Send)]
639impl acp::Client for ClientDelegate {
640    async fn request_permission(
641        &self,
642        arguments: acp::RequestPermissionRequest,
643    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
644        let respect_always_allow_setting;
645        let thread;
646        {
647            let sessions_ref = self.sessions.borrow();
648            let session = sessions_ref
649                .get(&arguments.session_id)
650                .context("Failed to get session")?;
651            respect_always_allow_setting = session.session_modes.is_none();
652            thread = session.thread.clone();
653        }
654
655        let cx = &mut self.cx.clone();
656
657        let task = thread.update(cx, |thread, cx| {
658            thread.request_tool_call_authorization(
659                arguments.tool_call,
660                arguments.options,
661                respect_always_allow_setting,
662                cx,
663            )
664        })??;
665
666        let outcome = task.await;
667
668        Ok(acp::RequestPermissionResponse {
669            outcome,
670            meta: None,
671        })
672    }
673
674    async fn write_text_file(
675        &self,
676        arguments: acp::WriteTextFileRequest,
677    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
678        let cx = &mut self.cx.clone();
679        let task = self
680            .session_thread(&arguments.session_id)?
681            .update(cx, |thread, cx| {
682                thread.write_text_file(arguments.path, arguments.content, cx)
683            })?;
684
685        task.await?;
686
687        Ok(Default::default())
688    }
689
690    async fn read_text_file(
691        &self,
692        arguments: acp::ReadTextFileRequest,
693    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
694        let task = self.session_thread(&arguments.session_id)?.update(
695            &mut self.cx.clone(),
696            |thread, cx| {
697                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
698            },
699        )?;
700
701        let content = task.await?;
702
703        Ok(acp::ReadTextFileResponse {
704            content,
705            meta: None,
706        })
707    }
708
709    async fn session_notification(
710        &self,
711        notification: acp::SessionNotification,
712    ) -> Result<(), acp::Error> {
713        let sessions = self.sessions.borrow();
714        let session = sessions
715            .get(&notification.session_id)
716            .context("Failed to get session")?;
717
718        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
719            current_mode_id,
720            ..
721        }) = &notification.update
722        {
723            if let Some(session_modes) = &session.session_modes {
724                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
725            } else {
726                log::error!(
727                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
728                );
729            }
730        }
731
732        // Clone so we can inspect meta both before and after handing off to the thread
733        let update_clone = notification.update.clone();
734
735        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
736        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
737            if let Some(meta) = &tc.meta {
738                if let Some(terminal_info) = meta.get("terminal_info") {
739                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
740                    {
741                        let terminal_id = acp::TerminalId(id_str.into());
742                        let cwd = terminal_info
743                            .get("cwd")
744                            .and_then(|v| v.as_str().map(PathBuf::from));
745
746                        // Create a minimal display-only lower-level terminal and register it.
747                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
748                            let builder = TerminalBuilder::new_display_only(
749                                CursorShape::default(),
750                                AlternateScroll::On,
751                                None,
752                                0,
753                            )?;
754                            let lower = cx.new(|cx| builder.subscribe(cx));
755                            thread.on_terminal_provider_event(
756                                TerminalProviderEvent::Created {
757                                    terminal_id: terminal_id.clone(),
758                                    label: tc.title.clone(),
759                                    cwd,
760                                    output_byte_limit: None,
761                                    terminal: lower,
762                                },
763                                cx,
764                            );
765                            anyhow::Ok(())
766                        });
767                    }
768                }
769            }
770        }
771
772        // Forward the update to the acp_thread as usual.
773        session.thread.update(&mut self.cx.clone(), |thread, cx| {
774            thread.handle_session_update(notification.update.clone(), cx)
775        })??;
776
777        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
778        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
779            if let Some(meta) = &tcu.meta {
780                if let Some(term_out) = meta.get("terminal_output") {
781                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
782                        let terminal_id = acp::TerminalId(id_str.into());
783                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
784                            let data = s.as_bytes().to_vec();
785                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
786                                thread.on_terminal_provider_event(
787                                    TerminalProviderEvent::Output {
788                                        terminal_id: terminal_id.clone(),
789                                        data,
790                                    },
791                                    cx,
792                                );
793                            });
794                        }
795                    }
796                }
797
798                // terminal_exit
799                if let Some(term_exit) = meta.get("terminal_exit") {
800                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
801                        let terminal_id = acp::TerminalId(id_str.into());
802                        let status = acp::TerminalExitStatus {
803                            exit_code: term_exit
804                                .get("exit_code")
805                                .and_then(|v| v.as_u64())
806                                .map(|i| i as u32),
807                            signal: term_exit
808                                .get("signal")
809                                .and_then(|v| v.as_str().map(|s| s.to_string())),
810                            meta: None,
811                        };
812                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
813                            thread.on_terminal_provider_event(
814                                TerminalProviderEvent::Exit {
815                                    terminal_id: terminal_id.clone(),
816                                    status,
817                                },
818                                cx,
819                            );
820                        });
821                    }
822                }
823            }
824        }
825
826        Ok(())
827    }
828
829    async fn create_terminal(
830        &self,
831        args: acp::CreateTerminalRequest,
832    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
833        let thread = self.session_thread(&args.session_id)?;
834        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
835
836        let mut env = if let Some(dir) = &args.cwd {
837            project
838                .update(&mut self.cx.clone(), |project, cx| {
839                    let worktree = project.find_worktree(dir.as_path(), cx);
840                    let shell = TerminalSettings::get(
841                        worktree.as_ref().map(|(worktree, path)| SettingsLocation {
842                            worktree_id: worktree.read(cx).id(),
843                            path: &path,
844                        }),
845                        cx,
846                    )
847                    .shell
848                    .clone();
849                    project.directory_environment(&shell, dir.clone().into(), cx)
850                })?
851                .await
852                .unwrap_or_default()
853        } else {
854            Default::default()
855        };
856        // Disables paging for `git` and hopefully other commands
857        env.insert("PAGER".into(), "".into());
858        for var in args.env {
859            env.insert(var.name, var.value);
860        }
861
862        // Use remote shell or default system shell, as appropriate
863        let shell = project
864            .update(&mut self.cx.clone(), |project, cx| {
865                project
866                    .remote_client()
867                    .and_then(|r| r.read(cx).default_system_shell())
868                    .map(Shell::Program)
869            })?
870            .unwrap_or_else(|| Shell::Program(get_default_system_shell_preferring_bash()));
871        let is_windows = project
872            .read_with(&self.cx, |project, cx| project.path_style(cx).is_windows())
873            .unwrap_or(cfg!(windows));
874        let (task_command, task_args) = task::ShellBuilder::new(&shell, is_windows)
875            .redirect_stdin_to_dev_null()
876            .build(Some(args.command.clone()), &args.args);
877
878        let terminal_entity = project
879            .update(&mut self.cx.clone(), |project, cx| {
880                project.create_terminal_task(
881                    task::SpawnInTerminal {
882                        command: Some(task_command),
883                        args: task_args,
884                        cwd: args.cwd.clone(),
885                        env,
886                        ..Default::default()
887                    },
888                    cx,
889                )
890            })?
891            .await?;
892
893        // Register with renderer
894        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
895            thread.register_terminal_created(
896                acp::TerminalId(uuid::Uuid::new_v4().to_string().into()),
897                format!("{} {}", args.command, args.args.join(" ")),
898                args.cwd.clone(),
899                args.output_byte_limit,
900                terminal_entity,
901                cx,
902            )
903        })?;
904        let terminal_id =
905            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
906        Ok(acp::CreateTerminalResponse {
907            terminal_id,
908            meta: None,
909        })
910    }
911
912    async fn kill_terminal_command(
913        &self,
914        args: acp::KillTerminalCommandRequest,
915    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
916        self.session_thread(&args.session_id)?
917            .update(&mut self.cx.clone(), |thread, cx| {
918                thread.kill_terminal(args.terminal_id, cx)
919            })??;
920
921        Ok(Default::default())
922    }
923
924    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
925        Err(acp::Error::method_not_found())
926    }
927
928    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
929        Err(acp::Error::method_not_found())
930    }
931
932    async fn release_terminal(
933        &self,
934        args: acp::ReleaseTerminalRequest,
935    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
936        self.session_thread(&args.session_id)?
937            .update(&mut self.cx.clone(), |thread, cx| {
938                thread.release_terminal(args.terminal_id, cx)
939            })??;
940
941        Ok(Default::default())
942    }
943
944    async fn terminal_output(
945        &self,
946        args: acp::TerminalOutputRequest,
947    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
948        self.session_thread(&args.session_id)?
949            .read_with(&mut self.cx.clone(), |thread, cx| {
950                let out = thread
951                    .terminal(args.terminal_id)?
952                    .read(cx)
953                    .current_output(cx);
954
955                Ok(out)
956            })?
957    }
958
959    async fn wait_for_terminal_exit(
960        &self,
961        args: acp::WaitForTerminalExitRequest,
962    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
963        let exit_status = self
964            .session_thread(&args.session_id)?
965            .update(&mut self.cx.clone(), |thread, cx| {
966                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
967            })??
968            .await;
969
970        Ok(acp::WaitForTerminalExitResponse {
971            exit_status,
972            meta: None,
973        })
974    }
975}
976
977impl ClientDelegate {
978    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
979        let sessions = self.sessions.borrow();
980        sessions
981            .get(session_id)
982            .context("Failed to get session")
983            .map(|session| session.thread.clone())
984    }
985}