runtimes.rs

  1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use futures::lock::Mutex;
  4use futures::{channel::mpsc, SinkExt as _, StreamExt as _};
  5use gpui::{AsyncAppContext, EntityId};
  6use project::Fs;
  7use runtimelib::{dirs, ConnectionInfo, JupyterKernelspec, JupyterMessage, JupyterMessageContent};
  8use smol::{net::TcpListener, process::Command};
  9use std::fmt::Debug;
 10use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 11use std::{path::PathBuf, sync::Arc};
 12
 13#[derive(Debug)]
 14pub struct Request {
 15    pub request: runtimelib::JupyterMessageContent,
 16    pub responses_rx: mpsc::UnboundedSender<JupyterMessageContent>,
 17}
 18
 19#[derive(Debug, Clone)]
 20pub struct RuntimeSpecification {
 21    pub name: String,
 22    pub path: PathBuf,
 23    pub kernelspec: JupyterKernelspec,
 24}
 25
 26impl RuntimeSpecification {
 27    #[must_use]
 28    fn command(&self, connection_path: &PathBuf) -> Result<Command> {
 29        let argv = &self.kernelspec.argv;
 30
 31        if argv.is_empty() {
 32            return Err(anyhow::anyhow!("Empty argv in kernelspec {}", self.name));
 33        }
 34
 35        if argv.len() < 2 {
 36            return Err(anyhow::anyhow!("Invalid argv in kernelspec {}", self.name));
 37        }
 38
 39        if !argv.contains(&"{connection_file}".to_string()) {
 40            return Err(anyhow::anyhow!(
 41                "Missing 'connection_file' in argv in kernelspec {}",
 42                self.name
 43            ));
 44        }
 45
 46        let mut cmd = Command::new(&argv[0]);
 47
 48        for arg in &argv[1..] {
 49            if arg == "{connection_file}" {
 50                cmd.arg(connection_path);
 51            } else {
 52                cmd.arg(arg);
 53            }
 54        }
 55
 56        if let Some(env) = &self.kernelspec.env {
 57            cmd.envs(env);
 58        }
 59
 60        Ok(cmd)
 61    }
 62}
 63
 64// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
 65// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
 66async fn peek_ports(ip: IpAddr) -> anyhow::Result<[u16; 5]> {
 67    let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
 68    addr_zeroport.set_port(0);
 69    let mut ports: [u16; 5] = [0; 5];
 70    for i in 0..5 {
 71        let listener = TcpListener::bind(addr_zeroport).await?;
 72        let addr = listener.local_addr()?;
 73        ports[i] = addr.port();
 74    }
 75    Ok(ports)
 76}
 77
 78#[derive(Debug)]
 79pub struct RunningKernel {
 80    #[allow(unused)]
 81    runtime: RuntimeSpecification,
 82    #[allow(unused)]
 83    process: smol::process::Child,
 84    pub request_tx: mpsc::UnboundedSender<Request>,
 85}
 86
 87impl RunningKernel {
 88    pub async fn new(
 89        runtime: RuntimeSpecification,
 90        entity_id: EntityId,
 91        fs: Arc<dyn Fs>,
 92        cx: AsyncAppContext,
 93    ) -> anyhow::Result<Self> {
 94        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
 95        let ports = peek_ports(ip).await?;
 96
 97        let connection_info = ConnectionInfo {
 98            transport: "tcp".to_string(),
 99            ip: ip.to_string(),
100            stdin_port: ports[0],
101            control_port: ports[1],
102            hb_port: ports[2],
103            shell_port: ports[3],
104            iopub_port: ports[4],
105            signature_scheme: "hmac-sha256".to_string(),
106            key: uuid::Uuid::new_v4().to_string(),
107            kernel_name: Some(format!("zed-{}", runtime.name)),
108        };
109
110        let connection_path = dirs::runtime_dir().join(format!("kernel-zed-{}.json", entity_id));
111        let content = serde_json::to_string(&connection_info)?;
112        // write out file to disk for kernel
113        fs.atomic_write(connection_path.clone(), content).await?;
114
115        let mut cmd = runtime.command(&connection_path)?;
116        let process = cmd
117            // .stdout(Stdio::null())
118            // .stderr(Stdio::null())
119            .kill_on_drop(true)
120            .spawn()
121            .context("failed to start the kernel process")?;
122
123        let mut iopub = connection_info.create_client_iopub_connection("").await?;
124        let mut shell = connection_info.create_client_shell_connection().await?;
125
126        // Spawn a background task to handle incoming messages from the kernel as well
127        // as outgoing messages to the kernel
128
129        let child_messages: Arc<
130            Mutex<HashMap<String, mpsc::UnboundedSender<JupyterMessageContent>>>,
131        > = Default::default();
132
133        let (request_tx, mut request_rx) = mpsc::unbounded::<Request>();
134
135        cx.background_executor()
136            .spawn({
137                let child_messages = child_messages.clone();
138
139                async move {
140                    let child_messages = child_messages.clone();
141                    while let Ok(message) = iopub.read().await {
142                        if let Some(parent_header) = message.parent_header {
143                            let child_messages = child_messages.lock().await;
144
145                            let sender = child_messages.get(&parent_header.msg_id);
146
147                            match sender {
148                                Some(mut sender) => {
149                                    sender.send(message.content).await?;
150                                }
151                                None => {}
152                            }
153                        }
154                    }
155
156                    anyhow::Ok(())
157                }
158            })
159            .detach();
160
161        cx.background_executor()
162            .spawn({
163                let child_messages = child_messages.clone();
164                async move {
165                    while let Some(request) = request_rx.next().await {
166                        let rx = request.responses_rx.clone();
167
168                        let request: JupyterMessage = request.request.into();
169                        let msg_id = request.header.msg_id.clone();
170
171                        let mut sender = rx.clone();
172
173                        child_messages
174                            .lock()
175                            .await
176                            .insert(msg_id.clone(), sender.clone());
177
178                        shell.send(request).await?;
179
180                        let response = shell.read().await?;
181
182                        sender.send(response.content).await?;
183                    }
184
185                    anyhow::Ok(())
186                }
187            })
188            .detach();
189
190        Ok(Self {
191            runtime,
192            process,
193            request_tx,
194        })
195    }
196}
197
198async fn read_kernelspec_at(
199    // Path should be a directory to a jupyter kernelspec, as in
200    // /usr/local/share/jupyter/kernels/python3
201    kernel_dir: PathBuf,
202    fs: Arc<dyn Fs>,
203) -> anyhow::Result<RuntimeSpecification> {
204    let path = kernel_dir;
205    let kernel_name = if let Some(kernel_name) = path.file_name() {
206        kernel_name.to_string_lossy().to_string()
207    } else {
208        return Err(anyhow::anyhow!("Invalid kernelspec directory: {:?}", path));
209    };
210
211    if !fs.is_dir(path.as_path()).await {
212        return Err(anyhow::anyhow!("Not a directory: {:?}", path));
213    }
214
215    let expected_kernel_json = path.join("kernel.json");
216    let spec = fs.load(expected_kernel_json.as_path()).await?;
217    let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
218
219    Ok(RuntimeSpecification {
220        name: kernel_name,
221        path,
222        kernelspec: spec,
223    })
224}
225
226/// Read a directory of kernelspec directories
227async fn read_kernels_dir(
228    path: PathBuf,
229    fs: Arc<dyn Fs>,
230) -> anyhow::Result<Vec<RuntimeSpecification>> {
231    let mut kernelspec_dirs = fs.read_dir(&path).await?;
232
233    let mut valid_kernelspecs = Vec::new();
234    while let Some(path) = kernelspec_dirs.next().await {
235        match path {
236            Ok(path) => {
237                if fs.is_dir(path.as_path()).await {
238                    let fs = fs.clone();
239                    if let Ok(kernelspec) = read_kernelspec_at(path, fs).await {
240                        valid_kernelspecs.push(kernelspec);
241                    }
242                }
243            }
244            Err(err) => {
245                log::warn!("Error reading kernelspec directory: {:?}", err);
246            }
247        }
248    }
249
250    Ok(valid_kernelspecs)
251}
252
253pub async fn get_runtime_specifications(
254    fs: Arc<dyn Fs>,
255) -> anyhow::Result<Vec<RuntimeSpecification>> {
256    let data_dirs = dirs::data_dirs();
257    let kernel_dirs = data_dirs
258        .iter()
259        .map(|dir| dir.join("kernels"))
260        .map(|path| read_kernels_dir(path, fs.clone()))
261        .collect::<Vec<_>>();
262
263    let kernel_dirs = futures::future::join_all(kernel_dirs).await;
264    let kernel_dirs = kernel_dirs
265        .into_iter()
266        .filter_map(Result::ok)
267        .flatten()
268        .collect::<Vec<_>>();
269
270    Ok(kernel_dirs)
271}
272
273#[cfg(test)]
274mod test {
275    use super::*;
276    use std::path::PathBuf;
277
278    use gpui::TestAppContext;
279    use project::FakeFs;
280    use serde_json::json;
281
282    #[gpui::test]
283    async fn test_get_kernelspecs(cx: &mut TestAppContext) {
284        let fs = FakeFs::new(cx.executor());
285        fs.insert_tree(
286            "/jupyter",
287            json!({
288                ".zed": {
289                    "settings.json": r#"{ "tab_size": 8 }"#,
290                    "tasks.json": r#"[{
291                        "label": "cargo check",
292                        "command": "cargo",
293                        "args": ["check", "--all"]
294                    },]"#,
295                },
296                "kernels": {
297                    "python": {
298                        "kernel.json": r#"{
299                            "display_name": "Python 3",
300                            "language": "python",
301                            "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
302                            "env": {}
303                        }"#
304                    },
305                    "deno": {
306                        "kernel.json": r#"{
307                            "display_name": "Deno",
308                            "language": "typescript",
309                            "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
310                            "env": {}
311                        }"#
312                    }
313                },
314            }),
315        )
316        .await;
317
318        let mut kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs)
319            .await
320            .unwrap();
321
322        kernels.sort_by(|a, b| a.name.cmp(&b.name));
323
324        assert_eq!(
325            kernels.iter().map(|c| c.name.clone()).collect::<Vec<_>>(),
326            vec!["deno", "python"]
327        );
328    }
329}