1use anyhow::{Context as _, Result};
2use futures::{
3 AsyncBufReadExt as _, FutureExt as _, StreamExt as _,
4 channel::mpsc::{self},
5 io::BufReader,
6 stream::FuturesUnordered,
7};
8use gpui::{App, AppContext as _, ClipboardItem, Entity, EntityId, Task, Window};
9use jupyter_protocol::{
10 ExecutionState, JupyterKernelspec, JupyterMessage, JupyterMessageContent, KernelInfoReply,
11 connection_info::{ConnectionInfo, Transport},
12};
13use project::Fs;
14use runtimelib::{RuntimeError, dirs};
15use smol::{net::TcpListener, process::Command};
16use std::{
17 env,
18 fmt::Debug,
19 net::{IpAddr, Ipv4Addr, SocketAddr},
20 path::PathBuf,
21 sync::Arc,
22};
23use uuid::Uuid;
24
25use super::{KernelSession, RunningKernel};
26
27#[derive(Debug, Clone)]
28pub struct LocalKernelSpecification {
29 pub name: String,
30 pub path: PathBuf,
31 pub kernelspec: JupyterKernelspec,
32}
33
34impl PartialEq for LocalKernelSpecification {
35 fn eq(&self, other: &Self) -> bool {
36 self.name == other.name && self.path == other.path
37 }
38}
39
40impl Eq for LocalKernelSpecification {}
41
42impl LocalKernelSpecification {
43 #[must_use]
44 fn command(&self, connection_path: &PathBuf) -> Result<Command> {
45 let argv = &self.kernelspec.argv;
46
47 anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
48 anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
49 anyhow::ensure!(
50 argv.iter().any(|arg| arg == "{connection_file}"),
51 "Missing 'connection_file' in argv in kernelspec {}",
52 self.name
53 );
54
55 let mut cmd = util::command::new_smol_command(&argv[0]);
56
57 for arg in &argv[1..] {
58 if arg == "{connection_file}" {
59 cmd.arg(connection_path);
60 } else {
61 cmd.arg(arg);
62 }
63 }
64
65 if let Some(env) = &self.kernelspec.env {
66 cmd.envs(env);
67 }
68
69 Ok(cmd)
70 }
71}
72
73// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
74// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
75async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> {
76 let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
77 addr_zeroport.set_port(0);
78 let mut ports: [u16; 5] = [0; 5];
79 for i in 0..5 {
80 let listener = TcpListener::bind(addr_zeroport).await?;
81 let addr = listener.local_addr()?;
82 ports[i] = addr.port();
83 }
84 Ok(ports)
85}
86
87pub struct NativeRunningKernel {
88 pub process: smol::process::Child,
89 connection_path: PathBuf,
90 _process_status_task: Option<Task<()>>,
91 pub working_directory: PathBuf,
92 pub request_tx: mpsc::Sender<JupyterMessage>,
93 pub execution_state: ExecutionState,
94 pub kernel_info: Option<KernelInfoReply>,
95}
96
97impl Debug for NativeRunningKernel {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("RunningKernel")
100 .field("process", &self.process)
101 .finish()
102 }
103}
104
105impl NativeRunningKernel {
106 pub fn new<S: KernelSession + 'static>(
107 kernel_specification: LocalKernelSpecification,
108 entity_id: EntityId,
109 working_directory: PathBuf,
110 fs: Arc<dyn Fs>,
111 // todo: convert to weak view
112 session: Entity<S>,
113 window: &mut Window,
114 cx: &mut App,
115 ) -> Task<Result<Box<dyn RunningKernel>>> {
116 window.spawn(cx, async move |cx| {
117 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
118 let ports = peek_ports(ip).await?;
119
120 let connection_info = ConnectionInfo {
121 transport: Transport::TCP,
122 ip: ip.to_string(),
123 stdin_port: ports[0],
124 control_port: ports[1],
125 hb_port: ports[2],
126 shell_port: ports[3],
127 iopub_port: ports[4],
128 signature_scheme: "hmac-sha256".to_string(),
129 key: uuid::Uuid::new_v4().to_string(),
130 kernel_name: Some(format!("zed-{}", kernel_specification.name)),
131 };
132
133 let runtime_dir = dirs::runtime_dir();
134 fs.create_dir(&runtime_dir)
135 .await
136 .with_context(|| format!("Failed to create jupyter runtime dir {runtime_dir:?}"))?;
137 let connection_path = runtime_dir.join(format!("kernel-zed-{entity_id}.json"));
138 let content = serde_json::to_string(&connection_info)?;
139 fs.atomic_write(connection_path.clone(), content).await?;
140
141 let mut cmd = kernel_specification.command(&connection_path)?;
142
143 let mut process = cmd
144 .current_dir(&working_directory)
145 .stdout(std::process::Stdio::piped())
146 .stderr(std::process::Stdio::piped())
147 .stdin(std::process::Stdio::piped())
148 .kill_on_drop(true)
149 .spawn()
150 .context("failed to start the kernel process")?;
151
152 let session_id = Uuid::new_v4().to_string();
153
154 let iopub_socket =
155 runtimelib::create_client_iopub_connection(&connection_info, "", &session_id)
156 .await?;
157 let shell_socket =
158 runtimelib::create_client_shell_connection(&connection_info, &session_id).await?;
159 let control_socket =
160 runtimelib::create_client_control_connection(&connection_info, &session_id).await?;
161
162 let (mut shell_send, shell_recv) = shell_socket.split();
163 let (mut control_send, control_recv) = control_socket.split();
164
165 let (request_tx, mut request_rx) =
166 futures::channel::mpsc::channel::<JupyterMessage>(100);
167
168 let recv_task = cx.spawn({
169 let session = session.clone();
170 let mut iopub = iopub_socket;
171 let mut shell = shell_recv;
172 let mut control = control_recv;
173
174 async move |cx| -> anyhow::Result<()> {
175 loop {
176 let (channel, result) = futures::select! {
177 msg = iopub.read().fuse() => ("iopub", msg),
178 msg = shell.read().fuse() => ("shell", msg),
179 msg = control.read().fuse() => ("control", msg),
180 };
181 match result {
182 Ok(message) => {
183 session
184 .update_in(cx, |session, window, cx| {
185 session.route(&message, window, cx);
186 })
187 .ok();
188 }
189 Err(
190 ref err @ (RuntimeError::ParseError { .. }
191 | RuntimeError::SerdeError(_)),
192 ) => {
193 let error_detail =
194 format!("Kernel issue on {channel} channel\n\n{err}");
195 log::warn!("kernel: {error_detail}");
196 let workspace_window = session
197 .update_in(cx, |_, window, _cx| {
198 window
199 .window_handle()
200 .downcast::<workspace::Workspace>()
201 })
202 .ok()
203 .flatten();
204 if let Some(workspace_window) = workspace_window {
205 workspace_window
206 .update(cx, |workspace, _window, cx| {
207 struct KernelReadError;
208 workspace.show_toast(
209 workspace::Toast::new(
210 workspace::notifications::NotificationId::unique::<KernelReadError>(),
211 error_detail.clone(),
212 )
213 .on_click(
214 "Copy Error",
215 move |_window, cx| {
216 cx.write_to_clipboard(
217 ClipboardItem::new_string(
218 error_detail.clone(),
219 ),
220 );
221 },
222 ),
223 workspace::notifications::NotificationSource::Repl,
224 cx,
225 );
226 })
227 .ok();
228 }
229 }
230 Err(err) => {
231 anyhow::bail!("{channel} recv: {err}");
232 }
233 }
234 }
235 }
236 });
237
238 let routing_task = cx.background_spawn({
239 async move {
240 while let Some(message) = request_rx.next().await {
241 match message.content {
242 JupyterMessageContent::DebugRequest(_)
243 | JupyterMessageContent::InterruptRequest(_)
244 | JupyterMessageContent::ShutdownRequest(_) => {
245 control_send.send(message).await?;
246 }
247 _ => {
248 shell_send.send(message).await?;
249 }
250 }
251 }
252 anyhow::Ok(())
253 }
254 });
255
256 let stderr = process.stderr.take();
257 let stdout = process.stdout.take();
258
259 cx.spawn(async move |_cx| {
260 use futures::future::Either;
261
262 let stderr_lines = match stderr {
263 Some(s) => Either::Left(
264 BufReader::new(s)
265 .lines()
266 .map(|line| (log::Level::Error, line)),
267 ),
268 None => Either::Right(futures::stream::empty()),
269 };
270 let stdout_lines = match stdout {
271 Some(s) => Either::Left(
272 BufReader::new(s)
273 .lines()
274 .map(|line| (log::Level::Info, line)),
275 ),
276 None => Either::Right(futures::stream::empty()),
277 };
278 let mut lines = futures::stream::select(stderr_lines, stdout_lines);
279 while let Some((level, Ok(line))) = lines.next().await {
280 log::log!(level, "kernel: {}", line);
281 }
282 })
283 .detach();
284
285 cx.spawn({
286 let session = session.clone();
287 async move |cx| {
288 async fn with_name(
289 name: &'static str,
290 task: Task<Result<()>>,
291 ) -> (&'static str, Result<()>) {
292 (name, task.await)
293 }
294
295 let mut tasks = FuturesUnordered::new();
296 tasks.push(with_name("recv task", recv_task));
297 tasks.push(with_name("routing task", routing_task));
298
299 while let Some((name, result)) = tasks.next().await {
300 if let Err(err) = result {
301 log::error!("kernel: handling failed for {name}: {err:?}");
302
303 session.update(cx, |session, cx| {
304 session.kernel_errored(
305 format!("handling failed for {name}: {err}"),
306 cx,
307 );
308 cx.notify();
309 });
310 }
311 }
312 }
313 })
314 .detach();
315
316 let status = process.status();
317
318 let process_status_task = cx.spawn(async move |cx| {
319 let error_message = match status.await {
320 Ok(status) => {
321 if status.success() {
322 log::info!("kernel process exited successfully");
323 return;
324 }
325
326 format!("kernel process exited with status: {:?}", status)
327 }
328 Err(err) => {
329 format!("kernel process exited with error: {:?}", err)
330 }
331 };
332
333 log::error!("{}", error_message);
334
335 session.update(cx, |session, cx| {
336 session.kernel_errored(error_message, cx);
337
338 cx.notify();
339 });
340 });
341
342 anyhow::Ok(Box::new(Self {
343 process,
344 request_tx,
345 working_directory,
346 _process_status_task: Some(process_status_task),
347 connection_path,
348 execution_state: ExecutionState::Idle,
349 kernel_info: None,
350 }) as Box<dyn RunningKernel>)
351 })
352 }
353}
354
355impl RunningKernel for NativeRunningKernel {
356 fn request_tx(&self) -> mpsc::Sender<JupyterMessage> {
357 self.request_tx.clone()
358 }
359
360 fn working_directory(&self) -> &PathBuf {
361 &self.working_directory
362 }
363
364 fn execution_state(&self) -> &ExecutionState {
365 &self.execution_state
366 }
367
368 fn set_execution_state(&mut self, state: ExecutionState) {
369 self.execution_state = state;
370 }
371
372 fn kernel_info(&self) -> Option<&KernelInfoReply> {
373 self.kernel_info.as_ref()
374 }
375
376 fn set_kernel_info(&mut self, info: KernelInfoReply) {
377 self.kernel_info = Some(info);
378 }
379
380 fn force_shutdown(&mut self, _window: &mut Window, _cx: &mut App) -> Task<anyhow::Result<()>> {
381 self.kill();
382 Task::ready(Ok(()))
383 }
384
385 fn kill(&mut self) {
386 self._process_status_task.take();
387 self.request_tx.close_channel();
388 self.process.kill().ok();
389 }
390}
391
392impl Drop for NativeRunningKernel {
393 fn drop(&mut self) {
394 std::fs::remove_file(&self.connection_path).ok();
395 self.kill();
396 }
397}
398
399async fn read_kernelspec_at(
400 // Path should be a directory to a jupyter kernelspec, as in
401 // /usr/local/share/jupyter/kernels/python3
402 kernel_dir: PathBuf,
403 fs: &dyn Fs,
404) -> Result<LocalKernelSpecification> {
405 let path = kernel_dir;
406 let kernel_name = if let Some(kernel_name) = path.file_name() {
407 kernel_name.to_string_lossy().into_owned()
408 } else {
409 anyhow::bail!("Invalid kernelspec directory: {path:?}");
410 };
411
412 if !fs.is_dir(path.as_path()).await {
413 anyhow::bail!("Not a directory: {path:?}");
414 }
415
416 let expected_kernel_json = path.join("kernel.json");
417 let spec = fs.load(expected_kernel_json.as_path()).await?;
418 let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
419
420 Ok(LocalKernelSpecification {
421 name: kernel_name,
422 path,
423 kernelspec: spec,
424 })
425}
426
427/// Read a directory of kernelspec directories
428async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> Result<Vec<LocalKernelSpecification>> {
429 let mut kernelspec_dirs = fs.read_dir(&path).await?;
430
431 let mut valid_kernelspecs = Vec::new();
432 while let Some(path) = kernelspec_dirs.next().await {
433 match path {
434 Ok(path) => {
435 if fs.is_dir(path.as_path()).await
436 && let Ok(kernelspec) = read_kernelspec_at(path, fs).await
437 {
438 valid_kernelspecs.push(kernelspec);
439 }
440 }
441 Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
442 }
443 }
444
445 Ok(valid_kernelspecs)
446}
447
448pub async fn local_kernel_specifications(fs: Arc<dyn Fs>) -> Result<Vec<LocalKernelSpecification>> {
449 let mut data_dirs = dirs::data_dirs();
450
451 // Pick up any kernels from conda or conda environment
452 if let Ok(conda_prefix) = env::var("CONDA_PREFIX") {
453 let conda_prefix = PathBuf::from(conda_prefix);
454 let conda_data_dir = conda_prefix.join("share").join("jupyter");
455 data_dirs.push(conda_data_dir);
456 }
457
458 // Search for kernels inside the base python environment
459 let command = util::command::new_smol_command("python")
460 .arg("-c")
461 .arg("import sys; print(sys.prefix)")
462 .output()
463 .await;
464
465 if let Ok(command) = command
466 && command.status.success()
467 {
468 let python_prefix = String::from_utf8(command.stdout);
469 if let Ok(python_prefix) = python_prefix {
470 let python_prefix = PathBuf::from(python_prefix.trim());
471 let python_data_dir = python_prefix.join("share").join("jupyter");
472 data_dirs.push(python_data_dir);
473 }
474 }
475
476 let kernel_dirs = data_dirs
477 .iter()
478 .map(|dir| dir.join("kernels"))
479 .map(|path| read_kernels_dir(path, fs.as_ref()))
480 .collect::<Vec<_>>();
481
482 let kernel_dirs = futures::future::join_all(kernel_dirs).await;
483 let kernel_dirs = kernel_dirs
484 .into_iter()
485 .filter_map(Result::ok)
486 .flatten()
487 .collect::<Vec<_>>();
488
489 Ok(kernel_dirs)
490}
491
492#[cfg(test)]
493mod test {
494 use super::*;
495 use std::path::PathBuf;
496
497 use gpui::TestAppContext;
498 use project::FakeFs;
499 use serde_json::json;
500
501 #[gpui::test]
502 async fn test_get_kernelspecs(cx: &mut TestAppContext) {
503 let fs = FakeFs::new(cx.executor());
504 fs.insert_tree(
505 "/jupyter",
506 json!({
507 ".zed": {
508 "settings.json": r#"{ "tab_size": 8 }"#,
509 "tasks.json": r#"[{
510 "label": "cargo check",
511 "command": "cargo",
512 "args": ["check", "--all"]
513 },]"#,
514 },
515 "kernels": {
516 "python": {
517 "kernel.json": r#"{
518 "display_name": "Python 3",
519 "language": "python",
520 "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
521 "env": {}
522 }"#
523 },
524 "deno": {
525 "kernel.json": r#"{
526 "display_name": "Deno",
527 "language": "typescript",
528 "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
529 "env": {}
530 }"#
531 }
532 },
533 }),
534 )
535 .await;
536
537 let mut kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs.as_ref())
538 .await
539 .unwrap();
540
541 kernels.sort_by(|a, b| a.name.cmp(&b.name));
542
543 assert_eq!(
544 kernels.iter().map(|c| c.name.clone()).collect::<Vec<_>>(),
545 vec!["deno", "python"]
546 );
547 }
548}