native_kernel.rs

  1use anyhow::{Context as _, Result};
  2use futures::{
  3    AsyncBufReadExt as _, StreamExt as _,
  4    channel::mpsc::{self},
  5    io::BufReader,
  6};
  7use gpui::{App, Entity, EntityId, Task, Window};
  8use jupyter_protocol::{
  9    ExecutionState, JupyterKernelspec, JupyterMessage, KernelInfoReply,
 10    connection_info::{ConnectionInfo, Transport},
 11};
 12use project::Fs;
 13use runtimelib::dirs;
 14use smol::net::TcpListener;
 15use std::{
 16    env,
 17    fmt::Debug,
 18    net::{IpAddr, Ipv4Addr, SocketAddr},
 19    path::PathBuf,
 20    sync::Arc,
 21};
 22
 23use uuid::Uuid;
 24
 25use super::{KernelSession, RunningKernel, start_kernel_tasks};
 26
 27#[derive(Debug, Clone)]
 28pub struct LocalKernelSpecification {
 29    pub name: String,
 30    pub path: PathBuf,
 31    pub kernelspec: JupyterKernelspec,
 32}
 33
 34impl PartialEq for LocalKernelSpecification {
 35    fn eq(&self, other: &Self) -> bool {
 36        self.name == other.name && self.path == other.path
 37    }
 38}
 39
 40impl Eq for LocalKernelSpecification {}
 41
 42impl LocalKernelSpecification {
 43    #[must_use]
 44    fn command(&self, connection_path: &PathBuf) -> Result<std::process::Command> {
 45        let argv = &self.kernelspec.argv;
 46
 47        anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
 48        anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
 49        anyhow::ensure!(
 50            argv.iter().any(|arg| arg == "{connection_file}"),
 51            "Missing 'connection_file' in argv in kernelspec {}",
 52            self.name
 53        );
 54
 55        let mut cmd = util::command::new_std_command(&argv[0]);
 56
 57        for arg in &argv[1..] {
 58            if arg == "{connection_file}" {
 59                cmd.arg(connection_path);
 60            } else {
 61                cmd.arg(arg);
 62            }
 63        }
 64
 65        if let Some(env) = &self.kernelspec.env {
 66            log::info!(
 67                "LocalKernelSpecification: applying env to command: {:?}",
 68                env.keys()
 69            );
 70            cmd.envs(env);
 71        } else {
 72            log::info!("LocalKernelSpecification: no env in kernelspec");
 73        }
 74
 75        Ok(cmd)
 76    }
 77}
 78
 79// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
 80// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
 81async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> {
 82    let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
 83    addr_zeroport.set_port(0);
 84    let mut ports: [u16; 5] = [0; 5];
 85    for i in 0..5 {
 86        let listener = TcpListener::bind(addr_zeroport).await?;
 87        let addr = listener.local_addr()?;
 88        ports[i] = addr.port();
 89    }
 90    Ok(ports)
 91}
 92
 93pub struct NativeRunningKernel {
 94    pub process: util::process::Child,
 95    connection_path: PathBuf,
 96    _process_status_task: Option<Task<()>>,
 97    pub working_directory: PathBuf,
 98    pub request_tx: mpsc::Sender<JupyterMessage>,
 99    pub stdin_tx: mpsc::Sender<JupyterMessage>,
100    pub execution_state: ExecutionState,
101    pub kernel_info: Option<KernelInfoReply>,
102}
103
104impl Debug for NativeRunningKernel {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("RunningKernel")
107            .field("process", &*self.process)
108            .finish()
109    }
110}
111
112impl NativeRunningKernel {
113    pub fn new<S: KernelSession + 'static>(
114        kernel_specification: LocalKernelSpecification,
115        entity_id: EntityId,
116        working_directory: PathBuf,
117        fs: Arc<dyn Fs>,
118        // todo: convert to weak view
119        session: Entity<S>,
120        window: &mut Window,
121        cx: &mut App,
122    ) -> Task<Result<Box<dyn RunningKernel>>> {
123        window.spawn(cx, async move |cx| {
124            let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
125            let ports = peek_ports(ip).await?;
126
127            let connection_info = ConnectionInfo {
128                transport: Transport::TCP,
129                ip: ip.to_string(),
130                stdin_port: ports[0],
131                control_port: ports[1],
132                hb_port: ports[2],
133                shell_port: ports[3],
134                iopub_port: ports[4],
135                signature_scheme: "hmac-sha256".to_string(),
136                key: uuid::Uuid::new_v4().to_string(),
137                kernel_name: Some(format!("zed-{}", kernel_specification.name)),
138            };
139
140            let runtime_dir = dirs::runtime_dir();
141            fs.create_dir(&runtime_dir)
142                .await
143                .with_context(|| format!("Failed to create jupyter runtime dir {runtime_dir:?}"))?;
144            let connection_path = runtime_dir.join(format!("kernel-zed-{entity_id}.json"));
145            let content = serde_json::to_string(&connection_info)?;
146            fs.atomic_write(connection_path.clone(), content).await?;
147
148            let mut cmd = kernel_specification.command(&connection_path)?;
149            cmd.current_dir(&working_directory);
150
151            let mut process = util::process::Child::spawn(
152                cmd,
153                std::process::Stdio::piped(),
154                std::process::Stdio::piped(),
155                std::process::Stdio::piped(),
156            )?;
157
158            let session_id = Uuid::new_v4().to_string();
159
160            let iopub_socket =
161                runtimelib::create_client_iopub_connection(&connection_info, "", &session_id)
162                    .await?;
163            let control_socket =
164                runtimelib::create_client_control_connection(&connection_info, &session_id).await?;
165
166            let peer_identity = runtimelib::peer_identity_for_session(&session_id)?;
167            let shell_socket = runtimelib::create_client_shell_connection_with_identity(
168                &connection_info,
169                &session_id,
170                peer_identity.clone(),
171            )
172            .await?;
173            let stdin_socket = runtimelib::create_client_stdin_connection_with_identity(
174                &connection_info,
175                &session_id,
176                peer_identity,
177            )
178            .await?;
179
180            let (request_tx, stdin_tx) = start_kernel_tasks(
181                session.clone(),
182                iopub_socket,
183                shell_socket,
184                control_socket,
185                stdin_socket,
186                cx,
187            );
188
189            let stderr = process.stderr.take();
190            let stdout = process.stdout.take();
191
192            cx.spawn(async move |_cx| {
193                use futures::future::Either;
194
195                let stderr_lines = match stderr {
196                    Some(s) => Either::Left(
197                        BufReader::new(s)
198                            .lines()
199                            .map(|line| (log::Level::Error, line)),
200                    ),
201                    None => Either::Right(futures::stream::empty()),
202                };
203                let stdout_lines = match stdout {
204                    Some(s) => Either::Left(
205                        BufReader::new(s)
206                            .lines()
207                            .map(|line| (log::Level::Info, line)),
208                    ),
209                    None => Either::Right(futures::stream::empty()),
210                };
211                let mut lines = futures::stream::select(stderr_lines, stdout_lines);
212                while let Some((level, Ok(line))) = lines.next().await {
213                    log::log!(level, "kernel: {}", line);
214                }
215            })
216            .detach();
217
218            let status = process.status();
219
220            let process_status_task = cx.spawn(async move |cx| {
221                let error_message = match status.await {
222                    Ok(status) => {
223                        if status.success() {
224                            log::info!("kernel process exited successfully");
225                            return;
226                        }
227
228                        format!("kernel process exited with status: {:?}", status)
229                    }
230                    Err(err) => {
231                        format!("kernel process exited with error: {:?}", err)
232                    }
233                };
234
235                log::error!("{}", error_message);
236
237                session.update(cx, |session, cx| {
238                    session.kernel_errored(error_message, cx);
239
240                    cx.notify();
241                });
242            });
243
244            anyhow::Ok(Box::new(Self {
245                process,
246                request_tx,
247                stdin_tx,
248                working_directory,
249                _process_status_task: Some(process_status_task),
250                connection_path,
251                execution_state: ExecutionState::Idle,
252                kernel_info: None,
253            }) as Box<dyn RunningKernel>)
254        })
255    }
256}
257
258impl RunningKernel for NativeRunningKernel {
259    fn request_tx(&self) -> mpsc::Sender<JupyterMessage> {
260        self.request_tx.clone()
261    }
262
263    fn stdin_tx(&self) -> mpsc::Sender<JupyterMessage> {
264        self.stdin_tx.clone()
265    }
266
267    fn working_directory(&self) -> &PathBuf {
268        &self.working_directory
269    }
270
271    fn execution_state(&self) -> &ExecutionState {
272        &self.execution_state
273    }
274
275    fn set_execution_state(&mut self, state: ExecutionState) {
276        self.execution_state = state;
277    }
278
279    fn kernel_info(&self) -> Option<&KernelInfoReply> {
280        self.kernel_info.as_ref()
281    }
282
283    fn set_kernel_info(&mut self, info: KernelInfoReply) {
284        self.kernel_info = Some(info);
285    }
286
287    fn force_shutdown(&mut self, _window: &mut Window, _cx: &mut App) -> Task<anyhow::Result<()>> {
288        self.kill();
289        Task::ready(Ok(()))
290    }
291
292    fn kill(&mut self) {
293        self._process_status_task.take();
294        self.request_tx.close_channel();
295        self.stdin_tx.close_channel();
296        self.process.kill().ok();
297    }
298}
299
300impl Drop for NativeRunningKernel {
301    fn drop(&mut self) {
302        std::fs::remove_file(&self.connection_path).ok();
303        self.kill();
304    }
305}
306
307async fn read_kernelspec_at(
308    // Path should be a directory to a jupyter kernelspec, as in
309    // /usr/local/share/jupyter/kernels/python3
310    kernel_dir: PathBuf,
311    fs: &dyn Fs,
312) -> Result<LocalKernelSpecification> {
313    let path = kernel_dir;
314    let kernel_name = if let Some(kernel_name) = path.file_name() {
315        kernel_name.to_string_lossy().into_owned()
316    } else {
317        anyhow::bail!("Invalid kernelspec directory: {path:?}");
318    };
319
320    if !fs.is_dir(path.as_path()).await {
321        anyhow::bail!("Not a directory: {path:?}");
322    }
323
324    let expected_kernel_json = path.join("kernel.json");
325    let spec = fs.load(expected_kernel_json.as_path()).await?;
326    let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
327
328    Ok(LocalKernelSpecification {
329        name: kernel_name,
330        path,
331        kernelspec: spec,
332    })
333}
334
335/// Read a directory of kernelspec directories
336async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> Result<Vec<LocalKernelSpecification>> {
337    let mut kernelspec_dirs = fs.read_dir(&path).await?;
338
339    let mut valid_kernelspecs = Vec::new();
340    while let Some(path) = kernelspec_dirs.next().await {
341        match path {
342            Ok(path) => {
343                if fs.is_dir(path.as_path()).await
344                    && let Ok(kernelspec) = read_kernelspec_at(path, fs).await
345                {
346                    valid_kernelspecs.push(kernelspec);
347                }
348            }
349            Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
350        }
351    }
352
353    Ok(valid_kernelspecs)
354}
355
356pub async fn local_kernel_specifications(fs: Arc<dyn Fs>) -> Result<Vec<LocalKernelSpecification>> {
357    let mut data_dirs = dirs::data_dirs();
358
359    // Pick up any kernels from conda or conda environment
360    if let Ok(conda_prefix) = env::var("CONDA_PREFIX") {
361        let conda_prefix = PathBuf::from(conda_prefix);
362        let conda_data_dir = conda_prefix.join("share").join("jupyter");
363        data_dirs.push(conda_data_dir);
364    }
365
366    // Search for kernels inside the base python environment
367    let command = util::command::new_command("python")
368        .arg("-c")
369        .arg("import sys; print(sys.prefix)")
370        .output()
371        .await;
372
373    if let Ok(command) = command
374        && command.status.success()
375    {
376        let python_prefix = String::from_utf8(command.stdout);
377        if let Ok(python_prefix) = python_prefix {
378            let python_prefix = PathBuf::from(python_prefix.trim());
379            let python_data_dir = python_prefix.join("share").join("jupyter");
380            data_dirs.push(python_data_dir);
381        }
382    }
383
384    let kernel_dirs = data_dirs
385        .iter()
386        .map(|dir| dir.join("kernels"))
387        .map(|path| read_kernels_dir(path, fs.as_ref()))
388        .collect::<Vec<_>>();
389
390    let kernel_dirs = futures::future::join_all(kernel_dirs).await;
391    let kernel_dirs = kernel_dirs
392        .into_iter()
393        .filter_map(Result::ok)
394        .flatten()
395        .collect::<Vec<_>>();
396
397    Ok(kernel_dirs)
398}
399
#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;

    use gpui::TestAppContext;
    use project::FakeFs;
    use serde_json::json;

    /// Scanning a `kernels` directory yields one specification per kernelspec
    /// sub-directory, named after that sub-directory.
    #[gpui::test]
    async fn test_get_kernelspecs(cx: &mut TestAppContext) {
        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(
            "/jupyter",
            json!({
                ".zed": {
                    "settings.json": r#"{ "tab_size": 8 }"#,
                    "tasks.json": r#"[{
                        "label": "cargo check",
                        "command": "cargo",
                        "args": ["check", "--all"]
                    },]"#,
                },
                "kernels": {
                    "python": {
                        "kernel.json": r#"{
                            "display_name": "Python 3",
                            "language": "python",
                            "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
                            "env": {}
                        }"#
                    },
                    "deno": {
                        "kernel.json": r#"{
                            "display_name": "Deno",
                            "language": "typescript",
                            "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
                            "env": {}
                        }"#
                    }
                },
            }),
        )
        .await;

        let kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs.as_ref())
            .await
            .unwrap();

        // Directory listing order is not part of the contract, so compare sorted names.
        let mut names = kernels
            .into_iter()
            .map(|kernel| kernel.name)
            .collect::<Vec<_>>();
        names.sort();

        assert_eq!(names, vec!["deno", "python"]);
    }
}