1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
9 },
10 Anchor, AnchorRangeExt as _, Editor,
11};
12use futures::{FutureExt as _, StreamExt as _};
13use gpui::{div, prelude::*, EventEmitter, Render, Task, View, ViewContext, WeakView};
14use project::Fs;
15use runtimelib::{
16 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, KernelInfoRequest,
17 ShutdownRequest,
18};
19use settings::Settings as _;
20use std::{ops::Range, sync::Arc, time::Duration};
21use theme::{ActiveTheme, ThemeSettings};
22use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
23
24pub struct Session {
25 editor: WeakView<Editor>,
26 kernel: Kernel,
27 blocks: HashMap<String, EditorBlock>,
28 messaging_task: Task<()>,
29 kernel_specification: KernelSpecification,
30}
31
32struct EditorBlock {
33 editor: WeakView<Editor>,
34 code_range: Range<Anchor>,
35 block_id: BlockId,
36 execution_view: View<ExecutionView>,
37}
38
39impl EditorBlock {
40 fn new(
41 editor: WeakView<Editor>,
42 code_range: Range<Anchor>,
43 status: ExecutionStatus,
44 cx: &mut ViewContext<Session>,
45 ) -> anyhow::Result<Self> {
46 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
47
48 let block_id = editor.update(cx, |editor, cx| {
49 let block = BlockProperties {
50 position: code_range.end,
51 height: execution_view.num_lines(cx).saturating_add(1),
52 style: BlockStyle::Sticky,
53 render: Self::create_output_area_render(execution_view.clone()),
54 disposition: BlockDisposition::Below,
55 };
56
57 editor.insert_blocks([block], None, cx)[0]
58 })?;
59
60 anyhow::Ok(Self {
61 editor,
62 code_range,
63 block_id,
64 execution_view,
65 })
66 }
67
68 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
69 self.execution_view.update(cx, |execution_view, cx| {
70 execution_view.push_message(&message.content, cx);
71 });
72
73 self.editor
74 .update(cx, |editor, cx| {
75 let mut replacements = HashMap::default();
76 replacements.insert(
77 self.block_id,
78 (
79 Some(self.execution_view.num_lines(cx).saturating_add(1)),
80 Self::create_output_area_render(self.execution_view.clone()),
81 ),
82 );
83 editor.replace_blocks(replacements, None, cx);
84 })
85 .ok();
86 }
87
88 fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
89 let render = move |cx: &mut BlockContext| {
90 let execution_view = execution_view.clone();
91 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
92 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
93 // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
94
95 let gutter_width = cx.gutter_dimensions.width;
96
97 h_flex()
98 .w_full()
99 .bg(cx.theme().colors().background)
100 .border_y_1()
101 .border_color(cx.theme().colors().border)
102 .pl(gutter_width)
103 .child(
104 div()
105 .text_size(text_font_size)
106 .font_family(text_font)
107 // .ml(gutter_width)
108 .mx_1()
109 .my_2()
110 .h_full()
111 .w_full()
112 .mr(gutter_width)
113 .child(execution_view),
114 )
115 .into_any_element()
116 };
117
118 Box::new(render)
119 }
120}
121
122impl Session {
123 pub fn new(
124 editor: WeakView<Editor>,
125 fs: Arc<dyn Fs>,
126 kernel_specification: KernelSpecification,
127 cx: &mut ViewContext<Self>,
128 ) -> Self {
129 let entity_id = editor.entity_id();
130 let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
131
132 let pending_kernel = cx
133 .spawn(|this, mut cx| async move {
134 let kernel = kernel.await;
135
136 match kernel {
137 Ok((kernel, mut messages_rx)) => {
138 this.update(&mut cx, |this, cx| {
139 // At this point we can create a new kind of kernel that has the process and our long running background tasks
140 this.kernel = Kernel::RunningKernel(kernel);
141
142 this.messaging_task = cx.spawn(|session, mut cx| async move {
143 while let Some(message) = messages_rx.next().await {
144 session
145 .update(&mut cx, |session, cx| {
146 session.route(&message, cx);
147 })
148 .ok();
149 }
150 });
151
152 // For some reason sending a kernel info request will brick the ark (R) kernel.
153 // Note that Deno and Python do not have this issue.
154 if this.kernel_specification.name == "ark" {
155 return;
156 }
157
158 // Get kernel info after (possibly) letting the kernel start
159 cx.spawn(|this, mut cx| async move {
160 cx.background_executor()
161 .timer(Duration::from_millis(120))
162 .await;
163 this.update(&mut cx, |this, _cx| {
164 this.send(KernelInfoRequest {}.into(), _cx).ok();
165 })
166 .ok();
167 })
168 .detach();
169 })
170 .ok();
171 }
172 Err(err) => {
173 this.update(&mut cx, |this, _cx| {
174 this.kernel = Kernel::ErroredLaunch(err.to_string());
175 })
176 .ok();
177 }
178 }
179 })
180 .shared();
181
182 return Self {
183 editor,
184 kernel: Kernel::StartingKernel(pending_kernel),
185 messaging_task: Task::ready(()),
186 blocks: HashMap::default(),
187 kernel_specification,
188 };
189 }
190
191 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
192 match &mut self.kernel {
193 Kernel::RunningKernel(kernel) => {
194 kernel.request_tx.try_send(message).ok();
195 }
196 _ => {}
197 }
198
199 anyhow::Ok(())
200 }
201
202 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
203 let blocks_to_remove: HashSet<BlockId> =
204 self.blocks.values().map(|block| block.block_id).collect();
205
206 self.editor
207 .update(cx, |editor, cx| {
208 editor.remove_blocks(blocks_to_remove, None, cx);
209 })
210 .ok();
211
212 self.blocks.clear();
213 }
214
215 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
216 let editor = if let Some(editor) = self.editor.upgrade() {
217 editor
218 } else {
219 return;
220 };
221
222 let execute_request = ExecuteRequest {
223 code: code.to_string(),
224 ..ExecuteRequest::default()
225 };
226
227 let message: JupyterMessage = execute_request.into();
228
229 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
230
231 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
232
233 self.blocks.retain(|_key, block| {
234 if anchor_range.overlaps(&block.code_range, &buffer) {
235 blocks_to_remove.insert(block.block_id);
236 false
237 } else {
238 true
239 }
240 });
241
242 self.editor
243 .update(cx, |editor, cx| {
244 editor.remove_blocks(blocks_to_remove, None, cx);
245 })
246 .ok();
247
248 let status = match &self.kernel {
249 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
250 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
251 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
252 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
253 Kernel::Shutdown => ExecutionStatus::Shutdown,
254 };
255
256 let editor_block = if let Ok(editor_block) =
257 EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
258 {
259 editor_block
260 } else {
261 return;
262 };
263
264 self.blocks
265 .insert(message.header.msg_id.clone(), editor_block);
266
267 match &self.kernel {
268 Kernel::RunningKernel(_) => {
269 self.send(message, cx).ok();
270 }
271 Kernel::StartingKernel(task) => {
272 // Queue up the execution as a task to run after the kernel starts
273 let task = task.clone();
274 let message = message.clone();
275
276 cx.spawn(|this, mut cx| async move {
277 task.await;
278 this.update(&mut cx, |this, cx| {
279 this.send(message, cx).ok();
280 })
281 .ok();
282 })
283 .detach();
284 }
285 _ => {}
286 }
287 }
288
289 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
290 let parent_message_id = match message.parent_header.as_ref() {
291 Some(header) => &header.msg_id,
292 None => return,
293 };
294
295 match &message.content {
296 JupyterMessageContent::Status(status) => {
297 self.kernel.set_execution_state(&status.execution_state);
298 cx.notify();
299 }
300 JupyterMessageContent::KernelInfoReply(reply) => {
301 self.kernel.set_kernel_info(&reply);
302 cx.notify();
303 }
304 _ => {}
305 }
306
307 if let Some(block) = self.blocks.get_mut(parent_message_id) {
308 block.handle_message(&message, cx);
309 return;
310 }
311 }
312
313 fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
314 match &mut self.kernel {
315 Kernel::RunningKernel(_kernel) => {
316 self.send(InterruptRequest {}.into(), cx).ok();
317 }
318 Kernel::StartingKernel(_task) => {
319 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
320 }
321 _ => {}
322 }
323 }
324
325 fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
326 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
327
328 match kernel {
329 Kernel::RunningKernel(mut kernel) => {
330 let mut request_tx = kernel.request_tx.clone();
331
332 cx.spawn(|this, mut cx| async move {
333 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
334 request_tx.try_send(message).ok();
335
336 // Give the kernel a bit of time to clean up
337 cx.background_executor().timer(Duration::from_secs(3)).await;
338
339 kernel.process.kill().ok();
340
341 this.update(&mut cx, |this, cx| {
342 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
343 this.clear_outputs(cx);
344 this.kernel = Kernel::Shutdown;
345 cx.notify();
346 })
347 .ok();
348 })
349 .detach();
350 }
351 Kernel::StartingKernel(_kernel) => {
352 self.kernel = Kernel::Shutdown;
353 }
354 _ => {
355 self.kernel = Kernel::Shutdown;
356 }
357 }
358 cx.notify();
359 }
360}
361
362pub enum SessionEvent {
363 Shutdown(WeakView<Editor>),
364}
365
366impl EventEmitter<SessionEvent> for Session {}
367
368impl Render for Session {
369 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
370 let mut buttons = vec![];
371
372 buttons.push(
373 ButtonLike::new("shutdown")
374 .child(Label::new("Shutdown"))
375 .style(ButtonStyle::Subtle)
376 .on_click(cx.listener(move |session, _, cx| {
377 session.shutdown(cx);
378 })),
379 );
380
381 let status_text = match &self.kernel {
382 Kernel::RunningKernel(kernel) => {
383 buttons.push(
384 ButtonLike::new("interrupt")
385 .child(Label::new("Interrupt"))
386 .style(ButtonStyle::Subtle)
387 .on_click(cx.listener(move |session, _, cx| {
388 session.interrupt(cx);
389 })),
390 );
391 let mut name = self.kernel_specification.name.clone();
392
393 if let Some(info) = &kernel.kernel_info {
394 name.push_str(" (");
395 name.push_str(&info.language_info.name);
396 name.push_str(")");
397 }
398 name
399 }
400 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
401 Kernel::ErroredLaunch(err) => {
402 format!("{} (Error: {})", self.kernel_specification.name, err)
403 }
404 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
405 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
406 };
407
408 return v_flex()
409 .gap_1()
410 .child(
411 h_flex()
412 .gap_2()
413 .child(self.kernel.dot())
414 .child(Label::new(status_text)),
415 )
416 .child(h_flex().gap_2().children(buttons));
417 }
418}