1use anyhow::{Context as _, Result};
2use collections::HashMap;
3use futures::lock::Mutex;
4use futures::{channel::mpsc, SinkExt as _, StreamExt as _};
5use gpui::{AsyncAppContext, EntityId};
6use project::Fs;
7use runtimelib::{dirs, ConnectionInfo, JupyterKernelspec, JupyterMessage, JupyterMessageContent};
8use smol::{net::TcpListener, process::Command};
9use std::fmt::Debug;
10use std::net::{IpAddr, Ipv4Addr, SocketAddr};
11use std::{path::PathBuf, sync::Arc};
12
13#[derive(Debug)]
14pub struct Request {
15 pub request: runtimelib::JupyterMessageContent,
16 pub responses_rx: mpsc::UnboundedSender<JupyterMessageContent>,
17}
18
/// A Jupyter kernelspec together with the name and directory it was loaded from.
#[derive(Debug, Clone)]
pub struct RuntimeSpecification {
    /// Name of the kernelspec; `read_kernelspec_at` sets it to the final
    /// component of `path`.
    pub name: String,
    /// Kernelspec directory, i.e. the directory containing `kernel.json`.
    pub path: PathBuf,
    /// Parsed contents of the directory's `kernel.json`.
    pub kernelspec: JupyterKernelspec,
}
25
26impl RuntimeSpecification {
27 #[must_use]
28 fn command(&self, connection_path: &PathBuf) -> Result<Command> {
29 let argv = &self.kernelspec.argv;
30
31 if argv.is_empty() {
32 return Err(anyhow::anyhow!("Empty argv in kernelspec {}", self.name));
33 }
34
35 if argv.len() < 2 {
36 return Err(anyhow::anyhow!("Invalid argv in kernelspec {}", self.name));
37 }
38
39 if !argv.contains(&"{connection_file}".to_string()) {
40 return Err(anyhow::anyhow!(
41 "Missing 'connection_file' in argv in kernelspec {}",
42 self.name
43 ));
44 }
45
46 let mut cmd = Command::new(&argv[0]);
47
48 for arg in &argv[1..] {
49 if arg == "{connection_file}" {
50 cmd.arg(connection_path);
51 } else {
52 cmd.arg(arg);
53 }
54 }
55
56 if let Some(env) = &self.kernelspec.env {
57 cmd.envs(env);
58 }
59
60 Ok(cmd)
61 }
62}
63
64// Find a set of open ports. This creates a listener with port set to 0. The listener will be closed at the end when it goes out of scope.
65// There's a race condition between closing the ports and usage by a kernel, but it's inherent to the Jupyter protocol.
66async fn peek_ports(ip: IpAddr) -> anyhow::Result<[u16; 5]> {
67 let mut addr_zeroport: SocketAddr = SocketAddr::new(ip, 0);
68 addr_zeroport.set_port(0);
69 let mut ports: [u16; 5] = [0; 5];
70 for i in 0..5 {
71 let listener = TcpListener::bind(addr_zeroport).await?;
72 let addr = listener.local_addr()?;
73 ports[i] = addr.port();
74 }
75 Ok(ports)
76}
77
/// Handle to a kernel process started by `RunningKernel::new`.
#[derive(Debug)]
pub struct RunningKernel {
    // The specification the kernel was launched from. Not read anywhere in
    // this file, hence the `allow`.
    #[allow(unused)]
    runtime: RuntimeSpecification,
    // Never read, but holding it ties the kernel's lifetime to this struct:
    // the child is spawned with `kill_on_drop(true)`.
    #[allow(unused)]
    process: smol::process::Child,
    /// Channel for submitting requests to the kernel; each `Request` carries
    /// the sender on which its responses are delivered.
    pub request_tx: mpsc::UnboundedSender<Request>,
}
86
87impl RunningKernel {
88 pub async fn new(
89 runtime: RuntimeSpecification,
90 entity_id: EntityId,
91 fs: Arc<dyn Fs>,
92 cx: AsyncAppContext,
93 ) -> anyhow::Result<Self> {
94 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
95 let ports = peek_ports(ip).await?;
96
97 let connection_info = ConnectionInfo {
98 transport: "tcp".to_string(),
99 ip: ip.to_string(),
100 stdin_port: ports[0],
101 control_port: ports[1],
102 hb_port: ports[2],
103 shell_port: ports[3],
104 iopub_port: ports[4],
105 signature_scheme: "hmac-sha256".to_string(),
106 key: uuid::Uuid::new_v4().to_string(),
107 kernel_name: Some(format!("zed-{}", runtime.name)),
108 };
109
110 let connection_path = dirs::runtime_dir().join(format!("kernel-zed-{}.json", entity_id));
111 let content = serde_json::to_string(&connection_info)?;
112 // write out file to disk for kernel
113 fs.atomic_write(connection_path.clone(), content).await?;
114
115 let mut cmd = runtime.command(&connection_path)?;
116 let process = cmd
117 // .stdout(Stdio::null())
118 // .stderr(Stdio::null())
119 .kill_on_drop(true)
120 .spawn()
121 .context("failed to start the kernel process")?;
122
123 let mut iopub = connection_info.create_client_iopub_connection("").await?;
124 let mut shell = connection_info.create_client_shell_connection().await?;
125
126 // Spawn a background task to handle incoming messages from the kernel as well
127 // as outgoing messages to the kernel
128
129 let child_messages: Arc<
130 Mutex<HashMap<String, mpsc::UnboundedSender<JupyterMessageContent>>>,
131 > = Default::default();
132
133 let (request_tx, mut request_rx) = mpsc::unbounded::<Request>();
134
135 cx.background_executor()
136 .spawn({
137 let child_messages = child_messages.clone();
138
139 async move {
140 let child_messages = child_messages.clone();
141 while let Ok(message) = iopub.read().await {
142 if let Some(parent_header) = message.parent_header {
143 let child_messages = child_messages.lock().await;
144
145 let sender = child_messages.get(&parent_header.msg_id);
146
147 match sender {
148 Some(mut sender) => {
149 sender.send(message.content).await?;
150 }
151 None => {}
152 }
153 }
154 }
155
156 anyhow::Ok(())
157 }
158 })
159 .detach();
160
161 cx.background_executor()
162 .spawn({
163 let child_messages = child_messages.clone();
164 async move {
165 while let Some(request) = request_rx.next().await {
166 let rx = request.responses_rx.clone();
167
168 let request: JupyterMessage = request.request.into();
169 let msg_id = request.header.msg_id.clone();
170
171 let mut sender = rx.clone();
172
173 child_messages
174 .lock()
175 .await
176 .insert(msg_id.clone(), sender.clone());
177
178 shell.send(request).await?;
179
180 let response = shell.read().await?;
181
182 sender.send(response.content).await?;
183 }
184
185 anyhow::Ok(())
186 }
187 })
188 .detach();
189
190 Ok(Self {
191 runtime,
192 process,
193 request_tx,
194 })
195 }
196}
197
198async fn read_kernelspec_at(
199 // Path should be a directory to a jupyter kernelspec, as in
200 // /usr/local/share/jupyter/kernels/python3
201 kernel_dir: PathBuf,
202 fs: Arc<dyn Fs>,
203) -> anyhow::Result<RuntimeSpecification> {
204 let path = kernel_dir;
205 let kernel_name = if let Some(kernel_name) = path.file_name() {
206 kernel_name.to_string_lossy().to_string()
207 } else {
208 return Err(anyhow::anyhow!("Invalid kernelspec directory: {:?}", path));
209 };
210
211 if !fs.is_dir(path.as_path()).await {
212 return Err(anyhow::anyhow!("Not a directory: {:?}", path));
213 }
214
215 let expected_kernel_json = path.join("kernel.json");
216 let spec = fs.load(expected_kernel_json.as_path()).await?;
217 let spec = serde_json::from_str::<JupyterKernelspec>(&spec)?;
218
219 Ok(RuntimeSpecification {
220 name: kernel_name,
221 path,
222 kernelspec: spec,
223 })
224}
225
226/// Read a directory of kernelspec directories
227async fn read_kernels_dir(
228 path: PathBuf,
229 fs: Arc<dyn Fs>,
230) -> anyhow::Result<Vec<RuntimeSpecification>> {
231 let mut kernelspec_dirs = fs.read_dir(&path).await?;
232
233 let mut valid_kernelspecs = Vec::new();
234 while let Some(path) = kernelspec_dirs.next().await {
235 match path {
236 Ok(path) => {
237 if fs.is_dir(path.as_path()).await {
238 let fs = fs.clone();
239 if let Ok(kernelspec) = read_kernelspec_at(path, fs).await {
240 valid_kernelspecs.push(kernelspec);
241 }
242 }
243 }
244 Err(err) => {
245 log::warn!("Error reading kernelspec directory: {:?}", err);
246 }
247 }
248 }
249
250 Ok(valid_kernelspecs)
251}
252
253pub async fn get_runtime_specifications(
254 fs: Arc<dyn Fs>,
255) -> anyhow::Result<Vec<RuntimeSpecification>> {
256 let data_dirs = dirs::data_dirs();
257 let kernel_dirs = data_dirs
258 .iter()
259 .map(|dir| dir.join("kernels"))
260 .map(|path| read_kernels_dir(path, fs.clone()))
261 .collect::<Vec<_>>();
262
263 let kernel_dirs = futures::future::join_all(kernel_dirs).await;
264 let kernel_dirs = kernel_dirs
265 .into_iter()
266 .filter_map(Result::ok)
267 .flatten()
268 .collect::<Vec<_>>();
269
270 Ok(kernel_dirs)
271}
272
#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;

    use gpui::TestAppContext;
    use project::FakeFs;
    use serde_json::json;

    /// `read_kernels_dir` must report one kernelspec per sub-directory of the
    /// kernels directory, named after that sub-directory.
    #[gpui::test]
    async fn test_get_kernelspecs(cx: &mut TestAppContext) {
        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(
            "/jupyter",
            json!({
                ".zed": {
                    "settings.json": r#"{ "tab_size": 8 }"#,
                    "tasks.json": r#"[{
                        "label": "cargo check",
                        "command": "cargo",
                        "args": ["check", "--all"]
                    },]"#,
                },
                "kernels": {
                    "python": {
                        "kernel.json": r#"{
                            "display_name": "Python 3",
                            "language": "python",
                            "argv": ["python3", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
                            "env": {}
                        }"#
                    },
                    "deno": {
                        "kernel.json": r#"{
                            "display_name": "Deno",
                            "language": "typescript",
                            "argv": ["deno", "run", "--unstable", "--allow-net", "--allow-read", "https://deno.land/std/http/file_server.ts", "{connection_file}"],
                            "env": {}
                        }"#
                    }
                },
            }),
        )
        .await;

        let kernels = read_kernels_dir(PathBuf::from("/jupyter/kernels"), fs)
            .await
            .unwrap();

        // Directory listing order is not part of the contract, so compare
        // the names in sorted order.
        let mut kernel_names: Vec<String> =
            kernels.into_iter().map(|kernel| kernel.name).collect();
        kernel_names.sort();

        assert_eq!(kernel_names, vec!["deno", "python"]);
    }
}