1use anyhow::{Context as _, Result};
2use futures::{
3 channel::mpsc::{self, Receiver},
4 future::Shared,
5 stream::{self, SelectAll, StreamExt},
6 SinkExt as _,
7};
8use gpui::{AppContext, EntityId, Task};
9use project::Fs;
10use runtimelib::{
11 dirs, ConnectionInfo, ExecutionState, JupyterKernelspec, JupyterMessage, JupyterMessageContent,
12 KernelInfoReply,
13};
14use smol::{net::TcpListener, process::Command};
15use std::{
16 fmt::Debug,
17 net::{IpAddr, Ipv4Addr, SocketAddr},
18 path::PathBuf,
19 sync::Arc,
20};
21use ui::{Color, Indicator};
22
23#[derive(Debug, Clone)]
24pub struct KernelSpecification {
25 pub name: String,
26 pub path: PathBuf,
27 pub kernelspec: JupyterKernelspec,
28}
29
30impl KernelSpecification {
31 #[must_use]
32 fn command(&self, connection_path: &PathBuf) -> anyhow::Result<Command> {
33 let argv = &self.kernelspec.argv;
34
35 anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
36 anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
37 anyhow::ensure!(
38 argv.iter().any(|arg| arg == "{connection_file}"),
39 "Missing 'connection_file' in argv in kernelspec {}",
40 self.name
41 );
42
43 let mut cmd = Command::new(&argv[0]);
44
45 for arg in &argv[1..] {
46 if arg == "{connection_file}" {
47 cmd.arg(connection_path);
48 } else {
49 cmd.arg(arg);
50 }
51 }
52
53 if let Some(env) = &self.kernelspec.env {
54 cmd.envs(env);
55 }
56
57 Ok(cmd)
58 }
59}
60
61// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
62// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
63async fn peek_ports(ip: IpAddr) -> anyhow::Result<[u16; 5]> {
64 let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
65 addr_zeroport.set_port(0);
66 let mut ports: [u16; 5] = [0; 5];
67 for i in 0..5 {
68 let listener = TcpListener::bind(addr_zeroport).await?;
69 let addr = listener.local_addr()?;
70 ports[i] = addr.port();
71 }
72 Ok(ports)
73}
74
75#[derive(Debug)]
76pub enum Kernel {
77 RunningKernel(RunningKernel),
78 StartingKernel(Shared<Task<()>>),
79 ErroredLaunch(String),
80 ShuttingDown,
81 Shutdown,
82}
83
84#[derive(Debug, Clone)]
85pub enum KernelStatus {
86 Idle,
87 Busy,
88 Starting,
89 Error,
90 ShuttingDown,
91 Shutdown,
92}
93impl KernelStatus {
94 pub fn is_connected(&self) -> bool {
95 match self {
96 KernelStatus::Idle | KernelStatus::Busy => true,
97 _ => false,
98 }
99 }
100}
101
102impl ToString for KernelStatus {
103 fn to_string(&self) -> String {
104 match self {
105 KernelStatus::Idle => "Idle".to_string(),
106 KernelStatus::Busy => "Busy".to_string(),
107 KernelStatus::Starting => "Starting".to_string(),
108 KernelStatus::Error => "Error".to_string(),
109 KernelStatus::ShuttingDown => "Shutting Down".to_string(),
110 KernelStatus::Shutdown => "Shutdown".to_string(),
111 }
112 }
113}
114
115impl From<&Kernel> for KernelStatus {
116 fn from(kernel: &Kernel) -> Self {
117 match kernel {
118 Kernel::RunningKernel(kernel) => match kernel.execution_state {
119 ExecutionState::Idle => KernelStatus::Idle,
120 ExecutionState::Busy => KernelStatus::Busy,
121 },
122 Kernel::StartingKernel(_) => KernelStatus::Starting,
123 Kernel::ErroredLaunch(_) => KernelStatus::Error,
124 Kernel::ShuttingDown => KernelStatus::ShuttingDown,
125 Kernel::Shutdown => KernelStatus::Shutdown,
126 }
127 }
128}
129
130impl Kernel {
131 pub fn dot(&self) -> Indicator {
132 match self {
133 Kernel::RunningKernel(kernel) => match kernel.execution_state {
134 ExecutionState::Idle => Indicator::dot().color(Color::Success),
135 ExecutionState::Busy => Indicator::dot().color(Color::Modified),
136 },
137 Kernel::StartingKernel(_) => Indicator::dot().color(Color::Modified),
138 Kernel::ErroredLaunch(_) => Indicator::dot().color(Color::Error),
139 Kernel::ShuttingDown => Indicator::dot().color(Color::Modified),
140 Kernel::Shutdown => Indicator::dot().color(Color::Disabled),
141 }
142 }
143
144 pub fn status(&self) -> KernelStatus {
145 self.into()
146 }
147
148 pub fn set_execution_state(&mut self, status: &ExecutionState) {
149 match self {
150 Kernel::RunningKernel(running_kernel) => {
151 running_kernel.execution_state = status.clone();
152 }
153 _ => {}
154 }
155 }
156
157 pub fn set_kernel_info(&mut self, kernel_info: &KernelInfoReply) {
158 match self {
159 Kernel::RunningKernel(running_kernel) => {
160 running_kernel.kernel_info = Some(kernel_info.clone());
161 }
162 _ => {}
163 }
164 }
165}
166
167pub struct RunningKernel {
168 pub process: smol::process::Child,
169 _shell_task: Task<anyhow::Result<()>>,
170 _iopub_task: Task<anyhow::Result<()>>,
171 _control_task: Task<anyhow::Result<()>>,
172 _routing_task: Task<anyhow::Result<()>>,
173 connection_path: PathBuf,
174 pub working_directory: PathBuf,
175 pub request_tx: mpsc::Sender<JupyterMessage>,
176 pub execution_state: ExecutionState,
177 pub kernel_info: Option<KernelInfoReply>,
178}
179
180type JupyterMessageChannel = stream::SelectAll<Receiver<JupyterMessage>>;
181
182impl Debug for RunningKernel {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 f.debug_struct("RunningKernel")
185 .field("process", &self.process)
186 .finish()
187 }
188}
189
190impl RunningKernel {
191 pub fn new(
192 kernel_specification: KernelSpecification,
193 entity_id: EntityId,
194 working_directory: PathBuf,
195 fs: Arc<dyn Fs>,
196 cx: &mut AppContext,
197 ) -> Task<anyhow::Result<(Self, JupyterMessageChannel)>> {
198 cx.spawn(|cx| async move {
199 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
200 let ports = peek_ports(ip).await?;
201
202 let connection_info = ConnectionInfo {
203 transport: "tcp".to_string(),
204 ip: ip.to_string(),
205 stdin_port: ports[0],
206 control_port: ports[1],
207 hb_port: ports[2],
208 shell_port: ports[3],
209 iopub_port: ports[4],
210 signature_scheme: "hmac-sha256".to_string(),
211 key: uuid::Uuid::new_v4().to_string(),
212 kernel_name: Some(format!("zed-{}", kernel_specification.name)),
213 };
214
215 let runtime_dir = dirs::runtime_dir();
216 fs.create_dir(&runtime_dir)
217 .await
218 .with_context(|| format!("Failed to create jupyter runtime dir {runtime_dir:?}"))?;
219 let connection_path = runtime_dir.join(format!("kernel-zed-{entity_id}.json"));
220 let content = serde_json::to_string(&connection_info)?;
221 // write out file to disk for kernel
222 fs.atomic_write(connection_path.clone(), content).await?;
223
224 let mut cmd = kernel_specification.command(&connection_path)?;
225
226 let process = cmd
227 .current_dir(&working_directory)
228 // .stdout(Stdio::null())
229 // .stderr(Stdio::null())
230 .kill_on_drop(true)
231 .spawn()
232 .context("failed to start the kernel process")?;
233
234 let mut iopub_socket = connection_info.create_client_iopub_connection("").await?;
235 let mut shell_socket = connection_info.create_client_shell_connection().await?;
236 let mut control_socket = connection_info.create_client_control_connection().await?;
237
238 let (mut iopub, iosub) = futures::channel::mpsc::channel(100);
239
240 let (request_tx, mut request_rx) =
241 futures::channel::mpsc::channel::<JupyterMessage>(100);
242
243 let (mut control_reply_tx, control_reply_rx) = futures::channel::mpsc::channel(100);
244 let (mut shell_reply_tx, shell_reply_rx) = futures::channel::mpsc::channel(100);
245
246 let mut messages_rx = SelectAll::new();
247 messages_rx.push(iosub);
248 messages_rx.push(control_reply_rx);
249 messages_rx.push(shell_reply_rx);
250
251 let _iopub_task = cx.background_executor().spawn({
252 async move {
253 while let Ok(message) = iopub_socket.read().await {
254 iopub.send(message).await?;
255 }
256 anyhow::Ok(())
257 }
258 });
259
260 let (mut control_request_tx, mut control_request_rx) =
261 futures::channel::mpsc::channel(100);
262 let (mut shell_request_tx, mut shell_request_rx) = futures::channel::mpsc::channel(100);
263
264 let _routing_task = cx.background_executor().spawn({
265 async move {
266 while let Some(message) = request_rx.next().await {
267 match message.content {
268 JupyterMessageContent::DebugRequest(_)
269 | JupyterMessageContent::InterruptRequest(_)
270 | JupyterMessageContent::ShutdownRequest(_) => {
271 control_request_tx.send(message).await?;
272 }
273 _ => {
274 shell_request_tx.send(message).await?;
275 }
276 }
277 }
278 anyhow::Ok(())
279 }
280 });
281
282 let _shell_task = cx.background_executor().spawn({
283 async move {
284 while let Some(message) = shell_request_rx.next().await {
285 shell_socket.send(message).await.ok();
286 let reply = shell_socket.read().await?;
287 shell_reply_tx.send(reply).await?;
288 }
289 anyhow::Ok(())
290 }
291 });
292
293 let _control_task = cx.background_executor().spawn({
294 async move {
295 while let Some(message) = control_request_rx.next().await {
296 control_socket.send(message).await.ok();
297 let reply = control_socket.read().await?;
298 control_reply_tx.send(reply).await?;
299 }
300 anyhow::Ok(())
301 }
302 });
303
304 anyhow::Ok((
305 Self {
306 process,
307 request_tx,
308 working_directory,
309 _shell_task,
310 _iopub_task,
311 _control_task,
312 _routing_task,
313 connection_path,
314 execution_state: ExecutionState::Busy,
315 kernel_info: None,
316 },
317 messages_rx,
318 ))
319 })
320 }
321}
322
323impl Drop for RunningKernel {
324 fn drop(&mut self) {
325 std::fs::remove_file(&self.connection_path).ok();
326
327 self.request_tx.close_channel();
328 }
329}
330
331async fn read_kernelspec_at(
332 // Path should be a directory to a jupyter kernelspec, as in
333 // /usr/local/share/jupyter/kernels/python3
334 kernel_dir: PathBuf,
335 fs: &dyn Fs,
336) -> anyhow::Result<KernelSpecification> {
337 let path = kernel_dir;
338 let kernel_name = if let Some(kernel_name) = path.file_name() {
339 kernel_name.to_string_lossy().to_string()
340 } else {
341 anyhow::bail!("Invalid kernelspec directory: {path:?}");
342 };
343
344 if !fs.is_dir(path.as_path()).await {
345 anyhow::bail!("Not a directory: {path:?}");
346 }
347
348 let expected_kernel_json = path.join("kernel.json");
349 let spec = fs.load(expected_kernel_json.as_path()).await?;
350 let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
351
352 Ok(KernelSpecification {
353 name: kernel_name,
354 path,
355 kernelspec: spec,
356 })
357}
358
359/// Read a directory of kernelspec directories
360async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> anyhow::Result<Vec<KernelSpecification>> {
361 let mut kernelspec_dirs = fs.read_dir(&path).await?;
362
363 let mut valid_kernelspecs = Vec::new();
364 while let Some(path) = kernelspec_dirs.next().await {
365 match path {
366 Ok(path) => {
367 if fs.is_dir(path.as_path()).await {
368 if let Ok(kernelspec) = read_kernelspec_at(path, fs).await {
369 valid_kernelspecs.push(kernelspec);
370 }
371 }
372 }
373 Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
374 }
375 }
376
377 Ok(valid_kernelspecs)
378}
379
380pub async fn kernel_specifications(fs: Arc<dyn Fs>) -> anyhow::Result<Vec<KernelSpecification>> {
381 let data_dirs = dirs::data_dirs();
382 let kernel_dirs = data_dirs
383 .iter()
384 .map(|dir| dir.join("kernels"))
385 .map(|path| read_kernels_dir(path, fs.as_ref()))
386 .collect::<Vec<_>>();
387
388 let kernel_dirs = futures::future::join_all(kernel_dirs).await;
389 let kernel_dirs = kernel_dirs
390 .into_iter()
391 .filter_map(Result::ok)
392 .flatten()
393 .collect::<Vec<_>>();
394
395 Ok(kernel_dirs)
396}
397
398#[cfg(test)]
399mod test {
400 use super::*;
401 use std::path::PathBuf;
402
403 use gpui::TestAppContext;
404 use project::FakeFs;
405 use serde_json::json;
406
407 #[gpui::test]
408 async fn test_get_kernelspecs(cx: &mut TestAppContext) {
409 let fs = FakeFs::new(cx.executor());
410 fs.insert_tree(
411 "/jupyter",
412 json!({
413 ".zed": {
414 "settings.json": r#"{ "tab_size": 8 }"#,
415 "tasks.json": r#"[{
416 "label": "cargo check",
417 "command": "cargo",
418 "args": ["check", "--all"]
419 },]"#,
420 },
421 "kernels": {
422 "python": {
423 "kernel.json": r#"{
424 "display_name": "Python 3",
425 "language": "python",
426 "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
427 "env": {}
428 }"#
429 },
430 "deno": {
431 "kernel.json": r#"{
432 "display_name": "Deno",
433 "language": "typescript",
434 "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
435 "env": {}
436 }"#
437 }
438 },
439 }),
440 )
441 .await;
442
443 let mut kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs.as_ref())
444 .await
445 .unwrap();
446
447 kernels.sort_by(|a, b| a.name.cmp(&b.name));
448
449 assert_eq!(
450 kernels.iter().map(|c| c.name.clone()).collect::<Vec<_>>(),
451 vec!["deno", "python"]
452 );
453 }
454}