1use acp_thread::AgentConnection;
  2use acp_tools::AcpConnectionRegistry;
  3use action_log::ActionLog;
  4use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
  5use anyhow::anyhow;
  6use collections::HashMap;
  7use futures::AsyncBufReadExt as _;
  8use futures::io::BufReader;
  9use project::Project;
 10use project::agent_server_store::AgentServerCommand;
 11use serde::Deserialize;
 12use settings::{Settings as _, SettingsLocation};
 13use task::Shell;
 14use util::{ResultExt as _, get_default_system_shell_preferring_bash};
 15
 16use std::path::PathBuf;
 17use std::{any::Any, cell::RefCell};
 18use std::{path::Path, rc::Rc};
 19use thiserror::Error;
 20
 21use anyhow::{Context as _, Result};
 22use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
 23
 24use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
 25use terminal::TerminalBuilder;
 26use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
 27
 28#[derive(Debug, Error)]
 29#[error("Unsupported version")]
 30pub struct UnsupportedVersion;
 31
 32pub struct AcpConnection {
 33    server_name: SharedString,
 34    connection: Rc<acp::ClientSideConnection>,
 35    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
 36    auth_methods: Vec<acp::AuthMethod>,
 37    agent_capabilities: acp::AgentCapabilities,
 38    default_mode: Option<acp::SessionModeId>,
 39    root_dir: PathBuf,
 40    // NB: Don't move this into the wait_task, since we need to ensure the process is
 41    // killed on drop (setting kill_on_drop on the command seems to not always work).
 42    child: smol::process::Child,
 43    _io_task: Task<Result<()>>,
 44    _wait_task: Task<Result<()>>,
 45    _stderr_task: Task<Result<()>>,
 46}
 47
 48pub struct AcpSession {
 49    thread: WeakEntity<AcpThread>,
 50    suppress_abort_err: bool,
 51    models: Option<Rc<RefCell<acp::SessionModelState>>>,
 52    session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
 53}
 54
 55pub async fn connect(
 56    server_name: SharedString,
 57    command: AgentServerCommand,
 58    root_dir: &Path,
 59    default_mode: Option<acp::SessionModeId>,
 60    is_remote: bool,
 61    cx: &mut AsyncApp,
 62) -> Result<Rc<dyn AgentConnection>> {
 63    let conn = AcpConnection::stdio(
 64        server_name,
 65        command.clone(),
 66        root_dir,
 67        default_mode,
 68        is_remote,
 69        cx,
 70    )
 71    .await?;
 72    Ok(Rc::new(conn) as _)
 73}
 74
 75const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::V1;
 76
 77impl AcpConnection {
 78    pub async fn stdio(
 79        server_name: SharedString,
 80        command: AgentServerCommand,
 81        root_dir: &Path,
 82        default_mode: Option<acp::SessionModeId>,
 83        is_remote: bool,
 84        cx: &mut AsyncApp,
 85    ) -> Result<Self> {
 86        let mut child = util::command::new_smol_command(&command.path);
 87        child
 88            .args(command.args.iter().map(|arg| arg.as_str()))
 89            .envs(command.env.iter().flatten())
 90            .stdin(std::process::Stdio::piped())
 91            .stdout(std::process::Stdio::piped())
 92            .stderr(std::process::Stdio::piped());
 93        if !is_remote {
 94            child.current_dir(root_dir);
 95        }
 96        let mut child = child.spawn()?;
 97
 98        let stdout = child.stdout.take().context("Failed to take stdout")?;
 99        let stdin = child.stdin.take().context("Failed to take stdin")?;
100        let stderr = child.stderr.take().context("Failed to take stderr")?;
101        log::debug!(
102            "Spawning external agent server: {:?}, {:?}",
103            command.path,
104            command.args
105        );
106        log::trace!("Spawned (pid: {})", child.id());
107
108        let sessions = Rc::new(RefCell::new(HashMap::default()));
109
110        let client = ClientDelegate {
111            sessions: sessions.clone(),
112            cx: cx.clone(),
113        };
114        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
115            let foreground_executor = cx.foreground_executor().clone();
116            move |fut| {
117                foreground_executor.spawn(fut).detach();
118            }
119        });
120
121        let io_task = cx.background_spawn(io_task);
122
123        let stderr_task = cx.background_spawn(async move {
124            let mut stderr = BufReader::new(stderr);
125            let mut line = String::new();
126            while let Ok(n) = stderr.read_line(&mut line).await
127                && n > 0
128            {
129                log::warn!("agent stderr: {}", &line);
130                line.clear();
131            }
132            Ok(())
133        });
134
135        let wait_task = cx.spawn({
136            let sessions = sessions.clone();
137            let status_fut = child.status();
138            async move |cx| {
139                let status = status_fut.await?;
140
141                for session in sessions.borrow().values() {
142                    session
143                        .thread
144                        .update(cx, |thread, cx| {
145                            thread.emit_load_error(LoadError::Exited { status }, cx)
146                        })
147                        .ok();
148                }
149
150                anyhow::Ok(())
151            }
152        });
153
154        let connection = Rc::new(connection);
155
156        cx.update(|cx| {
157            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
158                registry.set_active_connection(server_name.clone(), &connection, cx)
159            });
160        })?;
161
162        let response = connection
163            .initialize(acp::InitializeRequest {
164                protocol_version: acp::VERSION,
165                client_capabilities: acp::ClientCapabilities {
166                    fs: acp::FileSystemCapability {
167                        read_text_file: true,
168                        write_text_file: true,
169                        meta: None,
170                    },
171                    terminal: true,
172                    meta: Some(serde_json::json!({
173                        // Experimental: Allow for rendering terminal output from the agents
174                        "terminal_output": true,
175                    })),
176                },
177                meta: None,
178            })
179            .await?;
180
181        if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
182            return Err(UnsupportedVersion.into());
183        }
184
185        Ok(Self {
186            auth_methods: response.auth_methods,
187            root_dir: root_dir.to_owned(),
188            connection,
189            server_name,
190            sessions,
191            agent_capabilities: response.agent_capabilities,
192            default_mode,
193            _io_task: io_task,
194            _wait_task: wait_task,
195            _stderr_task: stderr_task,
196            child,
197        })
198    }
199
200    pub const fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
201        &self.agent_capabilities.prompt_capabilities
202    }
203
204    pub fn root_dir(&self) -> &Path {
205        &self.root_dir
206    }
207}
208
209impl Drop for AcpConnection {
210    fn drop(&mut self) {
211        // See the comment on the child field.
212        self.child.kill().log_err();
213    }
214}
215
216impl AgentConnection for AcpConnection {
217    fn new_thread(
218        self: Rc<Self>,
219        project: Entity<Project>,
220        cwd: &Path,
221        cx: &mut App,
222    ) -> Task<Result<Entity<AcpThread>>> {
223        let name = self.server_name.clone();
224        let conn = self.connection.clone();
225        let sessions = self.sessions.clone();
226        let default_mode = self.default_mode.clone();
227        let cwd = cwd.to_path_buf();
228        let context_server_store = project.read(cx).context_server_store().read(cx);
229        let mcp_servers = if project.read(cx).is_local() {
230            context_server_store
231                .configured_server_ids()
232                .iter()
233                .filter_map(|id| {
234                    let configuration = context_server_store.configuration_for_server(id)?;
235                    let command = configuration.command();
236                    Some(acp::McpServer::Stdio {
237                        name: id.0.to_string(),
238                        command: command.path.clone(),
239                        args: command.args.clone(),
240                        env: if let Some(env) = command.env.as_ref() {
241                            env.iter()
242                                .map(|(name, value)| acp::EnvVariable {
243                                    name: name.clone(),
244                                    value: value.clone(),
245                                    meta: None,
246                                })
247                                .collect()
248                        } else {
249                            vec![]
250                        },
251                    })
252                })
253                .collect()
254        } else {
255            // In SSH projects, the external agent is running on the remote
256            // machine, and currently we only run MCP servers on the local
257            // machine. So don't pass any MCP servers to the agent in that case.
258            Vec::new()
259        };
260
261        cx.spawn(async move |cx| {
262            let response = conn
263                .new_session(acp::NewSessionRequest { mcp_servers, cwd, meta: None })
264                .await
265                .map_err(|err| {
266                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
267                        let mut error = AuthRequired::new();
268
269                        if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
270                            error = error.with_description(err.message);
271                        }
272
273                        anyhow!(error)
274                    } else {
275                        anyhow!(err)
276                    }
277                })?;
278
279            let modes = response.modes.map(|modes| Rc::new(RefCell::new(modes)));
280            let models = response.models.map(|models| Rc::new(RefCell::new(models)));
281
282            if let Some(default_mode) = default_mode {
283                if let Some(modes) = modes.as_ref() {
284                    let mut modes_ref = modes.borrow_mut();
285                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
286
287                    if has_mode {
288                        let initial_mode_id = modes_ref.current_mode_id.clone();
289
290                        cx.spawn({
291                            let default_mode = default_mode.clone();
292                            let session_id = response.session_id.clone();
293                            let modes = modes.clone();
294                            async move |_| {
295                                let result = conn.set_session_mode(acp::SetSessionModeRequest {
296                                    session_id,
297                                    mode_id: default_mode,
298                                    meta: None,
299                                })
300                                .await.log_err();
301
302                                if result.is_none() {
303                                    modes.borrow_mut().current_mode_id = initial_mode_id;
304                                }
305                            }
306                        }).detach();
307
308                        modes_ref.current_mode_id = default_mode;
309                    } else {
310                        let available_modes = modes_ref
311                            .available_modes
312                            .iter()
313                            .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
314                            .collect::<Vec<_>>()
315                            .join("\n");
316
317                        log::warn!(
318                            "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
319                        );
320                    }
321                } else {
322                    log::warn!(
323                        "`{name}` does not support modes, but `default_mode` was set in settings.",
324                    );
325                }
326            }
327
328            let session_id = response.session_id;
329            let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
330            let thread = cx.new(|cx| {
331                AcpThread::new(
332                    self.server_name.clone(),
333                    self.clone(),
334                    project,
335                    action_log,
336                    session_id.clone(),
337                    // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
338                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
339                    cx,
340                )
341            })?;
342
343
344            let session = AcpSession {
345                thread: thread.downgrade(),
346                suppress_abort_err: false,
347                session_modes: modes,
348                models,
349            };
350            sessions.borrow_mut().insert(session_id, session);
351
352            Ok(thread)
353        })
354    }
355
356    fn auth_methods(&self) -> &[acp::AuthMethod] {
357        &self.auth_methods
358    }
359
360    fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
361        let conn = self.connection.clone();
362        cx.foreground_executor().spawn(async move {
363            conn.authenticate(acp::AuthenticateRequest {
364                method_id: method_id.clone(),
365                meta: None,
366            })
367            .await?;
368
369            Ok(())
370        })
371    }
372
373    fn prompt(
374        &self,
375        _id: Option<acp_thread::UserMessageId>,
376        params: acp::PromptRequest,
377        cx: &mut App,
378    ) -> Task<Result<acp::PromptResponse>> {
379        let conn = self.connection.clone();
380        let sessions = self.sessions.clone();
381        let session_id = params.session_id.clone();
382        cx.foreground_executor().spawn(async move {
383            let result = conn.prompt(params).await;
384
385            let mut suppress_abort_err = false;
386
387            if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
388                suppress_abort_err = session.suppress_abort_err;
389                session.suppress_abort_err = false;
390            }
391
392            match result {
393                Ok(response) => Ok(response),
394                Err(err) => {
395                    if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
396                        return Err(anyhow!(acp::Error::auth_required()));
397                    }
398
399                    if err.code != ErrorCode::INTERNAL_ERROR.code {
400                        anyhow::bail!(err)
401                    }
402
403                    let Some(data) = &err.data else {
404                        anyhow::bail!(err)
405                    };
406
407                    // Temporary workaround until the following PR is generally available:
408                    // https://github.com/google-gemini/gemini-cli/pull/6656
409
410                    #[derive(Deserialize)]
411                    #[serde(deny_unknown_fields)]
412                    struct ErrorDetails {
413                        details: Box<str>,
414                    }
415
416                    match serde_json::from_value(data.clone()) {
417                        Ok(ErrorDetails { details }) => {
418                            if suppress_abort_err
419                                && (details.contains("This operation was aborted")
420                                    || details.contains("The user aborted a request"))
421                            {
422                                Ok(acp::PromptResponse {
423                                    stop_reason: acp::StopReason::Cancelled,
424                                    meta: None,
425                                })
426                            } else {
427                                Err(anyhow!(details))
428                            }
429                        }
430                        Err(_) => Err(anyhow!(err)),
431                    }
432                }
433            }
434        })
435    }
436
437    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
438        if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
439            session.suppress_abort_err = true;
440        }
441        let conn = self.connection.clone();
442        let params = acp::CancelNotification {
443            session_id: session_id.clone(),
444            meta: None,
445        };
446        cx.foreground_executor()
447            .spawn(async move { conn.cancel(params).await })
448            .detach();
449    }
450
451    fn session_modes(
452        &self,
453        session_id: &acp::SessionId,
454        _cx: &App,
455    ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
456        let sessions = self.sessions.clone();
457        let sessions_ref = sessions.borrow();
458        let Some(session) = sessions_ref.get(session_id) else {
459            return None;
460        };
461
462        if let Some(modes) = session.session_modes.as_ref() {
463            Some(Rc::new(AcpSessionModes {
464                connection: self.connection.clone(),
465                session_id: session_id.clone(),
466                state: modes.clone(),
467            }) as _)
468        } else {
469            None
470        }
471    }
472
473    fn model_selector(
474        &self,
475        session_id: &acp::SessionId,
476    ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
477        let sessions = self.sessions.clone();
478        let sessions_ref = sessions.borrow();
479        let Some(session) = sessions_ref.get(session_id) else {
480            return None;
481        };
482
483        if let Some(models) = session.models.as_ref() {
484            Some(Rc::new(AcpModelSelector::new(
485                session_id.clone(),
486                self.connection.clone(),
487                models.clone(),
488            )) as _)
489        } else {
490            None
491        }
492    }
493
494    fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
495        self
496    }
497}
498
499struct AcpSessionModes {
500    session_id: acp::SessionId,
501    connection: Rc<acp::ClientSideConnection>,
502    state: Rc<RefCell<acp::SessionModeState>>,
503}
504
505impl acp_thread::AgentSessionModes for AcpSessionModes {
506    fn current_mode(&self) -> acp::SessionModeId {
507        self.state.borrow().current_mode_id.clone()
508    }
509
510    fn all_modes(&self) -> Vec<acp::SessionMode> {
511        self.state.borrow().available_modes.clone()
512    }
513
514    fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
515        let connection = self.connection.clone();
516        let session_id = self.session_id.clone();
517        let old_mode_id;
518        {
519            let mut state = self.state.borrow_mut();
520            old_mode_id = state.current_mode_id.clone();
521            state.current_mode_id = mode_id.clone();
522        };
523        let state = self.state.clone();
524        cx.foreground_executor().spawn(async move {
525            let result = connection
526                .set_session_mode(acp::SetSessionModeRequest {
527                    session_id,
528                    mode_id,
529                    meta: None,
530                })
531                .await;
532
533            if result.is_err() {
534                state.borrow_mut().current_mode_id = old_mode_id;
535            }
536
537            result?;
538
539            Ok(())
540        })
541    }
542}
543
544struct AcpModelSelector {
545    session_id: acp::SessionId,
546    connection: Rc<acp::ClientSideConnection>,
547    state: Rc<RefCell<acp::SessionModelState>>,
548}
549
550impl AcpModelSelector {
551    const fn new(
552        session_id: acp::SessionId,
553        connection: Rc<acp::ClientSideConnection>,
554        state: Rc<RefCell<acp::SessionModelState>>,
555    ) -> Self {
556        Self {
557            session_id,
558            connection,
559            state,
560        }
561    }
562}
563
564impl acp_thread::AgentModelSelector for AcpModelSelector {
565    fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
566        Task::ready(Ok(acp_thread::AgentModelList::Flat(
567            self.state
568                .borrow()
569                .available_models
570                .clone()
571                .into_iter()
572                .map(acp_thread::AgentModelInfo::from)
573                .collect(),
574        )))
575    }
576
577    fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
578        let connection = self.connection.clone();
579        let session_id = self.session_id.clone();
580        let old_model_id;
581        {
582            let mut state = self.state.borrow_mut();
583            old_model_id = state.current_model_id.clone();
584            state.current_model_id = model_id.clone();
585        };
586        let state = self.state.clone();
587        cx.foreground_executor().spawn(async move {
588            let result = connection
589                .set_session_model(acp::SetSessionModelRequest {
590                    session_id,
591                    model_id,
592                    meta: None,
593                })
594                .await;
595
596            if result.is_err() {
597                state.borrow_mut().current_model_id = old_model_id;
598            }
599
600            result?;
601
602            Ok(())
603        })
604    }
605
606    fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
607        let state = self.state.borrow();
608        Task::ready(
609            state
610                .available_models
611                .iter()
612                .find(|m| m.model_id == state.current_model_id)
613                .cloned()
614                .map(acp_thread::AgentModelInfo::from)
615                .ok_or_else(|| anyhow::anyhow!("Model not found")),
616        )
617    }
618}
619
620struct ClientDelegate {
621    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
622    cx: AsyncApp,
623}
624
625#[async_trait::async_trait(?Send)]
626impl acp::Client for ClientDelegate {
627    async fn request_permission(
628        &self,
629        arguments: acp::RequestPermissionRequest,
630    ) -> Result<acp::RequestPermissionResponse, acp::Error> {
631        let respect_always_allow_setting;
632        let thread;
633        {
634            let sessions_ref = self.sessions.borrow();
635            let session = sessions_ref
636                .get(&arguments.session_id)
637                .context("Failed to get session")?;
638            respect_always_allow_setting = session.session_modes.is_none();
639            thread = session.thread.clone();
640        }
641
642        let cx = &mut self.cx.clone();
643
644        let task = thread.update(cx, |thread, cx| {
645            thread.request_tool_call_authorization(
646                arguments.tool_call,
647                arguments.options,
648                respect_always_allow_setting,
649                cx,
650            )
651        })??;
652
653        let outcome = task.await;
654
655        Ok(acp::RequestPermissionResponse {
656            outcome,
657            meta: None,
658        })
659    }
660
661    async fn write_text_file(
662        &self,
663        arguments: acp::WriteTextFileRequest,
664    ) -> Result<acp::WriteTextFileResponse, acp::Error> {
665        let cx = &mut self.cx.clone();
666        let task = self
667            .session_thread(&arguments.session_id)?
668            .update(cx, |thread, cx| {
669                thread.write_text_file(arguments.path, arguments.content, cx)
670            })?;
671
672        task.await?;
673
674        Ok(Default::default())
675    }
676
677    async fn read_text_file(
678        &self,
679        arguments: acp::ReadTextFileRequest,
680    ) -> Result<acp::ReadTextFileResponse, acp::Error> {
681        let task = self.session_thread(&arguments.session_id)?.update(
682            &mut self.cx.clone(),
683            |thread, cx| {
684                thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
685            },
686        )?;
687
688        let content = task.await?;
689
690        Ok(acp::ReadTextFileResponse {
691            content,
692            meta: None,
693        })
694    }
695
696    async fn session_notification(
697        &self,
698        notification: acp::SessionNotification,
699    ) -> Result<(), acp::Error> {
700        let sessions = self.sessions.borrow();
701        let session = sessions
702            .get(¬ification.session_id)
703            .context("Failed to get session")?;
704
705        if let acp::SessionUpdate::CurrentModeUpdate { current_mode_id } = ¬ification.update {
706            if let Some(session_modes) = &session.session_modes {
707                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
708            } else {
709                log::error!(
710                    "Got a `CurrentModeUpdate` notification, but they agent didn't specify `modes` during setting setup."
711                );
712            }
713        }
714
715        // Clone so we can inspect meta both before and after handing off to the thread
716        let update_clone = notification.update.clone();
717
718        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
719        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
720            if let Some(meta) = &tc.meta {
721                if let Some(terminal_info) = meta.get("terminal_info") {
722                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
723                    {
724                        let terminal_id = acp::TerminalId(id_str.into());
725                        let cwd = terminal_info
726                            .get("cwd")
727                            .and_then(|v| v.as_str().map(PathBuf::from));
728
729                        // Create a minimal display-only lower-level terminal and register it.
730                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
731                            let builder = TerminalBuilder::new_display_only(
732                                CursorShape::default(),
733                                AlternateScroll::On,
734                                None,
735                                0,
736                            )?;
737                            let lower = cx.new(|cx| builder.subscribe(cx));
738                            thread.on_terminal_provider_event(
739                                TerminalProviderEvent::Created {
740                                    terminal_id: terminal_id.clone(),
741                                    label: tc.title.clone(),
742                                    cwd,
743                                    output_byte_limit: None,
744                                    terminal: lower,
745                                },
746                                cx,
747                            );
748                            anyhow::Ok(())
749                        });
750                    }
751                }
752            }
753        }
754
755        // Forward the update to the acp_thread as usual.
756        session.thread.update(&mut self.cx.clone(), |thread, cx| {
757            thread.handle_session_update(notification.update.clone(), cx)
758        })??;
759
760        // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
761        if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
762            if let Some(meta) = &tcu.meta {
763                if let Some(term_out) = meta.get("terminal_output") {
764                    if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
765                        let terminal_id = acp::TerminalId(id_str.into());
766                        if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
767                            let data = s.as_bytes().to_vec();
768                            let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
769                                thread.on_terminal_provider_event(
770                                    TerminalProviderEvent::Output {
771                                        terminal_id: terminal_id.clone(),
772                                        data,
773                                    },
774                                    cx,
775                                );
776                            });
777                        }
778                    }
779                }
780
781                // terminal_exit
782                if let Some(term_exit) = meta.get("terminal_exit") {
783                    if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
784                        let terminal_id = acp::TerminalId(id_str.into());
785                        let status = acp::TerminalExitStatus {
786                            exit_code: term_exit
787                                .get("exit_code")
788                                .and_then(|v| v.as_u64())
789                                .map(|i| i as u32),
790                            signal: term_exit
791                                .get("signal")
792                                .and_then(|v| v.as_str().map(|s| s.to_string())),
793                            meta: None,
794                        };
795                        let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
796                            thread.on_terminal_provider_event(
797                                TerminalProviderEvent::Exit {
798                                    terminal_id: terminal_id.clone(),
799                                    status,
800                                },
801                                cx,
802                            );
803                        });
804                    }
805                }
806            }
807        }
808
809        Ok(())
810    }
811
812    async fn create_terminal(
813        &self,
814        args: acp::CreateTerminalRequest,
815    ) -> Result<acp::CreateTerminalResponse, acp::Error> {
816        let thread = self.session_thread(&args.session_id)?;
817        let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
818
819        let mut env = if let Some(dir) = &args.cwd {
820            project
821                .update(&mut self.cx.clone(), |project, cx| {
822                    let worktree = project.find_worktree(dir.as_path(), cx);
823                    let shell = TerminalSettings::get(
824                        worktree.as_ref().map(|(worktree, path)| SettingsLocation {
825                            worktree_id: worktree.read(cx).id(),
826                            path: &path,
827                        }),
828                        cx,
829                    )
830                    .shell
831                    .clone();
832                    project.directory_environment(&shell, dir.clone().into(), cx)
833                })?
834                .await
835                .unwrap_or_default()
836        } else {
837            Default::default()
838        };
839        // Disables paging for `git` and hopefully other commands
840        env.insert("PAGER".into(), "".into());
841        for var in args.env {
842            env.insert(var.name, var.value);
843        }
844
845        // Use remote shell or default system shell, as appropriate
846        let shell = project
847            .update(&mut self.cx.clone(), |project, cx| {
848                project
849                    .remote_client()
850                    .and_then(|r| r.read(cx).default_system_shell())
851                    .map(Shell::Program)
852            })?
853            .unwrap_or_else(|| Shell::Program(get_default_system_shell_preferring_bash()));
854        let is_windows = project
855            .read_with(&self.cx, |project, cx| project.path_style(cx).is_windows())
856            .unwrap_or(cfg!(windows));
857        let (task_command, task_args) = task::ShellBuilder::new(&shell, is_windows)
858            .redirect_stdin_to_dev_null()
859            .build(Some(args.command.clone()), &args.args);
860
861        let terminal_entity = project
862            .update(&mut self.cx.clone(), |project, cx| {
863                project.create_terminal_task(
864                    task::SpawnInTerminal {
865                        command: Some(task_command),
866                        args: task_args,
867                        cwd: args.cwd.clone(),
868                        env,
869                        ..Default::default()
870                    },
871                    cx,
872                )
873            })?
874            .await?;
875
876        // Register with renderer
877        let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
878            thread.register_terminal_created(
879                acp::TerminalId(uuid::Uuid::new_v4().to_string().into()),
880                format!("{} {}", args.command, args.args.join(" ")),
881                args.cwd.clone(),
882                args.output_byte_limit,
883                terminal_entity,
884                cx,
885            )
886        })?;
887        let terminal_id =
888            terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone())?;
889        Ok(acp::CreateTerminalResponse {
890            terminal_id,
891            meta: None,
892        })
893    }
894
895    async fn kill_terminal_command(
896        &self,
897        args: acp::KillTerminalCommandRequest,
898    ) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
899        self.session_thread(&args.session_id)?
900            .update(&mut self.cx.clone(), |thread, cx| {
901                thread.kill_terminal(args.terminal_id, cx)
902            })??;
903
904        Ok(Default::default())
905    }
906
907    async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
908        Err(acp::Error::method_not_found())
909    }
910
911    async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
912        Err(acp::Error::method_not_found())
913    }
914
915    async fn release_terminal(
916        &self,
917        args: acp::ReleaseTerminalRequest,
918    ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
919        self.session_thread(&args.session_id)?
920            .update(&mut self.cx.clone(), |thread, cx| {
921                thread.release_terminal(args.terminal_id, cx)
922            })??;
923
924        Ok(Default::default())
925    }
926
927    async fn terminal_output(
928        &self,
929        args: acp::TerminalOutputRequest,
930    ) -> Result<acp::TerminalOutputResponse, acp::Error> {
931        self.session_thread(&args.session_id)?
932            .read_with(&mut self.cx.clone(), |thread, cx| {
933                let out = thread
934                    .terminal(args.terminal_id)?
935                    .read(cx)
936                    .current_output(cx);
937
938                Ok(out)
939            })?
940    }
941
942    async fn wait_for_terminal_exit(
943        &self,
944        args: acp::WaitForTerminalExitRequest,
945    ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
946        let exit_status = self
947            .session_thread(&args.session_id)?
948            .update(&mut self.cx.clone(), |thread, cx| {
949                anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
950            })??
951            .await;
952
953        Ok(acp::WaitForTerminalExitResponse {
954            exit_status,
955            meta: None,
956        })
957    }
958}
959
960impl ClientDelegate {
961    fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
962        let sessions = self.sessions.borrow();
963        sessions
964            .get(session_id)
965            .context("Failed to get session")
966            .map(|session| session.thread.clone())
967    }
968}