1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
9 RenderBlock,
10 },
11 scroll::Autoscroll,
12 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
13};
14use futures::{FutureExt as _, StreamExt as _};
15use gpui::{
16 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
17 WeakView,
18};
19use language::Point;
20use project::Fs;
21use runtimelib::{
22 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
23};
24use settings::Settings as _;
25use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
26use theme::{ActiveTheme, ThemeSettings};
27use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, IconButtonShape, Label, Tooltip};
28
29pub struct Session {
30 editor: WeakView<Editor>,
31 pub kernel: Kernel,
32 blocks: HashMap<String, EditorBlock>,
33 messaging_task: Task<()>,
34 pub kernel_specification: KernelSpecification,
35 _buffer_subscription: Subscription,
36}
37
38struct EditorBlock {
39 editor: WeakView<Editor>,
40 code_range: Range<Anchor>,
41 invalidation_anchor: Anchor,
42 block_id: CustomBlockId,
43 execution_view: View<ExecutionView>,
44 on_close: CloseBlockFn,
45}
46
47type CloseBlockFn =
48 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
49
50impl EditorBlock {
51 fn new(
52 editor: WeakView<Editor>,
53 code_range: Range<Anchor>,
54 status: ExecutionStatus,
55 on_close: CloseBlockFn,
56 cx: &mut ViewContext<Session>,
57 ) -> anyhow::Result<Self> {
58 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
59
60 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
61 let buffer = editor.buffer().clone();
62 let buffer_snapshot = buffer.read(cx).snapshot(cx);
63 let end_point = code_range.end.to_point(&buffer_snapshot);
64 let next_row_start = end_point + Point::new(1, 0);
65 if next_row_start > buffer_snapshot.max_point() {
66 buffer.update(cx, |buffer, cx| {
67 buffer.edit(
68 [(
69 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
70 "\n",
71 )],
72 None,
73 cx,
74 )
75 });
76 }
77
78 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
79 let block = BlockProperties {
80 position: code_range.end,
81 height: execution_view.num_lines(cx).saturating_add(1),
82 style: BlockStyle::Sticky,
83 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
84 disposition: BlockDisposition::Below,
85 };
86
87 let block_id = editor.insert_blocks([block], None, cx)[0];
88 (block_id, invalidation_anchor)
89 })?;
90
91 anyhow::Ok(Self {
92 editor,
93 code_range,
94 invalidation_anchor,
95 block_id,
96 execution_view,
97 on_close,
98 })
99 }
100
101 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
102 self.execution_view.update(cx, |execution_view, cx| {
103 execution_view.push_message(&message.content, cx);
104 });
105
106 self.editor
107 .update(cx, |editor, cx| {
108 let mut replacements = HashMap::default();
109
110 replacements.insert(
111 self.block_id,
112 (
113 Some(self.execution_view.num_lines(cx).saturating_add(1)),
114 Self::create_output_area_renderer(
115 self.execution_view.clone(),
116 self.on_close.clone(),
117 ),
118 ),
119 );
120 editor.replace_blocks(replacements, None, cx);
121 })
122 .ok();
123 }
124
125 fn create_output_area_renderer(
126 execution_view: View<ExecutionView>,
127 on_close: CloseBlockFn,
128 ) -> RenderBlock {
129 let render = move |cx: &mut BlockContext| {
130 let execution_view = execution_view.clone();
131 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
132 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
133
134 let gutter = cx.gutter_dimensions;
135 let close_button_size = IconSize::XSmall;
136
137 let block_id = cx.block_id;
138 let on_close = on_close.clone();
139
140 let rem_size = cx.rem_size();
141 let line_height = cx.text_style().line_height_in_pixels(rem_size);
142
143 let (close_button_width, close_button_padding) =
144 close_button_size.square_components(cx);
145
146 div()
147 .min_h(line_height)
148 .flex()
149 .flex_row()
150 .items_start()
151 .w_full()
152 .bg(cx.theme().colors().background)
153 .border_y_1()
154 .border_color(cx.theme().colors().border)
155 .child(
156 v_flex().min_h(cx.line_height()).justify_center().child(
157 h_flex()
158 .w(gutter.full_width())
159 .justify_end()
160 .pt(line_height / 2.)
161 .child(
162 h_flex()
163 .pr(gutter.width / 2. - close_button_width
164 + close_button_padding / 2.)
165 .child(
166 IconButton::new(
167 ("close_output_area", EntityId::from(cx.block_id)),
168 IconName::Close,
169 )
170 .shape(IconButtonShape::Square)
171 .icon_size(close_button_size)
172 .icon_color(Color::Muted)
173 .tooltip(|cx| Tooltip::text("Close output area", cx))
174 .on_click(
175 move |_, cx| {
176 if let BlockId::Custom(block_id) = block_id {
177 (on_close)(block_id, cx)
178 }
179 },
180 ),
181 ),
182 ),
183 ),
184 )
185 .child(
186 div()
187 .flex_1()
188 .size_full()
189 .my_2()
190 .mr(gutter.width)
191 .text_size(text_font_size)
192 .font_family(text_font)
193 .child(execution_view),
194 )
195 .into_any_element()
196 };
197
198 Box::new(render)
199 }
200}
201
202impl Session {
203 pub fn new(
204 editor: WeakView<Editor>,
205 fs: Arc<dyn Fs>,
206 kernel_specification: KernelSpecification,
207 cx: &mut ViewContext<Self>,
208 ) -> Self {
209 let entity_id = editor.entity_id();
210 let working_directory = editor
211 .upgrade()
212 .and_then(|editor| editor.read(cx).working_directory(cx))
213 .unwrap_or_else(temp_dir);
214 let kernel = RunningKernel::new(
215 kernel_specification.clone(),
216 entity_id,
217 working_directory,
218 fs.clone(),
219 cx,
220 );
221
222 let pending_kernel = cx
223 .spawn(|this, mut cx| async move {
224 let kernel = kernel.await;
225
226 match kernel {
227 Ok((mut kernel, mut messages_rx)) => {
228 this.update(&mut cx, |this, cx| {
229 // At this point we can create a new kind of kernel that has the process and our long running background tasks
230
231 let status = kernel.process.status();
232 this.kernel = Kernel::RunningKernel(kernel);
233
234 cx.spawn(|session, mut cx| async move {
235 let error_message = match status.await {
236 Ok(status) => {
237 if status.success() {
238 log::info!("kernel process exited successfully");
239 return;
240 }
241
242 format!("kernel process exited with status: {:?}", status)
243 }
244 Err(err) => {
245 format!("kernel process exited with error: {:?}", err)
246 }
247 };
248
249 log::error!("{}", error_message);
250
251 session
252 .update(&mut cx, |session, cx| {
253 session.kernel =
254 Kernel::ErroredLaunch(error_message.clone());
255
256 session.blocks.values().for_each(|block| {
257 block.execution_view.update(
258 cx,
259 |execution_view, cx| {
260 match execution_view.status {
261 ExecutionStatus::Finished => {
262 // Do nothing when the output was good
263 }
264 _ => {
265 // All other cases, set the status to errored
266 execution_view.status =
267 ExecutionStatus::KernelErrored(
268 error_message.clone(),
269 )
270 }
271 }
272 cx.notify();
273 },
274 );
275 });
276
277 cx.notify();
278 })
279 .ok();
280 })
281 .detach();
282
283 this.messaging_task = cx.spawn(|session, mut cx| async move {
284 while let Some(message) = messages_rx.next().await {
285 session
286 .update(&mut cx, |session, cx| {
287 session.route(&message, cx);
288 })
289 .ok();
290 }
291 });
292 })
293 .ok();
294 }
295 Err(err) => {
296 this.update(&mut cx, |this, _cx| {
297 this.kernel = Kernel::ErroredLaunch(err.to_string());
298 })
299 .ok();
300 }
301 }
302 })
303 .shared();
304
305 let subscription = match editor.upgrade() {
306 Some(editor) => {
307 let buffer = editor.read(cx).buffer().clone();
308 cx.subscribe(&buffer, Self::on_buffer_event)
309 }
310 None => Subscription::new(|| {}),
311 };
312
313 return Self {
314 editor,
315 kernel: Kernel::StartingKernel(pending_kernel),
316 messaging_task: Task::ready(()),
317 blocks: HashMap::default(),
318 kernel_specification,
319 _buffer_subscription: subscription,
320 };
321 }
322
323 fn on_buffer_event(
324 &mut self,
325 buffer: Model<MultiBuffer>,
326 event: &multi_buffer::Event,
327 cx: &mut ViewContext<Self>,
328 ) {
329 if let multi_buffer::Event::Edited { .. } = event {
330 let snapshot = buffer.read(cx).snapshot(cx);
331
332 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
333
334 self.blocks.retain(|_id, block| {
335 if block.invalidation_anchor.is_valid(&snapshot) {
336 true
337 } else {
338 blocks_to_remove.insert(block.block_id);
339 false
340 }
341 });
342
343 if !blocks_to_remove.is_empty() {
344 self.editor
345 .update(cx, |editor, cx| {
346 editor.remove_blocks(blocks_to_remove, None, cx);
347 })
348 .ok();
349 cx.notify();
350 }
351 }
352 }
353
354 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
355 match &mut self.kernel {
356 Kernel::RunningKernel(kernel) => {
357 kernel.request_tx.try_send(message).ok();
358 }
359 _ => {}
360 }
361
362 anyhow::Ok(())
363 }
364
365 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
366 let blocks_to_remove: HashSet<CustomBlockId> =
367 self.blocks.values().map(|block| block.block_id).collect();
368
369 self.editor
370 .update(cx, |editor, cx| {
371 editor.remove_blocks(blocks_to_remove, None, cx);
372 })
373 .ok();
374
375 self.blocks.clear();
376 }
377
378 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
379 let Some(editor) = self.editor.upgrade() else {
380 return;
381 };
382
383 if code.is_empty() {
384 return;
385 }
386
387 let execute_request = ExecuteRequest {
388 code: code.to_string(),
389 ..ExecuteRequest::default()
390 };
391
392 let message: JupyterMessage = execute_request.into();
393
394 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
395
396 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
397
398 self.blocks.retain(|_key, block| {
399 if anchor_range.overlaps(&block.code_range, &buffer) {
400 blocks_to_remove.insert(block.block_id);
401 false
402 } else {
403 true
404 }
405 });
406
407 self.editor
408 .update(cx, |editor, cx| {
409 editor.remove_blocks(blocks_to_remove, None, cx);
410 })
411 .ok();
412
413 let status = match &self.kernel {
414 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
415 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
416 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
417 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
418 Kernel::Shutdown => ExecutionStatus::Shutdown,
419 };
420
421 let parent_message_id = message.header.msg_id.clone();
422 let session_view = cx.view().downgrade();
423 let weak_editor = self.editor.clone();
424
425 let on_close: CloseBlockFn =
426 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
427 if let Some(session) = session_view.upgrade() {
428 session.update(cx, |session, cx| {
429 session.blocks.remove(&parent_message_id);
430 cx.notify();
431 });
432 }
433
434 if let Some(editor) = weak_editor.upgrade() {
435 editor.update(cx, |editor, cx| {
436 let mut block_ids = HashSet::default();
437 block_ids.insert(block_id);
438 editor.remove_blocks(block_ids, None, cx);
439 });
440 }
441 });
442
443 let Ok(editor_block) =
444 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
445 else {
446 return;
447 };
448
449 let new_cursor_pos = editor_block.invalidation_anchor;
450
451 self.blocks
452 .insert(message.header.msg_id.clone(), editor_block);
453
454 match &self.kernel {
455 Kernel::RunningKernel(_) => {
456 self.send(message, cx).ok();
457 }
458 Kernel::StartingKernel(task) => {
459 // Queue up the execution as a task to run after the kernel starts
460 let task = task.clone();
461 let message = message.clone();
462
463 cx.spawn(|this, mut cx| async move {
464 task.await;
465 this.update(&mut cx, |this, cx| {
466 this.send(message, cx).ok();
467 })
468 .ok();
469 })
470 .detach();
471 }
472 _ => {}
473 }
474
475 // Now move the cursor to after the block
476 editor.update(cx, move |editor, cx| {
477 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
478 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
479 });
480 });
481 }
482
483 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
484 let parent_message_id = match message.parent_header.as_ref() {
485 Some(header) => &header.msg_id,
486 None => return,
487 };
488
489 match &message.content {
490 JupyterMessageContent::Status(status) => {
491 self.kernel.set_execution_state(&status.execution_state);
492 cx.notify();
493 }
494 JupyterMessageContent::KernelInfoReply(reply) => {
495 self.kernel.set_kernel_info(&reply);
496 cx.notify();
497 }
498 _ => {}
499 }
500
501 if let Some(block) = self.blocks.get_mut(parent_message_id) {
502 block.handle_message(&message, cx);
503 return;
504 }
505 }
506
507 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
508 match &mut self.kernel {
509 Kernel::RunningKernel(_kernel) => {
510 self.send(InterruptRequest {}.into(), cx).ok();
511 }
512 Kernel::StartingKernel(_task) => {
513 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
514 }
515 _ => {}
516 }
517 }
518
519 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
520 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
521
522 match kernel {
523 Kernel::RunningKernel(mut kernel) => {
524 let mut request_tx = kernel.request_tx.clone();
525
526 cx.spawn(|this, mut cx| async move {
527 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
528 request_tx.try_send(message).ok();
529
530 // Give the kernel a bit of time to clean up
531 cx.background_executor().timer(Duration::from_secs(3)).await;
532
533 kernel.process.kill().ok();
534
535 this.update(&mut cx, |this, cx| {
536 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
537 this.clear_outputs(cx);
538 this.kernel = Kernel::Shutdown;
539 cx.notify();
540 })
541 .ok();
542 })
543 .detach();
544 }
545 Kernel::StartingKernel(_kernel) => {
546 self.kernel = Kernel::Shutdown;
547 }
548 _ => {
549 self.kernel = Kernel::Shutdown;
550 }
551 }
552 cx.notify();
553 }
554}
555
556pub enum SessionEvent {
557 Shutdown(WeakView<Editor>),
558}
559
560impl EventEmitter<SessionEvent> for Session {}
561
562impl Render for Session {
563 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
564 let mut buttons = vec![];
565
566 buttons.push(
567 ButtonLike::new("shutdown")
568 .child(Label::new("Shutdown"))
569 .style(ButtonStyle::Subtle)
570 .on_click(cx.listener(move |session, _, cx| {
571 session.shutdown(cx);
572 })),
573 );
574
575 let status_text = match &self.kernel {
576 Kernel::RunningKernel(kernel) => {
577 buttons.push(
578 ButtonLike::new("interrupt")
579 .child(Label::new("Interrupt"))
580 .style(ButtonStyle::Subtle)
581 .on_click(cx.listener(move |session, _, cx| {
582 session.interrupt(cx);
583 })),
584 );
585 let mut name = self.kernel_specification.name.clone();
586
587 if let Some(info) = &kernel.kernel_info {
588 name.push_str(" (");
589 name.push_str(&info.language_info.name);
590 name.push_str(")");
591 }
592 name
593 }
594 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
595 Kernel::ErroredLaunch(err) => {
596 format!("{} (Error: {})", self.kernel_specification.name, err)
597 }
598 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
599 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
600 };
601
602 return v_flex()
603 .gap_1()
604 .child(
605 h_flex()
606 .gap_2()
607 .child(self.kernel.dot())
608 .child(Label::new(status_text)),
609 )
610 .child(h_flex().gap_2().children(buttons));
611 }
612}