native_kernel.rs

  1use anyhow::{Context as _, Result};
  2use futures::{
  3    AsyncBufReadExt as _, StreamExt as _,
  4    channel::mpsc::{self},
  5    io::BufReader,
  6};
  7use gpui::{App, Entity, EntityId, Task, Window};
  8use jupyter_protocol::{
  9    ExecutionState, JupyterKernelspec, JupyterMessage, KernelInfoReply,
 10    connection_info::{ConnectionInfo, Transport},
 11};
 12use project::Fs;
 13use runtimelib::dirs;
 14use smol::net::TcpListener;
 15use std::{
 16    env,
 17    fmt::Debug,
 18    net::{IpAddr, Ipv4Addr, SocketAddr},
 19    path::PathBuf,
 20    sync::Arc,
 21};
 22use util::command::Command;
 23use uuid::Uuid;
 24
 25use super::{KernelSession, RunningKernel, start_kernel_tasks};
 26
/// A Jupyter kernelspec discovered on the local filesystem.
#[derive(Debug, Clone)]
pub struct LocalKernelSpecification {
    /// Kernel name; `read_kernelspec_at` sets it to the file name of the kernelspec directory.
    pub name: String,
    /// Directory the kernelspec was read from (the one containing `kernel.json`).
    pub path: PathBuf,
    /// Contents of `kernel.json`, deserialized.
    pub kernelspec: JupyterKernelspec,
}
 33
 34impl PartialEq for LocalKernelSpecification {
 35    fn eq(&self, other: &Self) -> bool {
 36        self.name == other.name && self.path == other.path
 37    }
 38}
 39
 40impl Eq for LocalKernelSpecification {}
 41
 42impl LocalKernelSpecification {
 43    #[must_use]
 44    fn command(&self, connection_path: &PathBuf) -> Result<Command> {
 45        let argv = &self.kernelspec.argv;
 46
 47        anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
 48        anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
 49        anyhow::ensure!(
 50            argv.iter().any(|arg| arg == "{connection_file}"),
 51            "Missing 'connection_file' in argv in kernelspec {}",
 52            self.name
 53        );
 54
 55        let mut cmd = util::command::new_command(&argv[0]);
 56
 57        for arg in &argv[1..] {
 58            if arg == "{connection_file}" {
 59                cmd.arg(connection_path);
 60            } else {
 61                cmd.arg(arg);
 62            }
 63        }
 64
 65        if let Some(env) = &self.kernelspec.env {
 66            log::info!(
 67                "LocalKernelSpecification: applying env to command: {:?}",
 68                env.keys()
 69            );
 70            cmd.envs(env);
 71        } else {
 72            log::info!("LocalKernelSpecification: no env in kernelspec");
 73        }
 74
 75        Ok(cmd)
 76    }
 77}
 78
 79// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
 80// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
 81async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> {
 82    let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
 83    addr_zeroport.set_port(0);
 84    let mut ports: [u16; 5] = [0; 5];
 85    for i in 0..5 {
 86        let listener = TcpListener::bind(addr_zeroport).await?;
 87        let addr = listener.local_addr()?;
 88        ports[i] = addr.port();
 89    }
 90    Ok(ports)
 91}
 92
/// A kernel running as a child process of this application.
pub struct NativeRunningKernel {
    /// Handle to the spawned kernel process.
    pub process: util::command::Child,
    /// Path of the connection file written for the kernel; removed when this value is dropped.
    connection_path: PathBuf,
    /// Task that waits for the process to exit and reports failures to the session.
    /// `kill` takes it out of the option and drops it.
    _process_status_task: Option<Task<()>>,
    /// Directory the kernel process was started in.
    pub working_directory: PathBuf,
    /// Sender returned by `start_kernel_tasks` (first element); closed by `kill`.
    pub request_tx: mpsc::Sender<JupyterMessage>,
    /// Sender returned by `start_kernel_tasks` (second element); closed by `kill`.
    pub stdin_tx: mpsc::Sender<JupyterMessage>,
    /// Last execution state recorded via `set_execution_state`; starts as `Idle`.
    pub execution_state: ExecutionState,
    /// Kernel info recorded via `set_kernel_info`; `None` until then.
    pub kernel_info: Option<KernelInfoReply>,
}
103
104impl Debug for NativeRunningKernel {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("RunningKernel")
107            .field("process", &self.process)
108            .finish()
109    }
110}
111
impl NativeRunningKernel {
    /// Starts a kernel process for `kernel_specification` and connects to it.
    ///
    /// The returned task:
    /// 1. picks five free ports on 127.0.0.1 and builds the connection info,
    /// 2. writes that info as JSON to `kernel-zed-{entity_id}.json` in the Jupyter runtime dir,
    /// 3. spawns the kernel command in `working_directory` with piped stdio,
    /// 4. opens the iopub, control, shell and stdin client connections,
    /// 5. hands the sockets to `start_kernel_tasks`,
    /// 6. spawns a task that forwards the process output to the log and a task that
    ///    reports an unsuccessful process exit to `session`.
    ///
    /// The task resolves to the running kernel, or to the first error encountered
    /// in any of the steps above.
    pub fn new<S: KernelSession + 'static>(
        kernel_specification: LocalKernelSpecification,
        entity_id: EntityId,
        working_directory: PathBuf,
        fs: Arc<dyn Fs>,
        // todo: convert to weak view
        session: Entity<S>,
        window: &mut Window,
        cx: &mut App,
    ) -> Task<Result<Box<dyn RunningKernel>>> {
        window.spawn(cx, async move |cx| {
            // The kernel is only reachable over the loopback interface.
            let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
            let ports = peek_ports(ip).await?;

            // One port per channel; a fresh random UUID serves as the signing key.
            let connection_info = ConnectionInfo {
                transport: Transport::TCP,
                ip: ip.to_string(),
                stdin_port: ports[0],
                control_port: ports[1],
                hb_port: ports[2],
                shell_port: ports[3],
                iopub_port: ports[4],
                signature_scheme: "hmac-sha256".to_string(),
                key: uuid::Uuid::new_v4().to_string(),
                kernel_name: Some(format!("zed-{}", kernel_specification.name)),
            };

            // Write the connection file that the kernel command is pointed at.
            let runtime_dir = dirs::runtime_dir();
            fs.create_dir(&runtime_dir)
                .await
                .with_context(|| format!("Failed to create jupyter runtime dir {runtime_dir:?}"))?;
            let connection_path = runtime_dir.join(format!("kernel-zed-{entity_id}.json"));
            let content = serde_json::to_string(&connection_info)?;
            fs.atomic_write(connection_path.clone(), content).await?;
            // NOTE(review): if any later step fails, the connection file written above is
            // not removed here; the only removal visible in this file is in `Drop`, which
            // runs only once `Self` has been constructed.

            let mut cmd = kernel_specification.command(&connection_path)?;

            // `kill_on_drop(true)` is set so the process does not outlive its handle.
            let mut process = cmd
                .current_dir(&working_directory)
                .stdout(util::command::Stdio::piped())
                .stderr(util::command::Stdio::piped())
                .stdin(util::command::Stdio::piped())
                .kill_on_drop(true)
                .spawn()
                .context("failed to start the kernel process")?;

            // A fresh session id shared by all four client connections.
            let session_id = Uuid::new_v4().to_string();

            // The "" argument is presumably an empty subscription filter — TODO confirm
            // against runtimelib.
            let iopub_socket =
                runtimelib::create_client_iopub_connection(&connection_info, "", &session_id)
                    .await?;
            let control_socket =
                runtimelib::create_client_control_connection(&connection_info, &session_id).await?;

            // Shell and stdin connections are created with the same peer identity.
            let peer_identity = runtimelib::peer_identity_for_session(&session_id)?;
            let shell_socket = runtimelib::create_client_shell_connection_with_identity(
                &connection_info,
                &session_id,
                peer_identity.clone(),
            )
            .await?;
            let stdin_socket = runtimelib::create_client_stdin_connection_with_identity(
                &connection_info,
                &session_id,
                peer_identity,
            )
            .await?;

            let (request_tx, stdin_tx) = start_kernel_tasks(
                session.clone(),
                iopub_socket,
                shell_socket,
                control_socket,
                stdin_socket,
                cx,
            );

            // Take the output pipes out of the process handle so the logging task owns them.
            let stderr = process.stderr.take();
            let stdout = process.stdout.take();

            // Detached task: forward kernel output to the log, stderr at error level and
            // stdout at info level.
            cx.spawn(async move |_cx| {
                use futures::future::Either;

                // `Either` lets a present pipe and an absent one share a single stream type.
                let stderr_lines = match stderr {
                    Some(s) => Either::Left(
                        BufReader::new(s)
                            .lines()
                            .map(|line| (log::Level::Error, line)),
                    ),
                    None => Either::Right(futures::stream::empty()),
                };
                let stdout_lines = match stdout {
                    Some(s) => Either::Left(
                        BufReader::new(s)
                            .lines()
                            .map(|line| (log::Level::Info, line)),
                    ),
                    None => Either::Right(futures::stream::empty()),
                };
                let mut lines = futures::stream::select(stderr_lines, stdout_lines);
                // NOTE(review): the pattern only matches `Ok` lines, so the loop (and all
                // further logging of both streams) ends at the first read error.
                while let Some((level, Ok(line))) = lines.next().await {
                    log::log!(level, "kernel: {}", line);
                }
            })
            .detach();

            let status = process.status();

            // Wait for the process to exit; a successful exit is only logged, anything
            // else is logged and reported to the session.
            let process_status_task = cx.spawn(async move |cx| {
                let error_message = match status.await {
                    Ok(status) => {
                        if status.success() {
                            log::info!("kernel process exited successfully");
                            return;
                        }

                        format!("kernel process exited with status: {:?}", status)
                    }
                    Err(err) => {
                        format!("kernel process exited with error: {:?}", err)
                    }
                };

                log::error!("{}", error_message);

                session.update(cx, |session, cx| {
                    session.kernel_errored(error_message, cx);

                    cx.notify();
                });
            });

            // The status task is stored on the kernel so `kill` can drop it.
            anyhow::Ok(Box::new(Self {
                process,
                request_tx,
                stdin_tx,
                working_directory,
                _process_status_task: Some(process_status_task),
                connection_path,
                execution_state: ExecutionState::Idle,
                kernel_info: None,
            }) as Box<dyn RunningKernel>)
        })
    }
}
258
259impl RunningKernel for NativeRunningKernel {
260    fn request_tx(&self) -> mpsc::Sender<JupyterMessage> {
261        self.request_tx.clone()
262    }
263
264    fn stdin_tx(&self) -> mpsc::Sender<JupyterMessage> {
265        self.stdin_tx.clone()
266    }
267
268    fn working_directory(&self) -> &PathBuf {
269        &self.working_directory
270    }
271
272    fn execution_state(&self) -> &ExecutionState {
273        &self.execution_state
274    }
275
276    fn set_execution_state(&mut self, state: ExecutionState) {
277        self.execution_state = state;
278    }
279
280    fn kernel_info(&self) -> Option<&KernelInfoReply> {
281        self.kernel_info.as_ref()
282    }
283
284    fn set_kernel_info(&mut self, info: KernelInfoReply) {
285        self.kernel_info = Some(info);
286    }
287
288    fn force_shutdown(&mut self, _window: &mut Window, _cx: &mut App) -> Task<anyhow::Result<()>> {
289        self.kill();
290        Task::ready(Ok(()))
291    }
292
293    fn kill(&mut self) {
294        self._process_status_task.take();
295        self.request_tx.close_channel();
296        self.stdin_tx.close_channel();
297        self.process.kill().ok();
298    }
299}
300
301impl Drop for NativeRunningKernel {
302    fn drop(&mut self) {
303        std::fs::remove_file(&self.connection_path).ok();
304        self.kill();
305    }
306}
307
308async fn read_kernelspec_at(
309    // Path should be a directory to a jupyter kernelspec, as in
310    // /usr/local/share/jupyter/kernels/python3
311    kernel_dir: PathBuf,
312    fs: &dyn Fs,
313) -> Result<LocalKernelSpecification> {
314    let path = kernel_dir;
315    let kernel_name = if let Some(kernel_name) = path.file_name() {
316        kernel_name.to_string_lossy().into_owned()
317    } else {
318        anyhow::bail!("Invalid kernelspec directory: {path:?}");
319    };
320
321    if !fs.is_dir(path.as_path()).await {
322        anyhow::bail!("Not a directory: {path:?}");
323    }
324
325    let expected_kernel_json = path.join("kernel.json");
326    let spec = fs.load(expected_kernel_json.as_path()).await?;
327    let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
328
329    Ok(LocalKernelSpecification {
330        name: kernel_name,
331        path,
332        kernelspec: spec,
333    })
334}
335
336/// Read a directory of kernelspec directories
337async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> Result<Vec<LocalKernelSpecification>> {
338    let mut kernelspec_dirs = fs.read_dir(&path).await?;
339
340    let mut valid_kernelspecs = Vec::new();
341    while let Some(path) = kernelspec_dirs.next().await {
342        match path {
343            Ok(path) => {
344                if fs.is_dir(path.as_path()).await
345                    && let Ok(kernelspec) = read_kernelspec_at(path, fs).await
346                {
347                    valid_kernelspecs.push(kernelspec);
348                }
349            }
350            Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
351        }
352    }
353
354    Ok(valid_kernelspecs)
355}
356
357pub async fn local_kernel_specifications(fs: Arc<dyn Fs>) -> Result<Vec<LocalKernelSpecification>> {
358    let mut data_dirs = dirs::data_dirs();
359
360    // Pick up any kernels from conda or conda environment
361    if let Ok(conda_prefix) = env::var("CONDA_PREFIX") {
362        let conda_prefix = PathBuf::from(conda_prefix);
363        let conda_data_dir = conda_prefix.join("share").join("jupyter");
364        data_dirs.push(conda_data_dir);
365    }
366
367    // Search for kernels inside the base python environment
368    let command = util::command::new_command("python")
369        .arg("-c")
370        .arg("import sys; print(sys.prefix)")
371        .output()
372        .await;
373
374    if let Ok(command) = command
375        && command.status.success()
376    {
377        let python_prefix = String::from_utf8(command.stdout);
378        if let Ok(python_prefix) = python_prefix {
379            let python_prefix = PathBuf::from(python_prefix.trim());
380            let python_data_dir = python_prefix.join("share").join("jupyter");
381            data_dirs.push(python_data_dir);
382        }
383    }
384
385    let kernel_dirs = data_dirs
386        .iter()
387        .map(|dir| dir.join("kernels"))
388        .map(|path| read_kernels_dir(path, fs.as_ref()))
389        .collect::<Vec<_>>();
390
391    let kernel_dirs = futures::future::join_all(kernel_dirs).await;
392    let kernel_dirs = kernel_dirs
393        .into_iter()
394        .filter_map(Result::ok)
395        .flatten()
396        .collect::<Vec<_>>();
397
398    Ok(kernel_dirs)
399}
400
#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;

    use gpui::TestAppContext;
    use project::FakeFs;
    use serde_json::json;

    /// `read_kernels_dir` finds every kernelspec directory below `kernels`.
    #[gpui::test]
    async fn test_get_kernelspecs(cx: &mut TestAppContext) {
        // Fake tree with two kernelspecs plus an unrelated `.zed` directory.
        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(
            "/jupyter",
            json!({
                ".zed": {
                    "settings.json": r#"{ "tab_size": 8 }"#,
                    "tasks.json": r#"[{
                        "label": "cargo check",
                        "command": "cargo",
                        "args": ["check", "--all"]
                    },]"#,
                },
                "kernels": {
                    "python": {
                        "kernel.json": r#"{
                            "display_name": "Python 3",
                            "language": "python",
                            "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
                            "env": {}
                        }"#
                    },
                    "deno": {
                        "kernel.json": r#"{
                            "display_name": "Deno",
                            "language": "typescript",
                            "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
                            "env": {}
                        }"#
                    }
                },
            }),
        )
        .await;

        let kernels_root = PathBuf::from("/jupyter/kernels");
        let found = read_kernels_dir(kernels_root, fs.as_ref()).await.unwrap();

        // Directory listing order is not part of the contract, so sort the names.
        let mut kernel_names: Vec<String> = found.into_iter().map(|spec| spec.name).collect();
        kernel_names.sort();

        assert_eq!(kernel_names, vec!["deno", "python"]);
    }
}