1use crate::components::KernelListItem;
2use crate::setup_editor_session_actions;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6 KernelStatus,
7};
8use client::telemetry::Telemetry;
9use collections::{HashMap, HashSet};
10use editor::{
11 display_map::{
12 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
13 RenderBlock,
14 },
15 scroll::Autoscroll,
16 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
17};
18use futures::io::BufReader;
19use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
20use gpui::{
21 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::ActiveTheme;
31use ui::{prelude::*, IconButtonShape, Tooltip};
32
33pub struct Session {
34 fs: Arc<dyn Fs>,
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 messaging_task: Option<Task<()>>,
39 process_status_task: Option<Task<()>>,
40 pub kernel_specification: KernelSpecification,
41 telemetry: Arc<Telemetry>,
42 _buffer_subscription: Subscription,
43}
44
45struct EditorBlock {
46 code_range: Range<Anchor>,
47 invalidation_anchor: Anchor,
48 block_id: CustomBlockId,
49 execution_view: View<ExecutionView>,
50}
51
52type CloseBlockFn =
53 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
54
55impl EditorBlock {
56 fn new(
57 editor: WeakView<Editor>,
58 code_range: Range<Anchor>,
59 status: ExecutionStatus,
60 on_close: CloseBlockFn,
61 cx: &mut ViewContext<Session>,
62 ) -> anyhow::Result<Self> {
63 let editor = editor
64 .upgrade()
65 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
66 let workspace = editor
67 .read(cx)
68 .workspace()
69 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
70
71 let execution_view =
72 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
73
74 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
75 let buffer = editor.buffer().clone();
76 let buffer_snapshot = buffer.read(cx).snapshot(cx);
77 let end_point = code_range.end.to_point(&buffer_snapshot);
78 let next_row_start = end_point + Point::new(1, 0);
79 if next_row_start > buffer_snapshot.max_point() {
80 buffer.update(cx, |buffer, cx| {
81 buffer.edit(
82 [(
83 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
84 "\n",
85 )],
86 None,
87 cx,
88 )
89 });
90 }
91
92 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
93 let block = BlockProperties {
94 placement: BlockPlacement::Below(code_range.end),
95 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
96 height: 1,
97 style: BlockStyle::Sticky,
98 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
99 priority: 0,
100 };
101
102 let block_id = editor.insert_blocks([block], None, cx)[0];
103 (block_id, invalidation_anchor)
104 });
105
106 anyhow::Ok(Self {
107 code_range,
108 invalidation_anchor,
109 block_id,
110 execution_view,
111 })
112 }
113
114 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
115 self.execution_view.update(cx, |execution_view, cx| {
116 execution_view.push_message(&message.content, cx);
117 });
118 }
119
120 fn create_output_area_renderer(
121 execution_view: View<ExecutionView>,
122 on_close: CloseBlockFn,
123 ) -> RenderBlock {
124 Arc::new(move |cx: &mut BlockContext| {
125 let execution_view = execution_view.clone();
126 let text_style = crate::outputs::plain::text_style(cx);
127
128 let gutter = cx.gutter_dimensions;
129
130 let block_id = cx.block_id;
131 let on_close = on_close.clone();
132
133 let rem_size = cx.rem_size();
134
135 let text_line_height = text_style.line_height_in_pixels(rem_size);
136
137 let close_button = h_flex()
138 .flex_none()
139 .items_center()
140 .justify_center()
141 .absolute()
142 .top(text_line_height / 2.)
143 .right(
144 // 2px is a magic number to nudge the button just a bit closer to
145 // the line number start
146 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
147 )
148 .w(text_line_height)
149 .h(text_line_height)
150 .child(
151 IconButton::new("close_output_area", IconName::Close)
152 .icon_size(IconSize::Small)
153 .icon_color(Color::Muted)
154 .size(ButtonSize::Compact)
155 .shape(IconButtonShape::Square)
156 .tooltip(|cx| Tooltip::text("Close output area", cx))
157 .on_click(move |_, cx| {
158 if let BlockId::Custom(block_id) = block_id {
159 (on_close)(block_id, cx)
160 }
161 }),
162 );
163
164 div()
165 .id(cx.block_id)
166 .block_mouse_down()
167 .flex()
168 .items_start()
169 .min_h(text_line_height)
170 .w_full()
171 .border_y_1()
172 .border_color(cx.theme().colors().border)
173 .bg(cx.theme().colors().background)
174 .child(
175 div()
176 .relative()
177 .w(gutter.full_width())
178 .h(text_line_height * 2)
179 .child(close_button),
180 )
181 .child(
182 div()
183 .flex_1()
184 .size_full()
185 .py(text_line_height / 2.)
186 .mr(gutter.width)
187 .child(execution_view),
188 )
189 .into_any_element()
190 })
191 }
192}
193
194impl Session {
195 pub fn new(
196 editor: WeakView<Editor>,
197 fs: Arc<dyn Fs>,
198 telemetry: Arc<Telemetry>,
199 kernel_specification: KernelSpecification,
200 cx: &mut ViewContext<Self>,
201 ) -> Self {
202 let subscription = match editor.upgrade() {
203 Some(editor) => {
204 let buffer = editor.read(cx).buffer().clone();
205 cx.subscribe(&buffer, Self::on_buffer_event)
206 }
207 None => Subscription::new(|| {}),
208 };
209
210 let editor_handle = editor.clone();
211
212 editor
213 .update(cx, |editor, _cx| {
214 setup_editor_session_actions(editor, editor_handle);
215 })
216 .ok();
217
218 let mut session = Self {
219 fs,
220 editor,
221 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
222 messaging_task: None,
223 process_status_task: None,
224 blocks: HashMap::default(),
225 kernel_specification,
226 _buffer_subscription: subscription,
227 telemetry,
228 };
229
230 session.start_kernel(cx);
231 session
232 }
233
234 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
235 let kernel_language = self.kernel_specification.language();
236 let entity_id = self.editor.entity_id();
237 let working_directory = self
238 .editor
239 .upgrade()
240 .and_then(|editor| editor.read(cx).working_directory(cx))
241 .unwrap_or_else(temp_dir);
242
243 self.telemetry.report_repl_event(
244 kernel_language.into(),
245 KernelStatus::Starting.to_string(),
246 cx.entity_id().to_string(),
247 );
248
249 let kernel = RunningKernel::new(
250 self.kernel_specification.clone(),
251 entity_id,
252 working_directory,
253 self.fs.clone(),
254 cx,
255 );
256
257 let pending_kernel = cx
258 .spawn(|this, mut cx| async move {
259 let kernel = kernel.await;
260
261 match kernel {
262 Ok((mut kernel, mut messages_rx)) => {
263 this.update(&mut cx, |session, cx| {
264 let stderr = kernel.process.stderr.take();
265
266 cx.spawn(|_session, mut _cx| async move {
267 if stderr.is_none() {
268 return;
269 }
270 let reader = BufReader::new(stderr.unwrap());
271 let mut lines = reader.lines();
272 while let Some(Ok(line)) = lines.next().await {
273 // todo!(): Log stdout and stderr to something the session can show
274 log::error!("kernel: {}", line);
275 }
276 })
277 .detach();
278
279 let stdout = kernel.process.stdout.take();
280
281 cx.spawn(|_session, mut _cx| async move {
282 if stdout.is_none() {
283 return;
284 }
285 let reader = BufReader::new(stdout.unwrap());
286 let mut lines = reader.lines();
287 while let Some(Ok(line)) = lines.next().await {
288 log::info!("kernel: {}", line);
289 }
290 })
291 .detach();
292
293 let status = kernel.process.status();
294 session.kernel(Kernel::RunningKernel(kernel), cx);
295
296 let process_status_task = cx.spawn(|session, mut cx| async move {
297 let error_message = match status.await {
298 Ok(status) => {
299 if status.success() {
300 log::info!("kernel process exited successfully");
301 return;
302 }
303
304 format!("kernel process exited with status: {:?}", status)
305 }
306 Err(err) => {
307 format!("kernel process exited with error: {:?}", err)
308 }
309 };
310
311 log::error!("{}", error_message);
312
313 session
314 .update(&mut cx, |session, cx| {
315 session.kernel(
316 Kernel::ErroredLaunch(error_message.clone()),
317 cx,
318 );
319
320 session.blocks.values().for_each(|block| {
321 block.execution_view.update(
322 cx,
323 |execution_view, cx| {
324 match execution_view.status {
325 ExecutionStatus::Finished => {
326 // Do nothing when the output was good
327 }
328 _ => {
329 // All other cases, set the status to errored
330 execution_view.status =
331 ExecutionStatus::KernelErrored(
332 error_message.clone(),
333 )
334 }
335 }
336 cx.notify();
337 },
338 );
339 });
340
341 cx.notify();
342 })
343 .ok();
344 });
345
346 session.process_status_task = Some(process_status_task);
347
348 session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
349 while let Some(message) = messages_rx.next().await {
350 session
351 .update(&mut cx, |session, cx| {
352 session.route(&message, cx);
353 })
354 .ok();
355 }
356 }));
357
358 // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
359 // cx.spawn(|this, mut cx| async move {
360 // cx.background_executor()
361 // .timer(Duration::from_millis(120))
362 // .await;
363 // this.update(&mut cx, |this, cx| {
364 // this.send(KernelInfoRequest {}.into(), cx).ok();
365 // })
366 // .ok();
367 // })
368 // .detach();
369 })
370 .ok();
371 }
372 Err(err) => {
373 this.update(&mut cx, |session, cx| {
374 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
375 })
376 .ok();
377 }
378 }
379 })
380 .shared();
381
382 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
383 cx.notify();
384 }
385
386 fn on_buffer_event(
387 &mut self,
388 buffer: Model<MultiBuffer>,
389 event: &multi_buffer::Event,
390 cx: &mut ViewContext<Self>,
391 ) {
392 if let multi_buffer::Event::Edited { .. } = event {
393 let snapshot = buffer.read(cx).snapshot(cx);
394
395 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
396
397 self.blocks.retain(|_id, block| {
398 if block.invalidation_anchor.is_valid(&snapshot) {
399 true
400 } else {
401 blocks_to_remove.insert(block.block_id);
402 false
403 }
404 });
405
406 if !blocks_to_remove.is_empty() {
407 self.editor
408 .update(cx, |editor, cx| {
409 editor.remove_blocks(blocks_to_remove, None, cx);
410 })
411 .ok();
412 cx.notify();
413 }
414 }
415 }
416
417 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
418 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
419 kernel.request_tx.try_send(message).ok();
420 }
421
422 anyhow::Ok(())
423 }
424
425 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
426 let blocks_to_remove: HashSet<CustomBlockId> =
427 self.blocks.values().map(|block| block.block_id).collect();
428
429 self.editor
430 .update(cx, |editor, cx| {
431 editor.remove_blocks(blocks_to_remove, None, cx);
432 })
433 .ok();
434
435 self.blocks.clear();
436 }
437
438 pub fn execute(
439 &mut self,
440 code: String,
441 anchor_range: Range<Anchor>,
442 next_cell: Option<Anchor>,
443 move_down: bool,
444 cx: &mut ViewContext<Self>,
445 ) {
446 let Some(editor) = self.editor.upgrade() else {
447 return;
448 };
449
450 if code.is_empty() {
451 return;
452 }
453
454 let execute_request = ExecuteRequest {
455 code,
456 ..ExecuteRequest::default()
457 };
458
459 let message: JupyterMessage = execute_request.into();
460
461 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
462
463 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
464
465 self.blocks.retain(|_key, block| {
466 if anchor_range.overlaps(&block.code_range, &buffer) {
467 blocks_to_remove.insert(block.block_id);
468 false
469 } else {
470 true
471 }
472 });
473
474 self.editor
475 .update(cx, |editor, cx| {
476 editor.remove_blocks(blocks_to_remove, None, cx);
477 })
478 .ok();
479
480 let status = match &self.kernel {
481 Kernel::Restarting => ExecutionStatus::Restarting,
482 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
483 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
484 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
485 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
486 Kernel::Shutdown => ExecutionStatus::Shutdown,
487 };
488
489 let parent_message_id = message.header.msg_id.clone();
490 let session_view = cx.view().downgrade();
491 let weak_editor = self.editor.clone();
492
493 let on_close: CloseBlockFn =
494 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
495 if let Some(session) = session_view.upgrade() {
496 session.update(cx, |session, cx| {
497 session.blocks.remove(&parent_message_id);
498 cx.notify();
499 });
500 }
501
502 if let Some(editor) = weak_editor.upgrade() {
503 editor.update(cx, |editor, cx| {
504 let mut block_ids = HashSet::default();
505 block_ids.insert(block_id);
506 editor.remove_blocks(block_ids, None, cx);
507 });
508 }
509 });
510
511 let Ok(editor_block) =
512 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
513 else {
514 return;
515 };
516
517 let new_cursor_pos = if let Some(next_cursor) = next_cell {
518 next_cursor
519 } else {
520 editor_block.invalidation_anchor
521 };
522
523 self.blocks
524 .insert(message.header.msg_id.clone(), editor_block);
525
526 match &self.kernel {
527 Kernel::RunningKernel(_) => {
528 self.send(message, cx).ok();
529 }
530 Kernel::StartingKernel(task) => {
531 // Queue up the execution as a task to run after the kernel starts
532 let task = task.clone();
533 let message = message.clone();
534
535 cx.spawn(|this, mut cx| async move {
536 task.await;
537 this.update(&mut cx, |session, cx| {
538 session.send(message, cx).ok();
539 })
540 .ok();
541 })
542 .detach();
543 }
544 _ => {}
545 }
546
547 if move_down {
548 editor.update(cx, move |editor, cx| {
549 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
550 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
551 });
552 });
553 }
554 }
555
556 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
557 let parent_message_id = match message.parent_header.as_ref() {
558 Some(header) => &header.msg_id,
559 None => return,
560 };
561
562 match &message.content {
563 JupyterMessageContent::Status(status) => {
564 self.kernel.set_execution_state(&status.execution_state);
565
566 self.telemetry.report_repl_event(
567 self.kernel_specification.language().into(),
568 KernelStatus::from(&self.kernel).to_string(),
569 cx.entity_id().to_string(),
570 );
571
572 cx.notify();
573 }
574 JupyterMessageContent::KernelInfoReply(reply) => {
575 self.kernel.set_kernel_info(reply);
576 cx.notify();
577 }
578 JupyterMessageContent::UpdateDisplayData(update) => {
579 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
580 display_id
581 } else {
582 return;
583 };
584
585 self.blocks.iter_mut().for_each(|(_, block)| {
586 block.execution_view.update(cx, |execution_view, cx| {
587 execution_view.update_display_data(&update.data, &display_id, cx);
588 });
589 });
590 return;
591 }
592 _ => {}
593 }
594
595 if let Some(block) = self.blocks.get_mut(parent_message_id) {
596 block.handle_message(message, cx);
597 }
598 }
599
600 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
601 match &mut self.kernel {
602 Kernel::RunningKernel(_kernel) => {
603 self.send(InterruptRequest {}.into(), cx).ok();
604 }
605 Kernel::StartingKernel(_task) => {
606 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
607 }
608 _ => {}
609 }
610 }
611
612 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
613 if let Kernel::Shutdown = kernel {
614 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
615 }
616
617 let kernel_status = KernelStatus::from(&kernel).to_string();
618 let kernel_language = self.kernel_specification.language().into();
619
620 self.telemetry.report_repl_event(
621 kernel_language,
622 kernel_status,
623 cx.entity_id().to_string(),
624 );
625
626 self.kernel = kernel;
627 }
628
629 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
630 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
631
632 match kernel {
633 Kernel::RunningKernel(mut kernel) => {
634 let mut request_tx = kernel.request_tx.clone();
635
636 cx.spawn(|this, mut cx| async move {
637 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
638 request_tx.try_send(message).ok();
639
640 // Give the kernel a bit of time to clean up
641 cx.background_executor().timer(Duration::from_secs(3)).await;
642
643 this.update(&mut cx, |session, _cx| {
644 session.messaging_task.take();
645 session.process_status_task.take();
646 })
647 .ok();
648
649 kernel.process.kill().ok();
650
651 this.update(&mut cx, |session, cx| {
652 session.clear_outputs(cx);
653 session.kernel(Kernel::Shutdown, cx);
654 cx.notify();
655 })
656 .ok();
657 })
658 .detach();
659 }
660 _ => {
661 self.messaging_task.take();
662 self.process_status_task.take();
663 self.kernel(Kernel::Shutdown, cx);
664 }
665 }
666 cx.notify();
667 }
668
669 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
670 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
671
672 match kernel {
673 Kernel::Restarting => {
674 // Do nothing if already restarting
675 }
676 Kernel::RunningKernel(mut kernel) => {
677 let mut request_tx = kernel.request_tx.clone();
678
679 cx.spawn(|this, mut cx| async move {
680 // Send shutdown request with restart flag
681 log::debug!("restarting kernel");
682 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
683 request_tx.try_send(message).ok();
684
685 this.update(&mut cx, |session, _cx| {
686 session.messaging_task.take();
687 session.process_status_task.take();
688 })
689 .ok();
690
691 // Wait for kernel to shutdown
692 cx.background_executor().timer(Duration::from_secs(1)).await;
693
694 // Force kill the kernel if it hasn't shut down
695 kernel.process.kill().ok();
696
697 // Start a new kernel
698 this.update(&mut cx, |session, cx| {
699 // todo!(): Differentiate between restart and restart+clear-outputs
700 session.clear_outputs(cx);
701 session.start_kernel(cx);
702 })
703 .ok();
704 })
705 .detach();
706 }
707 _ => {
708 // If it's not already running, we can just clean up and start a new kernel
709 self.messaging_task.take();
710 self.process_status_task.take();
711 self.clear_outputs(cx);
712 self.start_kernel(cx);
713 }
714 }
715 cx.notify();
716 }
717}
718
719pub enum SessionEvent {
720 Shutdown(WeakView<Editor>),
721}
722
723impl EventEmitter<SessionEvent> for Session {}
724
725impl Render for Session {
726 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
727 let (status_text, interrupt_button) = match &self.kernel {
728 Kernel::RunningKernel(kernel) => (
729 kernel
730 .kernel_info
731 .as_ref()
732 .map(|info| info.language_info.name.clone()),
733 Some(
734 Button::new("interrupt", "Interrupt")
735 .style(ButtonStyle::Subtle)
736 .on_click(cx.listener(move |session, _, cx| {
737 session.interrupt(cx);
738 })),
739 ),
740 ),
741 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
742 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
743 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
744 Kernel::Shutdown => (Some("Shutdown".into()), None),
745 Kernel::Restarting => (Some("Restarting".into()), None),
746 };
747
748 KernelListItem::new(self.kernel_specification.clone())
749 .status_color(match &self.kernel {
750 Kernel::RunningKernel(kernel) => match kernel.execution_state {
751 ExecutionState::Idle => Color::Success,
752 ExecutionState::Busy => Color::Modified,
753 },
754 Kernel::StartingKernel(_) => Color::Modified,
755 Kernel::ErroredLaunch(_) => Color::Error,
756 Kernel::ShuttingDown => Color::Modified,
757 Kernel::Shutdown => Color::Disabled,
758 Kernel::Restarting => Color::Modified,
759 })
760 .child(Label::new(self.kernel_specification.name()))
761 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
762 .button(
763 Button::new("shutdown", "Shutdown")
764 .style(ButtonStyle::Subtle)
765 .disabled(self.kernel.is_shutting_down())
766 .on_click(cx.listener(move |session, _, cx| {
767 session.shutdown(cx);
768 })),
769 )
770 .buttons(interrupt_button)
771 }
772}