1use anyhow::{Context as _, Result};
2use futures::{
3 AsyncBufReadExt as _, StreamExt as _,
4 channel::mpsc::{self},
5 io::BufReader,
6};
7use gpui::{App, Entity, EntityId, Task, Window};
8use jupyter_protocol::{
9 ExecutionState, JupyterKernelspec, JupyterMessage, KernelInfoReply,
10 connection_info::{ConnectionInfo, Transport},
11};
12use project::Fs;
13use runtimelib::dirs;
14use smol::net::TcpListener;
15use std::{
16 env,
17 fmt::Debug,
18 net::{IpAddr, Ipv4Addr, SocketAddr},
19 path::PathBuf,
20 sync::Arc,
21};
22
23use uuid::Uuid;
24
25use super::{KernelSession, RunningKernel, start_kernel_tasks};
26
27#[derive(Debug, Clone)]
28pub struct LocalKernelSpecification {
29 pub name: String,
30 pub path: PathBuf,
31 pub kernelspec: JupyterKernelspec,
32}
33
34impl PartialEq for LocalKernelSpecification {
35 fn eq(&self, other: &Self) -> bool {
36 self.name == other.name && self.path == other.path
37 }
38}
39
40impl Eq for LocalKernelSpecification {}
41
42impl LocalKernelSpecification {
43 #[must_use]
44 fn command(&self, connection_path: &PathBuf) -> Result<std::process::Command> {
45 let argv = &self.kernelspec.argv;
46
47 anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
48 anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
49 anyhow::ensure!(
50 argv.iter().any(|arg| arg == "{connection_file}"),
51 "Missing 'connection_file' in argv in kernelspec {}",
52 self.name
53 );
54
55 let mut cmd = util::command::new_std_command(&argv[0]);
56
57 for arg in &argv[1..] {
58 if arg == "{connection_file}" {
59 cmd.arg(connection_path);
60 } else {
61 cmd.arg(arg);
62 }
63 }
64
65 if let Some(env) = &self.kernelspec.env {
66 log::info!(
67 "LocalKernelSpecification: applying env to command: {:?}",
68 env.keys()
69 );
70 cmd.envs(env);
71 } else {
72 log::info!("LocalKernelSpecification: no env in kernelspec");
73 }
74
75 Ok(cmd)
76 }
77}
78
79// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
80// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
81async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> {
82 let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
83 addr_zeroport.set_port(0);
84 let mut ports: [u16; 5] = [0; 5];
85 for i in 0..5 {
86 let listener = TcpListener::bind(addr_zeroport).await?;
87 let addr = listener.local_addr()?;
88 ports[i] = addr.port();
89 }
90 Ok(ports)
91}
92
93pub struct NativeRunningKernel {
94 pub process: util::process::Child,
95 connection_path: PathBuf,
96 _process_status_task: Option<Task<()>>,
97 pub working_directory: PathBuf,
98 pub request_tx: mpsc::Sender<JupyterMessage>,
99 pub stdin_tx: mpsc::Sender<JupyterMessage>,
100 pub execution_state: ExecutionState,
101 pub kernel_info: Option<KernelInfoReply>,
102}
103
104impl Debug for NativeRunningKernel {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("RunningKernel")
107 .field("process", &*self.process)
108 .finish()
109 }
110}
111
112impl NativeRunningKernel {
113 pub fn new<S: KernelSession + 'static>(
114 kernel_specification: LocalKernelSpecification,
115 entity_id: EntityId,
116 working_directory: PathBuf,
117 fs: Arc<dyn Fs>,
118 // todo: convert to weak view
119 session: Entity<S>,
120 window: &mut Window,
121 cx: &mut App,
122 ) -> Task<Result<Box<dyn RunningKernel>>> {
123 window.spawn(cx, async move |cx| {
124 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
125 let ports = peek_ports(ip).await?;
126
127 let connection_info = ConnectionInfo {
128 transport: Transport::TCP,
129 ip: ip.to_string(),
130 stdin_port: ports[0],
131 control_port: ports[1],
132 hb_port: ports[2],
133 shell_port: ports[3],
134 iopub_port: ports[4],
135 signature_scheme: "hmac-sha256".to_string(),
136 key: uuid::Uuid::new_v4().to_string(),
137 kernel_name: Some(format!("zed-{}", kernel_specification.name)),
138 };
139
140 let runtime_dir = dirs::runtime_dir();
141 fs.create_dir(&runtime_dir)
142 .await
143 .with_context(|| format!("Failed to create jupyter runtime dir {runtime_dir:?}"))?;
144 let connection_path = runtime_dir.join(format!("kernel-zed-{entity_id}.json"));
145 let content = serde_json::to_string(&connection_info)?;
146 fs.atomic_write(connection_path.clone(), content).await?;
147
148 let mut cmd = kernel_specification.command(&connection_path)?;
149 cmd.current_dir(&working_directory);
150
151 let mut process = util::process::Child::spawn(
152 cmd,
153 std::process::Stdio::piped(),
154 std::process::Stdio::piped(),
155 std::process::Stdio::piped(),
156 )?;
157
158 let session_id = Uuid::new_v4().to_string();
159
160 let iopub_socket =
161 runtimelib::create_client_iopub_connection(&connection_info, "", &session_id)
162 .await?;
163 let control_socket =
164 runtimelib::create_client_control_connection(&connection_info, &session_id).await?;
165
166 let peer_identity = runtimelib::peer_identity_for_session(&session_id)?;
167 let shell_socket = runtimelib::create_client_shell_connection_with_identity(
168 &connection_info,
169 &session_id,
170 peer_identity.clone(),
171 )
172 .await?;
173 let stdin_socket = runtimelib::create_client_stdin_connection_with_identity(
174 &connection_info,
175 &session_id,
176 peer_identity,
177 )
178 .await?;
179
180 let (request_tx, stdin_tx) = start_kernel_tasks(
181 session.clone(),
182 iopub_socket,
183 shell_socket,
184 control_socket,
185 stdin_socket,
186 cx,
187 );
188
189 let stderr = process.stderr.take();
190 let stdout = process.stdout.take();
191
192 cx.spawn(async move |_cx| {
193 use futures::future::Either;
194
195 let stderr_lines = match stderr {
196 Some(s) => Either::Left(
197 BufReader::new(s)
198 .lines()
199 .map(|line| (log::Level::Error, line)),
200 ),
201 None => Either::Right(futures::stream::empty()),
202 };
203 let stdout_lines = match stdout {
204 Some(s) => Either::Left(
205 BufReader::new(s)
206 .lines()
207 .map(|line| (log::Level::Info, line)),
208 ),
209 None => Either::Right(futures::stream::empty()),
210 };
211 let mut lines = futures::stream::select(stderr_lines, stdout_lines);
212 while let Some((level, Ok(line))) = lines.next().await {
213 log::log!(level, "kernel: {}", line);
214 }
215 })
216 .detach();
217
218 let status = process.status();
219
220 let process_status_task = cx.spawn(async move |cx| {
221 let error_message = match status.await {
222 Ok(status) => {
223 if status.success() {
224 log::info!("kernel process exited successfully");
225 return;
226 }
227
228 format!("kernel process exited with status: {:?}", status)
229 }
230 Err(err) => {
231 format!("kernel process exited with error: {:?}", err)
232 }
233 };
234
235 log::error!("{}", error_message);
236
237 session.update(cx, |session, cx| {
238 session.kernel_errored(error_message, cx);
239
240 cx.notify();
241 });
242 });
243
244 anyhow::Ok(Box::new(Self {
245 process,
246 request_tx,
247 stdin_tx,
248 working_directory,
249 _process_status_task: Some(process_status_task),
250 connection_path,
251 execution_state: ExecutionState::Idle,
252 kernel_info: None,
253 }) as Box<dyn RunningKernel>)
254 })
255 }
256}
257
258impl RunningKernel for NativeRunningKernel {
259 fn request_tx(&self) -> mpsc::Sender<JupyterMessage> {
260 self.request_tx.clone()
261 }
262
263 fn stdin_tx(&self) -> mpsc::Sender<JupyterMessage> {
264 self.stdin_tx.clone()
265 }
266
267 fn working_directory(&self) -> &PathBuf {
268 &self.working_directory
269 }
270
271 fn execution_state(&self) -> &ExecutionState {
272 &self.execution_state
273 }
274
275 fn set_execution_state(&mut self, state: ExecutionState) {
276 self.execution_state = state;
277 }
278
279 fn kernel_info(&self) -> Option<&KernelInfoReply> {
280 self.kernel_info.as_ref()
281 }
282
283 fn set_kernel_info(&mut self, info: KernelInfoReply) {
284 self.kernel_info = Some(info);
285 }
286
287 fn force_shutdown(&mut self, _window: &mut Window, _cx: &mut App) -> Task<anyhow::Result<()>> {
288 self.kill();
289 Task::ready(Ok(()))
290 }
291
292 fn kill(&mut self) {
293 self._process_status_task.take();
294 self.request_tx.close_channel();
295 self.stdin_tx.close_channel();
296 self.process.kill().ok();
297 }
298}
299
300impl Drop for NativeRunningKernel {
301 fn drop(&mut self) {
302 std::fs::remove_file(&self.connection_path).ok();
303 self.kill();
304 }
305}
306
307async fn read_kernelspec_at(
308 // Path should be a directory to a jupyter kernelspec, as in
309 // /usr/local/share/jupyter/kernels/python3
310 kernel_dir: PathBuf,
311 fs: &dyn Fs,
312) -> Result<LocalKernelSpecification> {
313 let path = kernel_dir;
314 let kernel_name = if let Some(kernel_name) = path.file_name() {
315 kernel_name.to_string_lossy().into_owned()
316 } else {
317 anyhow::bail!("Invalid kernelspec directory: {path:?}");
318 };
319
320 if !fs.is_dir(path.as_path()).await {
321 anyhow::bail!("Not a directory: {path:?}");
322 }
323
324 let expected_kernel_json = path.join("kernel.json");
325 let spec = fs.load(expected_kernel_json.as_path()).await?;
326 let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
327
328 Ok(LocalKernelSpecification {
329 name: kernel_name,
330 path,
331 kernelspec: spec,
332 })
333}
334
335/// Read a directory of kernelspec directories
336async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> Result<Vec<LocalKernelSpecification>> {
337 let mut kernelspec_dirs = fs.read_dir(&path).await?;
338
339 let mut valid_kernelspecs = Vec::new();
340 while let Some(path) = kernelspec_dirs.next().await {
341 match path {
342 Ok(path) => {
343 if fs.is_dir(path.as_path()).await
344 && let Ok(kernelspec) = read_kernelspec_at(path, fs).await
345 {
346 valid_kernelspecs.push(kernelspec);
347 }
348 }
349 Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
350 }
351 }
352
353 Ok(valid_kernelspecs)
354}
355
356pub async fn local_kernel_specifications(fs: Arc<dyn Fs>) -> Result<Vec<LocalKernelSpecification>> {
357 let mut data_dirs = dirs::data_dirs();
358
359 // Pick up any kernels from conda or conda environment
360 if let Ok(conda_prefix) = env::var("CONDA_PREFIX") {
361 let conda_prefix = PathBuf::from(conda_prefix);
362 let conda_data_dir = conda_prefix.join("share").join("jupyter");
363 data_dirs.push(conda_data_dir);
364 }
365
366 // Search for kernels inside the base python environment
367 let command = util::command::new_command("python")
368 .arg("-c")
369 .arg("import sys; print(sys.prefix)")
370 .output()
371 .await;
372
373 if let Ok(command) = command
374 && command.status.success()
375 {
376 let python_prefix = String::from_utf8(command.stdout);
377 if let Ok(python_prefix) = python_prefix {
378 let python_prefix = PathBuf::from(python_prefix.trim());
379 let python_data_dir = python_prefix.join("share").join("jupyter");
380 data_dirs.push(python_data_dir);
381 }
382 }
383
384 let kernel_dirs = data_dirs
385 .iter()
386 .map(|dir| dir.join("kernels"))
387 .map(|path| read_kernels_dir(path, fs.as_ref()))
388 .collect::<Vec<_>>();
389
390 let kernel_dirs = futures::future::join_all(kernel_dirs).await;
391 let kernel_dirs = kernel_dirs
392 .into_iter()
393 .filter_map(Result::ok)
394 .flatten()
395 .collect::<Vec<_>>();
396
397 Ok(kernel_dirs)
398}
399
400#[cfg(test)]
401mod test {
402 use super::*;
403 use std::path::PathBuf;
404
405 use gpui::TestAppContext;
406 use project::FakeFs;
407 use serde_json::json;
408
409 #[gpui::test]
410 async fn test_get_kernelspecs(cx: &mut TestAppContext) {
411 let fs = FakeFs::new(cx.executor());
412 fs.insert_tree(
413 "/jupyter",
414 json!({
415 ".zed": {
416 "settings.json": r#"{ "tab_size": 8 }"#,
417 "tasks.json": r#"[{
418 "label": "cargo check",
419 "command": "cargo",
420 "args": ["check", "--all"]
421 },]"#,
422 },
423 "kernels": {
424 "python": {
425 "kernel.json": r#"{
426 "display_name": "Python 3",
427 "language": "python",
428 "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
429 "env": {}
430 }"#
431 },
432 "deno": {
433 "kernel.json": r#"{
434 "display_name": "Deno",
435 "language": "typescript",
436 "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
437 "env": {}
438 }"#
439 }
440 },
441 }),
442 )
443 .await;
444
445 let mut kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs.as_ref())
446 .await
447 .unwrap();
448
449 kernels.sort_by(|a, b| a.name.cmp(&b.name));
450
451 assert_eq!(
452 kernels.iter().map(|c| c.name.clone()).collect::<Vec<_>>(),
453 vec!["deno", "python"]
454 );
455 }
456}