agent_servers.rs

  1mod acp;
  2mod claude;
  3mod custom;
  4mod gemini;
  5mod settings;
  6
  7#[cfg(any(test, feature = "test-support"))]
  8pub mod e2e_tests;
  9
 10use anyhow::Context as _;
 11pub use claude::*;
 12pub use custom::*;
 13use fs::Fs;
 14use fs::RemoveOptions;
 15use fs::RenameOptions;
 16use futures::StreamExt as _;
 17pub use gemini::*;
 18use gpui::AppContext;
 19use node_runtime::NodeRuntime;
 20pub use settings::*;
 21
 22use acp_thread::AgentConnection;
 23use acp_thread::LoadError;
 24use anyhow::Result;
 25use anyhow::anyhow;
 26use collections::HashMap;
 27use gpui::{App, AsyncApp, Entity, SharedString, Task};
 28use project::Project;
 29use schemars::JsonSchema;
 30use semver::Version;
 31use serde::{Deserialize, Serialize};
 32use std::str::FromStr as _;
 33use std::{
 34    any::Any,
 35    path::{Path, PathBuf},
 36    rc::Rc,
 37    sync::Arc,
 38};
 39use util::ResultExt as _;
 40
 41pub fn init(cx: &mut App) {
 42    settings::init(cx);
 43}
 44
 45pub struct AgentServerDelegate {
 46    project: Entity<Project>,
 47    status_tx: Option<watch::Sender<SharedString>>,
 48    new_version_available: Option<watch::Sender<Option<String>>>,
 49}
 50
 51impl AgentServerDelegate {
 52    pub fn new(
 53        project: Entity<Project>,
 54        status_tx: Option<watch::Sender<SharedString>>,
 55        new_version_tx: Option<watch::Sender<Option<String>>>,
 56    ) -> Self {
 57        Self {
 58            project,
 59            status_tx,
 60            new_version_available: new_version_tx,
 61        }
 62    }
 63
 64    pub fn project(&self) -> &Entity<Project> {
 65        &self.project
 66    }
 67
 68    fn get_or_npm_install_builtin_agent(
 69        self,
 70        binary_name: SharedString,
 71        package_name: SharedString,
 72        entrypoint_path: PathBuf,
 73        ignore_system_version: bool,
 74        minimum_version: Option<Version>,
 75        cx: &mut App,
 76    ) -> Task<Result<AgentServerCommand>> {
 77        let project = self.project;
 78        let fs = project.read(cx).fs().clone();
 79        let Some(node_runtime) = project.read(cx).node_runtime().cloned() else {
 80            return Task::ready(Err(anyhow!(
 81                "External agents are not yet available in remote projects."
 82            )));
 83        };
 84        let status_tx = self.status_tx;
 85        let new_version_available = self.new_version_available;
 86
 87        cx.spawn(async move |cx| {
 88            if !ignore_system_version {
 89                if let Some(bin) = find_bin_in_path(binary_name.clone(), &project, cx).await {
 90                    return Ok(AgentServerCommand {
 91                        path: bin,
 92                        args: Vec::new(),
 93                        env: Default::default(),
 94                    });
 95                }
 96            }
 97
 98            cx.spawn(async move |cx| {
 99                let node_path = node_runtime.binary_path().await?;
100                let dir = paths::data_dir()
101                    .join("external_agents")
102                    .join(binary_name.as_str());
103                fs.create_dir(&dir).await?;
104
105                let mut stream = fs.read_dir(&dir).await?;
106                let mut versions = Vec::new();
107                let mut to_delete = Vec::new();
108                while let Some(entry) = stream.next().await {
109                    let Ok(entry) = entry else { continue };
110                    let Some(file_name) = entry.file_name() else {
111                        continue;
112                    };
113
114                    if let Some(version) = file_name
115                        .to_str()
116                        .and_then(|name| semver::Version::from_str(&name).ok())
117                    {
118                        versions.push((version, file_name.to_owned()));
119                    } else {
120                        to_delete.push(file_name.to_owned())
121                    }
122                }
123
124                versions.sort();
125                let newest_version = if let Some((version, file_name)) = versions.last().cloned()
126                    && minimum_version.is_none_or(|minimum_version| version >= minimum_version)
127                {
128                    versions.pop();
129                    Some(file_name)
130                } else {
131                    None
132                };
133                log::debug!("existing version of {package_name}: {newest_version:?}");
134                to_delete.extend(versions.into_iter().map(|(_, file_name)| file_name));
135
136                cx.background_spawn({
137                    let fs = fs.clone();
138                    let dir = dir.clone();
139                    async move {
140                        for file_name in to_delete {
141                            fs.remove_dir(
142                                &dir.join(file_name),
143                                RemoveOptions {
144                                    recursive: true,
145                                    ignore_if_not_exists: false,
146                                },
147                            )
148                            .await
149                            .ok();
150                        }
151                    }
152                })
153                .detach();
154
155                let version = if let Some(file_name) = newest_version {
156                    cx.background_spawn({
157                        let file_name = file_name.clone();
158                        let dir = dir.clone();
159                        async move {
160                            let latest_version =
161                                node_runtime.npm_package_latest_version(&package_name).await;
162                            if let Ok(latest_version) = latest_version
163                                && &latest_version != &file_name.to_string_lossy()
164                            {
165                                Self::download_latest_version(
166                                    fs,
167                                    dir.clone(),
168                                    node_runtime,
169                                    package_name,
170                                )
171                                .await
172                                .log_err();
173                                if let Some(mut new_version_available) = new_version_available {
174                                    new_version_available.send(Some(latest_version)).ok();
175                                }
176                            }
177                        }
178                    })
179                    .detach();
180                    file_name
181                } else {
182                    if let Some(mut status_tx) = status_tx {
183                        status_tx.send("Installing…".into()).ok();
184                    }
185                    let dir = dir.clone();
186                    cx.background_spawn(Self::download_latest_version(
187                        fs,
188                        dir.clone(),
189                        node_runtime,
190                        package_name,
191                    ))
192                    .await?
193                    .into()
194                };
195                anyhow::Ok(AgentServerCommand {
196                    path: node_path,
197                    args: vec![
198                        dir.join(version)
199                            .join(entrypoint_path)
200                            .to_string_lossy()
201                            .to_string(),
202                    ],
203                    env: Default::default(),
204                })
205            })
206            .await
207            .map_err(|e| LoadError::FailedToInstall(e.to_string().into()).into())
208        })
209    }
210
211    async fn download_latest_version(
212        fs: Arc<dyn Fs>,
213        dir: PathBuf,
214        node_runtime: NodeRuntime,
215        package_name: SharedString,
216    ) -> Result<String> {
217        log::debug!("downloading latest version of {package_name}");
218
219        let tmp_dir = tempfile::tempdir_in(&dir)?;
220
221        node_runtime
222            .npm_install_packages(tmp_dir.path(), &[(&package_name, "latest")])
223            .await?;
224
225        let version = node_runtime
226            .npm_package_installed_version(tmp_dir.path(), &package_name)
227            .await?
228            .context("expected package to be installed")?;
229
230        fs.rename(
231            &tmp_dir.keep(),
232            &dir.join(&version),
233            RenameOptions {
234                ignore_if_exists: true,
235                overwrite: false,
236            },
237        )
238        .await?;
239
240        anyhow::Ok(version)
241    }
242}
243
244pub trait AgentServer: Send {
245    fn logo(&self) -> ui::IconName;
246    fn name(&self) -> SharedString;
247    fn telemetry_id(&self) -> &'static str;
248
249    fn connect(
250        &self,
251        root_dir: &Path,
252        delegate: AgentServerDelegate,
253        cx: &mut App,
254    ) -> Task<Result<Rc<dyn AgentConnection>>>;
255
256    fn into_any(self: Rc<Self>) -> Rc<dyn Any>;
257}
258
259impl dyn AgentServer {
260    pub fn downcast<T: 'static + AgentServer + Sized>(self: Rc<Self>) -> Option<Rc<T>> {
261        self.into_any().downcast().ok()
262    }
263}
264
265impl std::fmt::Debug for AgentServerCommand {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let filtered_env = self.env.as_ref().map(|env| {
268            env.iter()
269                .map(|(k, v)| {
270                    (
271                        k,
272                        if util::redact::should_redact(k) {
273                            "[REDACTED]"
274                        } else {
275                            v
276                        },
277                    )
278                })
279                .collect::<Vec<_>>()
280        });
281
282        f.debug_struct("AgentServerCommand")
283            .field("path", &self.path)
284            .field("args", &self.args)
285            .field("env", &filtered_env)
286            .finish()
287    }
288}
289
290#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, JsonSchema)]
291pub struct AgentServerCommand {
292    #[serde(rename = "command")]
293    pub path: PathBuf,
294    #[serde(default)]
295    pub args: Vec<String>,
296    pub env: Option<HashMap<String, String>>,
297}
298
299impl AgentServerCommand {
300    pub async fn resolve(
301        path_bin_name: &'static str,
302        extra_args: &[&'static str],
303        fallback_path: Option<&Path>,
304        settings: Option<BuiltinAgentServerSettings>,
305        project: &Entity<Project>,
306        cx: &mut AsyncApp,
307    ) -> Option<Self> {
308        if let Some(settings) = settings
309            && let Some(command) = settings.custom_command()
310        {
311            Some(command)
312        } else {
313            match find_bin_in_path(path_bin_name.into(), project, cx).await {
314                Some(path) => Some(Self {
315                    path,
316                    args: extra_args.iter().map(|arg| arg.to_string()).collect(),
317                    env: None,
318                }),
319                None => fallback_path.and_then(|path| {
320                    if path.exists() {
321                        Some(Self {
322                            path: path.to_path_buf(),
323                            args: extra_args.iter().map(|arg| arg.to_string()).collect(),
324                            env: None,
325                        })
326                    } else {
327                        None
328                    }
329                }),
330            }
331        }
332    }
333}
334
335async fn find_bin_in_path(
336    bin_name: SharedString,
337    project: &Entity<Project>,
338    cx: &mut AsyncApp,
339) -> Option<PathBuf> {
340    let (env_task, root_dir) = project
341        .update(cx, |project, cx| {
342            let worktree = project.visible_worktrees(cx).next();
343            match worktree {
344                Some(worktree) => {
345                    let env_task = project.environment().update(cx, |env, cx| {
346                        env.get_worktree_environment(worktree.clone(), cx)
347                    });
348
349                    let path = worktree.read(cx).abs_path();
350                    (env_task, path)
351                }
352                None => {
353                    let path: Arc<Path> = paths::home_dir().as_path().into();
354                    let env_task = project.environment().update(cx, |env, cx| {
355                        env.get_directory_environment(path.clone(), cx)
356                    });
357                    (env_task, path)
358                }
359            }
360        })
361        .log_err()?;
362
363    cx.background_executor()
364        .spawn(async move {
365            let which_result = if cfg!(windows) {
366                which::which(bin_name.as_str())
367            } else {
368                let env = env_task.await.unwrap_or_default();
369                let shell_path = env.get("PATH").cloned();
370                which::which_in(bin_name.as_str(), shell_path.as_ref(), root_dir.as_ref())
371            };
372
373            if let Err(which::Error::CannotFindBinaryPath) = which_result {
374                return None;
375            }
376
377            which_result.log_err()
378        })
379        .await
380}