1use anyhow::{Context as _, Result};
2use futures::{
3 channel::mpsc::{self, Receiver},
4 future::Shared,
5 stream::{self, SelectAll, StreamExt},
6 SinkExt as _,
7};
8use gpui::{AppContext, EntityId, Task};
9use project::Fs;
10use runtimelib::{
11 dirs, ConnectionInfo, ExecutionState, JupyterKernelspec, JupyterMessage, JupyterMessageContent,
12 KernelInfoReply,
13};
14use smol::{net::TcpListener, process::Command};
15use std::{
16 fmt::Debug,
17 net::{IpAddr, Ipv4Addr, SocketAddr},
18 path::PathBuf,
19 sync::Arc,
20};
21use ui::{Color, Indicator};
22
23#[derive(Debug, Clone)]
24pub struct KernelSpecification {
25 pub name: String,
26 pub path: PathBuf,
27 pub kernelspec: JupyterKernelspec,
28}
29
30impl KernelSpecification {
31 #[must_use]
32 fn command(&self, connection_path: &PathBuf) -> anyhow::Result<Command> {
33 let argv = &self.kernelspec.argv;
34
35 anyhow::ensure!(!argv.is_empty(), "Empty argv in kernelspec {}", self.name);
36 anyhow::ensure!(argv.len() >= 2, "Invalid argv in kernelspec {}", self.name);
37 anyhow::ensure!(
38 argv.iter().any(|arg| arg == "{connection_file}"),
39 "Missing 'connection_file' in argv in kernelspec {}",
40 self.name
41 );
42
43 let mut cmd = Command::new(&argv[0]);
44
45 for arg in &argv[1..] {
46 if arg == "{connection_file}" {
47 cmd.arg(connection_path);
48 } else {
49 cmd.arg(arg);
50 }
51 }
52
53 if let Some(env) = &self.kernelspec.env {
54 cmd.envs(env);
55 }
56
57 Ok(cmd)
58 }
59}
60
61// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
62// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
63async fn peek_ports(ip: IpAddr) -> anyhow::Result<[u16; 5]> {
64 let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
65 addr_zeroport.set_port(0);
66 let mut ports: [u16; 5] = [0; 5];
67 for i in 0..5 {
68 let listener = TcpListener::bind(addr_zeroport).await?;
69 let addr = listener.local_addr()?;
70 ports[i] = addr.port();
71 }
72 Ok(ports)
73}
74
75#[derive(Debug)]
76pub enum Kernel {
77 RunningKernel(RunningKernel),
78 StartingKernel(Shared<Task<()>>),
79 ErroredLaunch(String),
80 ShuttingDown,
81 Shutdown,
82}
83
84impl Kernel {
85 pub fn dot(&mut self) -> Indicator {
86 match self {
87 Kernel::RunningKernel(kernel) => match kernel.execution_state {
88 ExecutionState::Idle => Indicator::dot().color(Color::Success),
89 ExecutionState::Busy => Indicator::dot().color(Color::Modified),
90 },
91 Kernel::StartingKernel(_) => Indicator::dot().color(Color::Modified),
92 Kernel::ErroredLaunch(_) => Indicator::dot().color(Color::Error),
93 Kernel::ShuttingDown => Indicator::dot().color(Color::Modified),
94 Kernel::Shutdown => Indicator::dot().color(Color::Disabled),
95 }
96 }
97
98 pub fn set_execution_state(&mut self, status: &ExecutionState) {
99 match self {
100 Kernel::RunningKernel(running_kernel) => {
101 running_kernel.execution_state = status.clone();
102 }
103 _ => {}
104 }
105 }
106
107 pub fn set_kernel_info(&mut self, kernel_info: &KernelInfoReply) {
108 match self {
109 Kernel::RunningKernel(running_kernel) => {
110 running_kernel.kernel_info = Some(kernel_info.clone());
111 }
112 _ => {}
113 }
114 }
115}
116
117pub struct RunningKernel {
118 pub process: smol::process::Child,
119 _shell_task: Task<anyhow::Result<()>>,
120 _iopub_task: Task<anyhow::Result<()>>,
121 _control_task: Task<anyhow::Result<()>>,
122 _routing_task: Task<anyhow::Result<()>>,
123 connection_path: PathBuf,
124 pub request_tx: mpsc::Sender<JupyterMessage>,
125 pub execution_state: ExecutionState,
126 pub kernel_info: Option<KernelInfoReply>,
127}
128
129type JupyterMessageChannel = stream::SelectAll<Receiver<JupyterMessage>>;
130
131impl Debug for RunningKernel {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("RunningKernel")
134 .field("process", &self.process)
135 .finish()
136 }
137}
138
139impl RunningKernel {
140 pub fn new(
141 kernel_specification: KernelSpecification,
142 entity_id: EntityId,
143 fs: Arc<dyn Fs>,
144 cx: &mut AppContext,
145 ) -> Task<anyhow::Result<(Self, JupyterMessageChannel)>> {
146 cx.spawn(|cx| async move {
147 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
148 let ports = peek_ports(ip).await?;
149
150 let connection_info = ConnectionInfo {
151 transport: "tcp".to_string(),
152 ip: ip.to_string(),
153 stdin_port: ports[0],
154 control_port: ports[1],
155 hb_port: ports[2],
156 shell_port: ports[3],
157 iopub_port: ports[4],
158 signature_scheme: "hmac-sha256".to_string(),
159 key: uuid::Uuid::new_v4().to_string(),
160 kernel_name: Some(format!("zed-{}", kernel_specification.name)),
161 };
162
163 let connection_path = dirs::runtime_dir().join(format!("kernel-zed-{entity_id}.json"));
164 let content = serde_json::to_string(&connection_info)?;
165 // write out file to disk for kernel
166 fs.atomic_write(connection_path.clone(), content).await?;
167
168 let mut cmd = kernel_specification.command(&connection_path)?;
169 let process = cmd
170 // .stdout(Stdio::null())
171 // .stderr(Stdio::null())
172 .kill_on_drop(true)
173 .spawn()
174 .context("failed to start the kernel process")?;
175
176 let mut iopub_socket = connection_info.create_client_iopub_connection("").await?;
177 let mut shell_socket = connection_info.create_client_shell_connection().await?;
178 let mut control_socket = connection_info.create_client_control_connection().await?;
179
180 let (mut iopub, iosub) = futures::channel::mpsc::channel(100);
181
182 let (request_tx, mut request_rx) =
183 futures::channel::mpsc::channel::<JupyterMessage>(100);
184
185 let (mut control_reply_tx, control_reply_rx) = futures::channel::mpsc::channel(100);
186 let (mut shell_reply_tx, shell_reply_rx) = futures::channel::mpsc::channel(100);
187
188 let mut messages_rx = SelectAll::new();
189 messages_rx.push(iosub);
190 messages_rx.push(control_reply_rx);
191 messages_rx.push(shell_reply_rx);
192
193 let _iopub_task = cx.background_executor().spawn({
194 async move {
195 while let Ok(message) = iopub_socket.read().await {
196 iopub.send(message).await?;
197 }
198 anyhow::Ok(())
199 }
200 });
201
202 let (mut control_request_tx, mut control_request_rx) =
203 futures::channel::mpsc::channel(100);
204 let (mut shell_request_tx, mut shell_request_rx) = futures::channel::mpsc::channel(100);
205
206 let _routing_task = cx.background_executor().spawn({
207 async move {
208 while let Some(message) = request_rx.next().await {
209 match message.content {
210 JupyterMessageContent::DebugRequest(_)
211 | JupyterMessageContent::InterruptRequest(_)
212 | JupyterMessageContent::ShutdownRequest(_) => {
213 control_request_tx.send(message).await?;
214 }
215 _ => {
216 shell_request_tx.send(message).await?;
217 }
218 }
219 }
220 anyhow::Ok(())
221 }
222 });
223
224 let _shell_task = cx.background_executor().spawn({
225 async move {
226 while let Some(message) = shell_request_rx.next().await {
227 shell_socket.send(message).await.ok();
228 let reply = shell_socket.read().await?;
229 shell_reply_tx.send(reply).await?;
230 }
231 anyhow::Ok(())
232 }
233 });
234
235 let _control_task = cx.background_executor().spawn({
236 async move {
237 while let Some(message) = control_request_rx.next().await {
238 control_socket.send(message).await.ok();
239 let reply = control_socket.read().await?;
240 control_reply_tx.send(reply).await?;
241 }
242 anyhow::Ok(())
243 }
244 });
245
246 anyhow::Ok((
247 Self {
248 process,
249 request_tx,
250 _shell_task,
251 _iopub_task,
252 _control_task,
253 _routing_task,
254 connection_path,
255 execution_state: ExecutionState::Busy,
256 kernel_info: None,
257 },
258 messages_rx,
259 ))
260 })
261 }
262}
263
264impl Drop for RunningKernel {
265 fn drop(&mut self) {
266 std::fs::remove_file(&self.connection_path).ok();
267
268 self.request_tx.close_channel();
269 }
270}
271
272async fn read_kernelspec_at(
273 // Path should be a directory to a jupyter kernelspec, as in
274 // /usr/local/share/jupyter/kernels/python3
275 kernel_dir: PathBuf,
276 fs: &dyn Fs,
277) -> anyhow::Result<KernelSpecification> {
278 let path = kernel_dir;
279 let kernel_name = if let Some(kernel_name) = path.file_name() {
280 kernel_name.to_string_lossy().to_string()
281 } else {
282 anyhow::bail!("Invalid kernelspec directory: {path:?}");
283 };
284
285 if !fs.is_dir(path.as_path()).await {
286 anyhow::bail!("Not a directory: {path:?}");
287 }
288
289 let expected_kernel_json = path.join("kernel.json");
290 let spec = fs.load(expected_kernel_json.as_path()).await?;
291 let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
292
293 Ok(KernelSpecification {
294 name: kernel_name,
295 path,
296 kernelspec: spec,
297 })
298}
299
300/// Read a directory of kernelspec directories
301async fn read_kernels_dir(path: PathBuf, fs: &dyn Fs) -> anyhow::Result<Vec<KernelSpecification>> {
302 let mut kernelspec_dirs = fs.read_dir(&path).await?;
303
304 let mut valid_kernelspecs = Vec::new();
305 while let Some(path) = kernelspec_dirs.next().await {
306 match path {
307 Ok(path) => {
308 if fs.is_dir(path.as_path()).await {
309 if let Ok(kernelspec) = read_kernelspec_at(path, fs).await {
310 valid_kernelspecs.push(kernelspec);
311 }
312 }
313 }
314 Err(err) => log::warn!("Error reading kernelspec directory: {err:?}"),
315 }
316 }
317
318 Ok(valid_kernelspecs)
319}
320
321pub async fn kernel_specifications(fs: Arc<dyn Fs>) -> anyhow::Result<Vec<KernelSpecification>> {
322 let data_dirs = dirs::data_dirs();
323 let kernel_dirs = data_dirs
324 .iter()
325 .map(|dir| dir.join("kernels"))
326 .map(|path| read_kernels_dir(path, fs.as_ref()))
327 .collect::<Vec<_>>();
328
329 let kernel_dirs = futures::future::join_all(kernel_dirs).await;
330 let kernel_dirs = kernel_dirs
331 .into_iter()
332 .filter_map(Result::ok)
333 .flatten()
334 .collect::<Vec<_>>();
335
336 Ok(kernel_dirs)
337}
338
339#[cfg(test)]
340mod test {
341 use super::*;
342 use std::path::PathBuf;
343
344 use gpui::TestAppContext;
345 use project::FakeFs;
346 use serde_json::json;
347
348 #[gpui::test]
349 async fn test_get_kernelspecs(cx: &mut TestAppContext) {
350 let fs = FakeFs::new(cx.executor());
351 fs.insert_tree(
352 "/jupyter",
353 json!({
354 ".zed": {
355 "settings.json": r#"{ "tab_size": 8 }"#,
356 "tasks.json": r#"[{
357 "label": "cargo check",
358 "command": "cargo",
359 "args": ["check", "--all"]
360 },]"#,
361 },
362 "kernels": {
363 "python": {
364 "kernel.json": r#"{
365 "display_name": "Python 3",
366 "language": "python",
367 "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
368 "env": {}
369 }"#
370 },
371 "deno": {
372 "kernel.json": r#"{
373 "display_name": "Deno",
374 "language": "typescript",
375 "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
376 "env": {}
377 }"#
378 }
379 },
380 }),
381 )
382 .await;
383
384 let mut kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs.as_ref())
385 .await
386 .unwrap();
387
388 kernels.sort_by(|a, b| a.name.cmp(&b.name));
389
390 assert_eq!(
391 kernels.iter().map(|c| c.name.clone()).collect::<Vec<_>>(),
392 vec!["deno", "python"]
393 );
394 }
395}