1use anyhow::{Context as _, Result};
2use futures::{
3 AsyncBufReadExt as _, StreamExt as _,
4 channel::mpsc::{self},
5 io::BufReader,
6};
7use gpui::{App, Entity, EntityId, Task, Window};
8use jupyter_protocol::{
9 ExecutionState, JupyterKernelspec, JupyterMessage, KernelInfoReply,
10 connection_info::{ConnectionInfo, Transport},
11};
12use project::Fs;
13use runtimelib::dirs;
14use smol::net::TcpListener;
15use std::{
16 env,
17 fmt::Debug,
18 net::{IpAddr, Ipv4Addr, SocketAddr},
19 path::PathBuf,
20 sync::Arc,
21};
22use util::command::Command;
23use uuid::Uuid;
24
25use super::{KernelSession, RunningKernel, start_kernel_tasks};
26
27#[derive(Debug, Clone)]
28pub struct LocalKernelSpecification {
29 pub name: String,
30 pub path: PathBuf,
31 pub kernelspec: JupyterKernelspec,
32}
33
34impl PartialEq for LocalKernelSpecification {
35 fn eq(&self, other: &Self) -> bool {
36 self.name == other.name && self.path == other.path
37 }
38}
39
40impl Eq for LocalKernelSpecification {}
41
42impl LocalKernelSpecification {
43 #[must_use]
44 fn command(&self, connection_path: &PathBuf) -> Result<Command> {
45 let argv = &self.kernelspec.argv;
46
47 anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
48 anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
49 anyhow::ensure!(
50 argv.iter().any(|arg| arg == "{connection_file}"),
51 "Missing 'connection_file' in argv in kernelspec {}",
52 self.name
53 );
54
55 let mut cmd = util::command::new_command(&argv[0]);
56
57 for arg in &argv[1..] {
58 if arg == "{connection_file}" {
59 cmd.arg(connection_path);
60 } else {
61 cmd.arg(arg);
62 }
63 }
64
65 if let Some(env) = &self.kernelspec.env {
66 log::info!(
67 "LocalKernelSpecification: applying env to command: {:?}",
68 env.keys()
69 );
70 cmd.envs(env);
71 } else {
72 log::info!("LocalKernelSpecification: no env in kernelspec");
73 }
74
75 Ok(cmd)
76 }
77}
78
79// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
80// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
81async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> {
82 let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
83 addr_zeroport.set_port(0);
84 let mut ports: [u16; 5] = [0; 5];
85 for i in 0..5 {
86 let listener = TcpListener::bind(addr_zeroport).await?;
87 let addr = listener.local_addr()?;
88 ports[i] = addr.port();
89 }
90 Ok(ports)
91}
92
93pub struct NativeRunningKernel {
94 pub process: util::command::Child,
95 connection_path: PathBuf,
96 _process_status_task: Option<Task<()>>,
97 pub working_directory: PathBuf,
98 pub request_tx: mpsc::Sender<JupyterMessage>,
99 pub stdin_tx: mpsc::Sender<JupyterMessage>,
100 pub execution_state: ExecutionState,
101 pub kernel_info: Option<KernelInfoReply>,
102}
103
104impl Debug for NativeRunningKernel {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("RunningKernel")
107 .field("process", &self.process)
108 .finish()
109 }
110}
111
112impl NativeRunningKernel {
113 pub fn new<S: KernelSession + 'static>(
114 kernel_specification: LocalKernelSpecification,
115 entity_id: EntityId,
116 working_directory: PathBuf,
117 fs: Arc<dyn Fs>,
118 // todo: convert to weak view
119 session: Entity<S>,
120 window: &mut Window,
121 cx: &mut App,
122 ) -> Task<Result<Box<dyn RunningKernel>>> {
123 window.spawn(cx, async move |cx| {
124 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
125 let ports = peek_ports(ip).await?;
126
127 let connection_info = ConnectionInfo {
128 transport: Transport::TCP,
129 ip: ip.to_string(),
130 stdin_port: ports[0],
131 control_port: ports[1],
132 hb_port: ports[2],
133 shell_port: ports[3],
134 iopub_port: ports[4],
135 signature_scheme: "hmac-sha256".to_string(),
136 key: uuid::Uuid::new_v4().to_string(),
137 kernel_name: Some(format!("zed-{}", kernel_specification.name)),
138 };
139
140 let runtime_dir = dirs::runtime_dir();
141 fs.create_dir(&runtime_dir)
142 .await
143 .with_context(|| format!("Failed to create jupyter runtime dir {runtime_dir:?}"))?;
144 let connection_path = runtime_dir.join(format!("kernel-zed-{entity_id}.json"));
145 let content = serde_json::to_string(&connection_info)?;
146 fs.atomic_write(connection_path.clone(), content).await?;
147
148 let mut cmd = kernel_specification.command(&connection_path)?;
149
150 let mut process = cmd
151 .current_dir(&working_directory)
152 .stdout(util::command::Stdio::piped())
153 .stderr(util::command::Stdio::piped())
154 .stdin(util::command::Stdio::piped())
155 .kill_on_drop(true)
156 .spawn()
157 .context("failed to start the kernel process")?;
158
159 let session_id = Uuid::new_v4().to_string();
160
161 let iopub_socket =
162 runtimelib::create_client_iopub_connection(&connection_info, "", &session_id)
163 .await?;
164 let control_socket =
165 runtimelib::create_client_control_connection(&connection_info, &session_id).await?;
166
167 let peer_identity = runtimelib::peer_identity_for_session(&session_id)?;
168 let shell_socket = runtimelib::create_client_shell_connection_with_identity(
169 &connection_info,
170 &session_id,
171 peer_identity.clone(),
172 )
173 .await?;
174 let stdin_socket = runtimelib::create_client_stdin_connection_with_identity(
175 &connection_info,
176 &session_id,
177 peer_identity,
178 )
179 .await?;
180
181 let (request_tx, stdin_tx) = start_kernel_tasks(
182 session.clone(),
183 iopub_socket,
184 shell_socket,
185 control_socket,
186 stdin_socket,
187 cx,
188 );
189
190 let stderr = process.stderr.take();
191 let stdout = process.stdout.take();
192
193 cx.spawn(async move |_cx| {
194 use futures::future::Either;
195
196 let stderr_lines = match stderr {
197 Some(s) => Either::Left(
198 BufReader::new(s)
199 .lines()
200 .map(|line| (log::Level::Error, line)),
201 ),
202 None => Either::Right(futures::stream::empty()),
203 };
204 let stdout_lines = match stdout {
205 Some(s) => Either::Left(
206 BufReader::new(s)
207 .lines()
208 .map(|line| (log::Level::Info, line)),
209 ),
210 None => Either::Right(futures::stream::empty()),
211 };
212 let mut lines = futures::stream::select(stderr_lines, stdout_lines);
213 while let Some((level, Ok(line))) = lines.next().await {
214 log::log!(level, "kernel: {}", line);
215 }
216 })
217 .detach();
218
219 let status = process.status();
220
221 let process_status_task = cx.spawn(async move |cx| {
222 let error_message = match status.await {
223 Ok(status) => {
224 if status.success() {
225 log::info!("kernel process exited successfully");
226 return;
227 }
228
229 format!("kernel process exited with status: {:?}", status)
230 }
231 Err(err) => {
232 format!("kernel process exited with error: {:?}", err)
233 }
234 };
235
236 log::error!("{}", error_message);
237
238 session.update(cx, |session, cx| {
239 session.kernel_errored(error_message, cx);
240
241 cx.notify();
242 });
243 });
244
245 anyhow::Ok(Box::new(Self {
246 process,
247 request_tx,
248 stdin_tx,
249 working_directory,
250 _process_status_task: Some(process_status_task),
251 connection_path,
252 execution_state: ExecutionState::Idle,
253 kernel_info: None,
254 }) as Box<dyn RunningKernel>)
255 })
256 }
257}
258
259impl RunningKernel for NativeRunningKernel {
260 fn request_tx(&self) -> mpsc::Sender<JupyterMessage> {
261 self.request_tx.clone()
262 }
263
264 fn stdin_tx(&self) -> mpsc::Sender<JupyterMessage> {
265 self.stdin_tx.clone()
266 }
267
268 fn working_directory(&self) -> &PathBuf {
269 &self.working_directory
270 }
271
272 fn execution_state(&self) -> &ExecutionState {
273 &self.execution_state
274 }
275
276 fn set_execution_state(&mut self, state: ExecutionState) {
277 self.execution_state = state;
278 }
279
280 fn kernel_info(&self) -> Option<&KernelInfoReply> {
281 self.kernel_info.as_ref()
282 }
283
284 fn set_kernel_info(&mut self, info: KernelInfoReply) {
285 self.kernel_info = Some(info);
286 }
287
288 fn force_shutdown(&mut self, _window: &mut Window, _cx: &mut App) -> Task<anyhow::Result<()>> {
289 self.kill();
290 Task::ready(Ok(()))
291 }
292
293 fn kill(&mut self) {
294 self._process_status_task.take();
295 self.request_tx.close_channel();
296 self.stdin_tx.close_channel();
297 self.process.kill().ok();
298 }
299}
300
301impl Drop for NativeRunningKernel {
302 fn drop(&mut self) {
303 std::fs::remove_file(&self.connection_path).ok();
304 self.kill();
305 }
306}
307
308async fn read_kernelspec_at(
309 // Path should be a directory to a jupyter kernelspec, as in
310 // /usr/local/share/jupyter/kernels/python3
311 kernel_dir: PathBuf,
312 fs: &dyn Fs,
313) -> Result<LocalKernelSpecification> {
314 let path = kernel_dir;
315 let kernel_name = if let Some(kernel_name) = path.file_name() {
316 kernel_name.to_string_lossy().into_owned()
317 } else {
318 anyhow::bail!("Invalid kernelspec directory: {path:?}");
319 };
320
321 if !fs.is_dir(path.as_path()).await {
322 anyhow::bail!("Not a directory: {path:?}");
323 }
324
325 let expected_kernel_json = path.join("kernel.json");
326 let spec = fs.load(expected_kernel_json.as_path()).await?;
327 let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
328
329 Ok(LocalKernelSpecification {
330 name: kernel_name,
331 path,
332 kernelspec: spec,
333 })
334}
335
336/// Read a directory of kernelspec directories
337async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> Result<Vec<LocalKernelSpecification>> {
338 let mut kernelspec_dirs = fs.read_dir(&path).await?;
339
340 let mut valid_kernelspecs = Vec::new();
341 while let Some(path) = kernelspec_dirs.next().await {
342 match path {
343 Ok(path) => {
344 if fs.is_dir(path.as_path()).await
345 && let Ok(kernelspec) = read_kernelspec_at(path, fs).await
346 {
347 valid_kernelspecs.push(kernelspec);
348 }
349 }
350 Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
351 }
352 }
353
354 Ok(valid_kernelspecs)
355}
356
357pub async fn local_kernel_specifications(fs: Arc<dyn Fs>) -> Result<Vec<LocalKernelSpecification>> {
358 let mut data_dirs = dirs::data_dirs();
359
360 // Pick up any kernels from conda or conda environment
361 if let Ok(conda_prefix) = env::var("CONDA_PREFIX") {
362 let conda_prefix = PathBuf::from(conda_prefix);
363 let conda_data_dir = conda_prefix.join("share").join("jupyter");
364 data_dirs.push(conda_data_dir);
365 }
366
367 // Search for kernels inside the base python environment
368 let command = util::command::new_command("python")
369 .arg("-c")
370 .arg("import sys; print(sys.prefix)")
371 .output()
372 .await;
373
374 if let Ok(command) = command
375 && command.status.success()
376 {
377 let python_prefix = String::from_utf8(command.stdout);
378 if let Ok(python_prefix) = python_prefix {
379 let python_prefix = PathBuf::from(python_prefix.trim());
380 let python_data_dir = python_prefix.join("share").join("jupyter");
381 data_dirs.push(python_data_dir);
382 }
383 }
384
385 let kernel_dirs = data_dirs
386 .iter()
387 .map(|dir| dir.join("kernels"))
388 .map(|path| read_kernels_dir(path, fs.as_ref()))
389 .collect::<Vec<_>>();
390
391 let kernel_dirs = futures::future::join_all(kernel_dirs).await;
392 let kernel_dirs = kernel_dirs
393 .into_iter()
394 .filter_map(Result::ok)
395 .flatten()
396 .collect::<Vec<_>>();
397
398 Ok(kernel_dirs)
399}
400
401#[cfg(test)]
402mod test {
403 use super::*;
404 use std::path::PathBuf;
405
406 use gpui::TestAppContext;
407 use project::FakeFs;
408 use serde_json::json;
409
410 #[gpui::test]
411 async fn test_get_kernelspecs(cx: &mut TestAppContext) {
412 let fs = FakeFs::new(cx.executor());
413 fs.insert_tree(
414 "/jupyter",
415 json!({
416 ".zed": {
417 "settings.json": r#"{ "tab_size": 8 }"#,
418 "tasks.json": r#"[{
419 "label": "cargo check",
420 "command": "cargo",
421 "args": ["check", "--all"]
422 },]"#,
423 },
424 "kernels": {
425 "python": {
426 "kernel.json": r#"{
427 "display_name": "Python 3",
428 "language": "python",
429 "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
430 "env": {}
431 }"#
432 },
433 "deno": {
434 "kernel.json": r#"{
435 "display_name": "Deno",
436 "language": "typescript",
437 "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
438 "env": {}
439 }"#
440 }
441 },
442 }),
443 )
444 .await;
445
446 let mut kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs.as_ref())
447 .await
448 .unwrap();
449
450 kernels.sort_by(|a, b| a.name.cmp(&b.name));
451
452 assert_eq!(
453 kernels.iter().map(|c| c.name.clone()).collect::<Vec<_>>(),
454 vec!["deno", "python"]
455 );
456 }
457}