kernels.rs

  1use anyhow::{Context as _, Result};
  2use futures::{
  3    channel::mpsc::{self, Receiver},
  4    future::Shared,
  5    stream::{self, SelectAll, StreamExt},
  6    SinkExt as _,
  7};
  8use gpui::{AppContext, EntityId, Task};
  9use project::Fs;
 10use runtimelib::{
 11    dirs, ConnectionInfo, ExecutionState, JupyterKernelspec, JupyterMessage, JupyterMessageContent,
 12    KernelInfoReply,
 13};
 14use smol::{net::TcpListener, process::Command};
 15use std::{
 16    fmt::Debug,
 17    net::{IpAddr, Ipv4Addr, SocketAddr},
 18    path::PathBuf,
 19    sync::Arc,
 20};
 21use ui::{Color, Indicator};
 22
 23#[derive(Debug, Clone)]
 24pub struct KernelSpecification {
 25    pub name: String,
 26    pub path: PathBuf,
 27    pub kernelspec: JupyterKernelspec,
 28}
 29
 30impl KernelSpecification {
 31    #[must_use]
 32    fn command(&self, connection_path: &PathBuf) -> anyhow::Result<Command> {
 33        let argv = &self.kernelspec.argv;
 34
 35        anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
 36        anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
 37        anyhow::ensure!(
 38            argv.iter().any(|arg| arg == "{connection_file}"),
 39            "Missing 'connection_file' in argv in kernelspec {}",
 40            self.name
 41        );
 42
 43        let mut cmd = Command::new(&argv[0]);
 44
 45        for arg in &argv[1..] {
 46            if arg == "{connection_file}" {
 47                cmd.arg(connection_path);
 48            } else {
 49                cmd.arg(arg);
 50            }
 51        }
 52
 53        if let Some(env) = &self.kernelspec.env {
 54            cmd.envs(env);
 55        }
 56
 57        Ok(cmd)
 58    }
 59}
 60
 61// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
 62// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
 63async fn peek_ports(ip: IpAddr) -> anyhow::Result<[u16; 5]> {
 64    let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
 65    addr_zeroport.set_port(0);
 66    let mut ports: [u16; 5] = [0; 5];
 67    for i in 0..5 {
 68        let listener = TcpListener::bind(addr_zeroport).await?;
 69        let addr = listener.local_addr()?;
 70        ports[i] = addr.port();
 71    }
 72    Ok(ports)
 73}
 74
 75#[derive(Debug)]
 76pub enum Kernel {
 77    RunningKernel(RunningKernel),
 78    StartingKernel(Shared<Task<()>>),
 79    ErroredLaunch(String),
 80    ShuttingDown,
 81    Shutdown,
 82}
 83
 84#[derive(Debug, Clone)]
 85pub enum KernelStatus {
 86    Idle,
 87    Busy,
 88    Starting,
 89    Error,
 90    ShuttingDown,
 91    Shutdown,
 92}
 93impl KernelStatus {
 94    pub fn is_connected(&self) -> bool {
 95        match self {
 96            KernelStatus::Idle | KernelStatus::Busy => true,
 97            _ => false,
 98        }
 99    }
100}
101
102impl ToString for KernelStatus {
103    fn to_string(&self) -> String {
104        match self {
105            KernelStatus::Idle => "Idle".to_string(),
106            KernelStatus::Busy => "Busy".to_string(),
107            KernelStatus::Starting => "Starting".to_string(),
108            KernelStatus::Error => "Error".to_string(),
109            KernelStatus::ShuttingDown => "Shutting Down".to_string(),
110            KernelStatus::Shutdown => "Shutdown".to_string(),
111        }
112    }
113}
114
115impl From<&Kernel> for KernelStatus {
116    fn from(kernel: &Kernel) -> Self {
117        match kernel {
118            Kernel::RunningKernel(kernel) => match kernel.execution_state {
119                ExecutionState::Idle => KernelStatus::Idle,
120                ExecutionState::Busy => KernelStatus::Busy,
121            },
122            Kernel::StartingKernel(_) => KernelStatus::Starting,
123            Kernel::ErroredLaunch(_) => KernelStatus::Error,
124            Kernel::ShuttingDown => KernelStatus::ShuttingDown,
125            Kernel::Shutdown => KernelStatus::Shutdown,
126        }
127    }
128}
129
130impl Kernel {
131    pub fn dot(&self) -> Indicator {
132        match self {
133            Kernel::RunningKernel(kernel) => match kernel.execution_state {
134                ExecutionState::Idle => Indicator::dot().color(Color::Success),
135                ExecutionState::Busy => Indicator::dot().color(Color::Modified),
136            },
137            Kernel::StartingKernel(_) => Indicator::dot().color(Color::Modified),
138            Kernel::ErroredLaunch(_) => Indicator::dot().color(Color::Error),
139            Kernel::ShuttingDown => Indicator::dot().color(Color::Modified),
140            Kernel::Shutdown => Indicator::dot().color(Color::Disabled),
141        }
142    }
143
144    pub fn status(&self) -> KernelStatus {
145        self.into()
146    }
147
148    pub fn set_execution_state(&mut self, status: &ExecutionState) {
149        match self {
150            Kernel::RunningKernel(running_kernel) => {
151                running_kernel.execution_state = status.clone();
152            }
153            _ => {}
154        }
155    }
156
157    pub fn set_kernel_info(&mut self, kernel_info: &KernelInfoReply) {
158        match self {
159            Kernel::RunningKernel(running_kernel) => {
160                running_kernel.kernel_info = Some(kernel_info.clone());
161            }
162            _ => {}
163        }
164    }
165}
166
167pub struct RunningKernel {
168    pub process: smol::process::Child,
169    _shell_task: Task<anyhow::Result<()>>,
170    _iopub_task: Task<anyhow::Result<()>>,
171    _control_task: Task<anyhow::Result<()>>,
172    _routing_task: Task<anyhow::Result<()>>,
173    connection_path: PathBuf,
174    pub working_directory: PathBuf,
175    pub request_tx: mpsc::Sender<JupyterMessage>,
176    pub execution_state: ExecutionState,
177    pub kernel_info: Option<KernelInfoReply>,
178}
179
180type JupyterMessageChannel = stream::SelectAll<Receiver<JupyterMessage>>;
181
182impl Debug for RunningKernel {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        f.debug_struct("RunningKernel")
185            .field("process", &self.process)
186            .finish()
187    }
188}
189
190impl RunningKernel {
191    pub fn new(
192        kernel_specification: KernelSpecification,
193        entity_id: EntityId,
194        working_directory: PathBuf,
195        fs: Arc<dyn Fs>,
196        cx: &mut AppContext,
197    ) -> Task<anyhow::Result<(Self, JupyterMessageChannel)>> {
198        cx.spawn(|cx| async move {
199            let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
200            let ports = peek_ports(ip).await?;
201
202            let connection_info = ConnectionInfo {
203                transport: "tcp".to_string(),
204                ip: ip.to_string(),
205                stdin_port: ports[0],
206                control_port: ports[1],
207                hb_port: ports[2],
208                shell_port: ports[3],
209                iopub_port: ports[4],
210                signature_scheme: "hmac-sha256".to_string(),
211                key: uuid::Uuid::new_v4().to_string(),
212                kernel_name: Some(format!("zed-{}", kernel_specification.name)),
213            };
214
215            let runtime_dir = dirs::runtime_dir();
216            fs.create_dir(&runtime_dir)
217                .await
218                .with_context(|| format!("Failed to create jupyter runtime dir {runtime_dir:?}"))?;
219            let connection_path = runtime_dir.join(format!("kernel-zed-{entity_id}.json"));
220            let content = serde_json::to_string(&connection_info)?;
221            // write out file to disk for kernel
222            fs.atomic_write(connection_path.clone(), content).await?;
223
224            let mut cmd = kernel_specification.command(&connection_path)?;
225
226            let process = cmd
227                .current_dir(&working_directory)
228                // .stdout(Stdio::null())
229                // .stderr(Stdio::null())
230                .kill_on_drop(true)
231                .spawn()
232                .context("failed to start the kernel process")?;
233
234            let mut iopub_socket = connection_info.create_client_iopub_connection("").await?;
235            let mut shell_socket = connection_info.create_client_shell_connection().await?;
236            let mut control_socket = connection_info.create_client_control_connection().await?;
237
238            let (mut iopub, iosub) = futures::channel::mpsc::channel(100);
239
240            let (request_tx, mut request_rx) =
241                futures::channel::mpsc::channel::<JupyterMessage>(100);
242
243            let (mut control_reply_tx, control_reply_rx) = futures::channel::mpsc::channel(100);
244            let (mut shell_reply_tx, shell_reply_rx) = futures::channel::mpsc::channel(100);
245
246            let mut messages_rx = SelectAll::new();
247            messages_rx.push(iosub);
248            messages_rx.push(control_reply_rx);
249            messages_rx.push(shell_reply_rx);
250
251            let _iopub_task = cx.background_executor().spawn({
252                async move {
253                    while let Ok(message) = iopub_socket.read().await {
254                        iopub.send(message).await?;
255                    }
256                    anyhow::Ok(())
257                }
258            });
259
260            let (mut control_request_tx, mut control_request_rx) =
261                futures::channel::mpsc::channel(100);
262            let (mut shell_request_tx, mut shell_request_rx) = futures::channel::mpsc::channel(100);
263
264            let _routing_task = cx.background_executor().spawn({
265                async move {
266                    while let Some(message) = request_rx.next().await {
267                        match message.content {
268                            JupyterMessageContent::DebugRequest(_)
269                            | JupyterMessageContent::InterruptRequest(_)
270                            | JupyterMessageContent::ShutdownRequest(_) => {
271                                control_request_tx.send(message).await?;
272                            }
273                            _ => {
274                                shell_request_tx.send(message).await?;
275                            }
276                        }
277                    }
278                    anyhow::Ok(())
279                }
280            });
281
282            let _shell_task = cx.background_executor().spawn({
283                async move {
284                    while let Some(message) = shell_request_rx.next().await {
285                        shell_socket.send(message).await.ok();
286                        let reply = shell_socket.read().await?;
287                        shell_reply_tx.send(reply).await?;
288                    }
289                    anyhow::Ok(())
290                }
291            });
292
293            let _control_task = cx.background_executor().spawn({
294                async move {
295                    while let Some(message) = control_request_rx.next().await {
296                        control_socket.send(message).await.ok();
297                        let reply = control_socket.read().await?;
298                        control_reply_tx.send(reply).await?;
299                    }
300                    anyhow::Ok(())
301                }
302            });
303
304            anyhow::Ok((
305                Self {
306                    process,
307                    request_tx,
308                    working_directory,
309                    _shell_task,
310                    _iopub_task,
311                    _control_task,
312                    _routing_task,
313                    connection_path,
314                    execution_state: ExecutionState::Busy,
315                    kernel_info: None,
316                },
317                messages_rx,
318            ))
319        })
320    }
321}
322
323impl Drop for RunningKernel {
324    fn drop(&mut self) {
325        std::fs::remove_file(&self.connection_path).ok();
326
327        self.request_tx.close_channel();
328    }
329}
330
331async fn read_kernelspec_at(
332    // Path should be a directory to a jupyter kernelspec, as in
333    // /usr/local/share/jupyter/kernels/python3
334    kernel_dir: PathBuf,
335    fs: &dyn Fs,
336) -> anyhow::Result<KernelSpecification> {
337    let path = kernel_dir;
338    let kernel_name = if let Some(kernel_name) = path.file_name() {
339        kernel_name.to_string_lossy().to_string()
340    } else {
341        anyhow::bail!("Invalid kernelspec directory: {path:?}");
342    };
343
344    if !fs.is_dir(path.as_path()).await {
345        anyhow::bail!("Not a directory: {path:?}");
346    }
347
348    let expected_kernel_json = path.join("kernel.json");
349    let spec = fs.load(expected_kernel_json.as_path()).await?;
350    let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
351
352    Ok(KernelSpecification {
353        name: kernel_name,
354        path,
355        kernelspec: spec,
356    })
357}
358
359/// Read a directory of kernelspec directories
360async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> anyhow::Result<Vec<KernelSpecification>> {
361    let mut kernelspec_dirs = fs.read_dir(&path).await?;
362
363    let mut valid_kernelspecs = Vec::new();
364    while let Some(path) = kernelspec_dirs.next().await {
365        match path {
366            Ok(path) => {
367                if fs.is_dir(path.as_path()).await {
368                    if let Ok(kernelspec) = read_kernelspec_at(path, fs).await {
369                        valid_kernelspecs.push(kernelspec);
370                    }
371                }
372            }
373            Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
374        }
375    }
376
377    Ok(valid_kernelspecs)
378}
379
380pub async fn kernel_specifications(fs: Arc<dyn Fs>) -> anyhow::Result<Vec<KernelSpecification>> {
381    let data_dirs = dirs::data_dirs();
382    let kernel_dirs = data_dirs
383        .iter()
384        .map(|dir| dir.join("kernels"))
385        .map(|path| read_kernels_dir(path, fs.as_ref()))
386        .collect::<Vec<_>>();
387
388    let kernel_dirs = futures::future::join_all(kernel_dirs).await;
389    let kernel_dirs = kernel_dirs
390        .into_iter()
391        .filter_map(Result::ok)
392        .flatten()
393        .collect::<Vec<_>>();
394
395    Ok(kernel_dirs)
396}
397
398#[cfg(test)]
399mod test {
400    use super::*;
401    use std::path::PathBuf;
402
403    use gpui::TestAppContext;
404    use project::FakeFs;
405    use serde_json::json;
406
407    #[gpui::test]
408    async fn test_get_kernelspecs(cx: &mut TestAppContext) {
409        let fs = FakeFs::new(cx.executor());
410        fs.insert_tree(
411            "/jupyter",
412            json!({
413                ".zed": {
414                    "settings.json": r#"{ "tab_size": 8 }"#,
415                    "tasks.json": r#"[{
416                        "label": "cargo check",
417                        "command": "cargo",
418                        "args": ["check", "--all"]
419                    },]"#,
420                },
421                "kernels": {
422                    "python": {
423                        "kernel.json": r#"{
424                            "display_name": "Python 3",
425                            "language": "python",
426                            "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
427                            "env": {}
428                        }"#
429                    },
430                    "deno": {
431                        "kernel.json": r#"{
432                            "display_name": "Deno",
433                            "language": "typescript",
434                            "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
435                            "env": {}
436                        }"#
437                    }
438                },
439            }),
440        )
441        .await;
442
443        let mut kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs.as_ref())
444            .await
445            .unwrap();
446
447        kernels.sort_by(|a, b| a.name.cmp(&b.name));
448
449        assert_eq!(
450            kernels.iter().map(|c| c.name.clone()).collect::<Vec<_>>(),
451            vec!["deno", "python"]
452        );
453    }
454}