agent_servers.rs

  1mod acp;
  2mod claude;
  3mod custom;
  4mod gemini;
  5mod settings;
  6
  7#[cfg(any(test, feature = "test-support"))]
  8pub mod e2e_tests;
  9
 10use anyhow::Context as _;
 11pub use claude::*;
 12pub use custom::*;
 13use fs::Fs;
 14use fs::RemoveOptions;
 15use fs::RenameOptions;
 16use futures::StreamExt as _;
 17pub use gemini::*;
 18use gpui::AppContext;
 19use node_runtime::NodeRuntime;
 20pub use settings::*;
 21
 22use acp_thread::AgentConnection;
 23use acp_thread::LoadError;
 24use anyhow::Result;
 25use anyhow::anyhow;
 26use collections::HashMap;
 27use gpui::{App, AsyncApp, Entity, SharedString, Task};
 28use project::Project;
 29use schemars::JsonSchema;
 30use semver::Version;
 31use serde::{Deserialize, Serialize};
 32use std::str::FromStr as _;
 33use std::{
 34    any::Any,
 35    path::{Path, PathBuf},
 36    rc::Rc,
 37    sync::Arc,
 38};
 39use util::ResultExt as _;
 40
 41pub fn init(cx: &mut App) {
 42    settings::init(cx);
 43}
 44
 45pub struct AgentServerDelegate {
 46    project: Entity<Project>,
 47    status_tx: Option<watch::Sender<SharedString>>,
 48    new_version_available: Option<watch::Sender<Option<String>>>,
 49}
 50
 51impl AgentServerDelegate {
 52    pub fn new(
 53        project: Entity<Project>,
 54        status_tx: Option<watch::Sender<SharedString>>,
 55        new_version_tx: Option<watch::Sender<Option<String>>>,
 56    ) -> Self {
 57        Self {
 58            project,
 59            status_tx,
 60            new_version_available: new_version_tx,
 61        }
 62    }
 63
 64    pub fn project(&self) -> &Entity<Project> {
 65        &self.project
 66    }
 67
 68    fn get_or_npm_install_builtin_agent(
 69        self,
 70        binary_name: SharedString,
 71        package_name: SharedString,
 72        entrypoint_path: PathBuf,
 73        ignore_system_version: bool,
 74        minimum_version: Option<Version>,
 75        cx: &mut App,
 76    ) -> Task<Result<AgentServerCommand>> {
 77        let project = self.project;
 78        let fs = project.read(cx).fs().clone();
 79        let Some(node_runtime) = project.read(cx).node_runtime().cloned() else {
 80            return Task::ready(Err(anyhow!(
 81                "External agents are not yet available in remote projects."
 82            )));
 83        };
 84        let status_tx = self.status_tx;
 85        let new_version_available = self.new_version_available;
 86
 87        cx.spawn(async move |cx| {
 88            if !ignore_system_version {
 89                if let Some(bin) = find_bin_in_path(binary_name.clone(), &project, cx).await {
 90                    return Ok(AgentServerCommand {
 91                        path: bin,
 92                        args: Vec::new(),
 93                        env: Default::default(),
 94                    });
 95                }
 96            }
 97
 98            cx.spawn(async move |cx| {
 99                let node_path = node_runtime.binary_path().await?;
100                let dir = paths::data_dir()
101                    .join("external_agents")
102                    .join(binary_name.as_str());
103                fs.create_dir(&dir).await?;
104
105                let mut stream = fs.read_dir(&dir).await?;
106                let mut versions = Vec::new();
107                let mut to_delete = Vec::new();
108                while let Some(entry) = stream.next().await {
109                    let Ok(entry) = entry else { continue };
110                    let Some(file_name) = entry.file_name() else {
111                        continue;
112                    };
113
114                    if let Some(name) = file_name.to_str()
115                        && let Some(version) = semver::Version::from_str(name).ok()
116                        && fs
117                            .is_file(&dir.join(file_name).join(&entrypoint_path))
118                            .await
119                    {
120                        versions.push((version, file_name.to_owned()));
121                    } else {
122                        to_delete.push(file_name.to_owned())
123                    }
124                }
125
126                versions.sort();
127                let newest_version = if let Some((version, file_name)) = versions.last().cloned()
128                    && minimum_version.is_none_or(|minimum_version| version >= minimum_version)
129                {
130                    versions.pop();
131                    Some(file_name)
132                } else {
133                    None
134                };
135                log::debug!("existing version of {package_name}: {newest_version:?}");
136                to_delete.extend(versions.into_iter().map(|(_, file_name)| file_name));
137
138                cx.background_spawn({
139                    let fs = fs.clone();
140                    let dir = dir.clone();
141                    async move {
142                        for file_name in to_delete {
143                            fs.remove_dir(
144                                &dir.join(file_name),
145                                RemoveOptions {
146                                    recursive: true,
147                                    ignore_if_not_exists: false,
148                                },
149                            )
150                            .await
151                            .ok();
152                        }
153                    }
154                })
155                .detach();
156
157                let version = if let Some(file_name) = newest_version {
158                    cx.background_spawn({
159                        let file_name = file_name.clone();
160                        let dir = dir.clone();
161                        let fs = fs.clone();
162                        async move {
163                            let latest_version =
164                                node_runtime.npm_package_latest_version(&package_name).await;
165                            if let Ok(latest_version) = latest_version
166                                && &latest_version != &file_name.to_string_lossy()
167                            {
168                                Self::download_latest_version(
169                                    fs,
170                                    dir.clone(),
171                                    node_runtime,
172                                    package_name,
173                                )
174                                .await
175                                .log_err();
176                                if let Some(mut new_version_available) = new_version_available {
177                                    new_version_available.send(Some(latest_version)).ok();
178                                }
179                            }
180                        }
181                    })
182                    .detach();
183                    file_name
184                } else {
185                    if let Some(mut status_tx) = status_tx {
186                        status_tx.send("Installing…".into()).ok();
187                    }
188                    let dir = dir.clone();
189                    cx.background_spawn(Self::download_latest_version(
190                        fs.clone(),
191                        dir.clone(),
192                        node_runtime,
193                        package_name,
194                    ))
195                    .await?
196                    .into()
197                };
198
199                let agent_server_path = dir.join(version).join(entrypoint_path);
200                let agent_server_path_exists = fs.is_file(&agent_server_path).await;
201                anyhow::ensure!(
202                    agent_server_path_exists,
203                    "Missing entrypoint path {} after installation",
204                    agent_server_path.to_string_lossy()
205                );
206
207                anyhow::Ok(AgentServerCommand {
208                    path: node_path,
209                    args: vec![agent_server_path.to_string_lossy().to_string()],
210                    env: Default::default(),
211                })
212            })
213            .await
214            .map_err(|e| LoadError::FailedToInstall(e.to_string().into()).into())
215        })
216    }
217
218    async fn download_latest_version(
219        fs: Arc<dyn Fs>,
220        dir: PathBuf,
221        node_runtime: NodeRuntime,
222        package_name: SharedString,
223    ) -> Result<String> {
224        log::debug!("downloading latest version of {package_name}");
225
226        let tmp_dir = tempfile::tempdir_in(&dir)?;
227
228        node_runtime
229            .npm_install_packages(tmp_dir.path(), &[(&package_name, "latest")])
230            .await?;
231
232        let version = node_runtime
233            .npm_package_installed_version(tmp_dir.path(), &package_name)
234            .await?
235            .context("expected package to be installed")?;
236
237        fs.rename(
238            &tmp_dir.keep(),
239            &dir.join(&version),
240            RenameOptions {
241                ignore_if_exists: true,
242                overwrite: false,
243            },
244        )
245        .await?;
246
247        anyhow::Ok(version)
248    }
249}
250
251pub trait AgentServer: Send {
252    fn logo(&self) -> ui::IconName;
253    fn name(&self) -> SharedString;
254    fn telemetry_id(&self) -> &'static str;
255
256    fn connect(
257        &self,
258        root_dir: &Path,
259        delegate: AgentServerDelegate,
260        cx: &mut App,
261    ) -> Task<Result<Rc<dyn AgentConnection>>>;
262
263    fn into_any(self: Rc<Self>) -> Rc<dyn Any>;
264}
265
266impl dyn AgentServer {
267    pub fn downcast<T: 'static + AgentServer + Sized>(self: Rc<Self>) -> Option<Rc<T>> {
268        self.into_any().downcast().ok()
269    }
270}
271
272impl std::fmt::Debug for AgentServerCommand {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        let filtered_env = self.env.as_ref().map(|env| {
275            env.iter()
276                .map(|(k, v)| {
277                    (
278                        k,
279                        if util::redact::should_redact(k) {
280                            "[REDACTED]"
281                        } else {
282                            v
283                        },
284                    )
285                })
286                .collect::<Vec<_>>()
287        });
288
289        f.debug_struct("AgentServerCommand")
290            .field("path", &self.path)
291            .field("args", &self.args)
292            .field("env", &filtered_env)
293            .finish()
294    }
295}
296
297#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, JsonSchema)]
298pub struct AgentServerCommand {
299    #[serde(rename = "command")]
300    pub path: PathBuf,
301    #[serde(default)]
302    pub args: Vec<String>,
303    pub env: Option<HashMap<String, String>>,
304}
305
306impl AgentServerCommand {
307    pub async fn resolve(
308        path_bin_name: &'static str,
309        extra_args: &[&'static str],
310        fallback_path: Option<&Path>,
311        settings: Option<BuiltinAgentServerSettings>,
312        project: &Entity<Project>,
313        cx: &mut AsyncApp,
314    ) -> Option<Self> {
315        if let Some(settings) = settings
316            && let Some(command) = settings.custom_command()
317        {
318            Some(command)
319        } else {
320            match find_bin_in_path(path_bin_name.into(), project, cx).await {
321                Some(path) => Some(Self {
322                    path,
323                    args: extra_args.iter().map(|arg| arg.to_string()).collect(),
324                    env: None,
325                }),
326                None => fallback_path.and_then(|path| {
327                    if path.exists() {
328                        Some(Self {
329                            path: path.to_path_buf(),
330                            args: extra_args.iter().map(|arg| arg.to_string()).collect(),
331                            env: None,
332                        })
333                    } else {
334                        None
335                    }
336                }),
337            }
338        }
339    }
340}
341
342async fn find_bin_in_path(
343    bin_name: SharedString,
344    project: &Entity<Project>,
345    cx: &mut AsyncApp,
346) -> Option<PathBuf> {
347    let (env_task, root_dir) = project
348        .update(cx, |project, cx| {
349            let worktree = project.visible_worktrees(cx).next();
350            match worktree {
351                Some(worktree) => {
352                    let env_task = project.environment().update(cx, |env, cx| {
353                        env.get_worktree_environment(worktree.clone(), cx)
354                    });
355
356                    let path = worktree.read(cx).abs_path();
357                    (env_task, path)
358                }
359                None => {
360                    let path: Arc<Path> = paths::home_dir().as_path().into();
361                    let env_task = project.environment().update(cx, |env, cx| {
362                        env.get_directory_environment(path.clone(), cx)
363                    });
364                    (env_task, path)
365                }
366            }
367        })
368        .log_err()?;
369
370    cx.background_executor()
371        .spawn(async move {
372            let which_result = if cfg!(windows) {
373                which::which(bin_name.as_str())
374            } else {
375                let env = env_task.await.unwrap_or_default();
376                let shell_path = env.get("PATH").cloned();
377                which::which_in(bin_name.as_str(), shell_path.as_ref(), root_dir.as_ref())
378            };
379
380            if let Err(which::Error::CannotFindBinaryPath) = which_result {
381                return None;
382            }
383
384            which_result.log_err()
385        })
386        .await
387}