1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
21 WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::ActiveTheme;
31use ui::{prelude::*, IconButtonShape, Tooltip};
32
33pub struct Session {
34 fs: Arc<dyn Fs>,
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 messaging_task: Option<Task<()>>,
39 process_status_task: Option<Task<()>>,
40 pub kernel_specification: KernelSpecification,
41 telemetry: Arc<Telemetry>,
42 _buffer_subscription: Subscription,
43}
44
45struct EditorBlock {
46 code_range: Range<Anchor>,
47 invalidation_anchor: Anchor,
48 block_id: CustomBlockId,
49 execution_view: View<ExecutionView>,
50}
51
52type CloseBlockFn =
53 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
54
55impl EditorBlock {
56 fn new(
57 editor: WeakView<Editor>,
58 code_range: Range<Anchor>,
59 status: ExecutionStatus,
60 on_close: CloseBlockFn,
61 cx: &mut ViewContext<Session>,
62 ) -> anyhow::Result<Self> {
63 let editor = editor
64 .upgrade()
65 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
66 let workspace = editor
67 .read(cx)
68 .workspace()
69 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
70
71 let execution_view =
72 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
73
74 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
75 let buffer = editor.buffer().clone();
76 let buffer_snapshot = buffer.read(cx).snapshot(cx);
77 let end_point = code_range.end.to_point(&buffer_snapshot);
78 let next_row_start = end_point + Point::new(1, 0);
79 if next_row_start > buffer_snapshot.max_point() {
80 buffer.update(cx, |buffer, cx| {
81 buffer.edit(
82 [(
83 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
84 "\n",
85 )],
86 None,
87 cx,
88 )
89 });
90 }
91
92 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
93 let block = BlockProperties {
94 position: code_range.end,
95 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
96 height: 1,
97 style: BlockStyle::Sticky,
98 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
99 disposition: BlockDisposition::Below,
100 priority: 0,
101 };
102
103 let block_id = editor.insert_blocks([block], None, cx)[0];
104 (block_id, invalidation_anchor)
105 });
106
107 anyhow::Ok(Self {
108 code_range,
109 invalidation_anchor,
110 block_id,
111 execution_view,
112 })
113 }
114
115 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
116 self.execution_view.update(cx, |execution_view, cx| {
117 execution_view.push_message(&message.content, cx);
118 });
119 }
120
121 fn create_output_area_renderer(
122 execution_view: View<ExecutionView>,
123 on_close: CloseBlockFn,
124 ) -> RenderBlock {
125 let render = move |cx: &mut BlockContext| {
126 let execution_view = execution_view.clone();
127 let text_style = crate::outputs::plain::text_style(cx);
128
129 let gutter = cx.gutter_dimensions;
130
131 let block_id = cx.block_id;
132 let on_close = on_close.clone();
133
134 let rem_size = cx.rem_size();
135
136 let text_line_height = text_style.line_height_in_pixels(rem_size);
137
138 let close_button = h_flex()
139 .flex_none()
140 .items_center()
141 .justify_center()
142 .absolute()
143 .top(text_line_height / 2.)
144 .right(
145 // 2px is a magic number to nudge the button just a bit closer to
146 // the line number start
147 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
148 )
149 .w(text_line_height)
150 .h(text_line_height)
151 .child(
152 IconButton::new(
153 ("close_output_area", EntityId::from(cx.block_id)),
154 IconName::Close,
155 )
156 .icon_size(IconSize::Small)
157 .icon_color(Color::Muted)
158 .size(ButtonSize::Compact)
159 .shape(IconButtonShape::Square)
160 .tooltip(|cx| Tooltip::text("Close output area", cx))
161 .on_click(move |_, cx| {
162 if let BlockId::Custom(block_id) = block_id {
163 (on_close)(block_id, cx)
164 }
165 }),
166 );
167
168 div()
169 .flex()
170 .items_start()
171 .min_h(text_line_height)
172 .w_full()
173 .border_y_1()
174 .border_color(cx.theme().colors().border)
175 .bg(cx.theme().colors().background)
176 .child(
177 div()
178 .relative()
179 .w(gutter.full_width())
180 .h(text_line_height * 2)
181 .child(close_button),
182 )
183 .child(
184 div()
185 .flex_1()
186 .size_full()
187 .py(text_line_height / 2.)
188 .mr(gutter.width)
189 .child(execution_view),
190 )
191 .into_any_element()
192 };
193
194 Box::new(render)
195 }
196}
197
198impl Session {
199 pub fn new(
200 editor: WeakView<Editor>,
201 fs: Arc<dyn Fs>,
202 telemetry: Arc<Telemetry>,
203 kernel_specification: KernelSpecification,
204 cx: &mut ViewContext<Self>,
205 ) -> Self {
206 let subscription = match editor.upgrade() {
207 Some(editor) => {
208 let buffer = editor.read(cx).buffer().clone();
209 cx.subscribe(&buffer, Self::on_buffer_event)
210 }
211 None => Subscription::new(|| {}),
212 };
213
214 let mut session = Self {
215 fs,
216 editor,
217 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
218 messaging_task: None,
219 process_status_task: None,
220 blocks: HashMap::default(),
221 kernel_specification,
222 _buffer_subscription: subscription,
223 telemetry,
224 };
225
226 session.start_kernel(cx);
227 session
228 }
229
230 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
231 let kernel_language = self.kernel_specification.kernelspec.language.clone();
232 let entity_id = self.editor.entity_id();
233 let working_directory = self
234 .editor
235 .upgrade()
236 .and_then(|editor| editor.read(cx).working_directory(cx))
237 .unwrap_or_else(temp_dir);
238
239 self.telemetry.report_repl_event(
240 kernel_language.clone(),
241 KernelStatus::Starting.to_string(),
242 cx.entity_id().to_string(),
243 );
244
245 let kernel = RunningKernel::new(
246 self.kernel_specification.clone(),
247 entity_id,
248 working_directory,
249 self.fs.clone(),
250 cx,
251 );
252
253 let pending_kernel = cx
254 .spawn(|this, mut cx| async move {
255 let kernel = kernel.await;
256
257 match kernel {
258 Ok((mut kernel, mut messages_rx)) => {
259 this.update(&mut cx, |session, cx| {
260 let stderr = kernel.process.stderr.take();
261
262 cx.spawn(|_session, mut _cx| async move {
263 if let None = stderr {
264 return;
265 }
266 let reader = BufReader::new(stderr.unwrap());
267 let mut lines = reader.lines();
268 while let Some(Ok(line)) = lines.next().await {
269 // todo!(): Log stdout and stderr to something the session can show
270 log::error!("kernel: {}", line);
271 }
272 })
273 .detach();
274
275 let stdout = kernel.process.stdout.take();
276
277 cx.spawn(|_session, mut _cx| async move {
278 if let None = stdout {
279 return;
280 }
281 let reader = BufReader::new(stdout.unwrap());
282 let mut lines = reader.lines();
283 while let Some(Ok(line)) = lines.next().await {
284 log::info!("kernel: {}", line);
285 }
286 })
287 .detach();
288
289 let status = kernel.process.status();
290 session.kernel(Kernel::RunningKernel(kernel), cx);
291
292 let process_status_task = cx.spawn(|session, mut cx| async move {
293 let error_message = match status.await {
294 Ok(status) => {
295 if status.success() {
296 log::info!("kernel process exited successfully");
297 return;
298 }
299
300 format!("kernel process exited with status: {:?}", status)
301 }
302 Err(err) => {
303 format!("kernel process exited with error: {:?}", err)
304 }
305 };
306
307 log::error!("{}", error_message);
308
309 session
310 .update(&mut cx, |session, cx| {
311 session.kernel(
312 Kernel::ErroredLaunch(error_message.clone()),
313 cx,
314 );
315
316 session.blocks.values().for_each(|block| {
317 block.execution_view.update(
318 cx,
319 |execution_view, cx| {
320 match execution_view.status {
321 ExecutionStatus::Finished => {
322 // Do nothing when the output was good
323 }
324 _ => {
325 // All other cases, set the status to errored
326 execution_view.status =
327 ExecutionStatus::KernelErrored(
328 error_message.clone(),
329 )
330 }
331 }
332 cx.notify();
333 },
334 );
335 });
336
337 cx.notify();
338 })
339 .ok();
340 });
341
342 session.process_status_task = Some(process_status_task);
343
344 session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
345 while let Some(message) = messages_rx.next().await {
346 session
347 .update(&mut cx, |session, cx| {
348 session.route(&message, cx);
349 })
350 .ok();
351 }
352 }));
353
354 // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
355 // cx.spawn(|this, mut cx| async move {
356 // cx.background_executor()
357 // .timer(Duration::from_millis(120))
358 // .await;
359 // this.update(&mut cx, |this, cx| {
360 // this.send(KernelInfoRequest {}.into(), cx).ok();
361 // })
362 // .ok();
363 // })
364 // .detach();
365 })
366 .ok();
367 }
368 Err(err) => {
369 this.update(&mut cx, |session, cx| {
370 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
371 })
372 .ok();
373 }
374 }
375 })
376 .shared();
377
378 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
379 cx.notify();
380 }
381
382 fn on_buffer_event(
383 &mut self,
384 buffer: Model<MultiBuffer>,
385 event: &multi_buffer::Event,
386 cx: &mut ViewContext<Self>,
387 ) {
388 if let multi_buffer::Event::Edited { .. } = event {
389 let snapshot = buffer.read(cx).snapshot(cx);
390
391 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
392
393 self.blocks.retain(|_id, block| {
394 if block.invalidation_anchor.is_valid(&snapshot) {
395 true
396 } else {
397 blocks_to_remove.insert(block.block_id);
398 false
399 }
400 });
401
402 if !blocks_to_remove.is_empty() {
403 self.editor
404 .update(cx, |editor, cx| {
405 editor.remove_blocks(blocks_to_remove, None, cx);
406 })
407 .ok();
408 cx.notify();
409 }
410 }
411 }
412
413 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
414 match &mut self.kernel {
415 Kernel::RunningKernel(kernel) => {
416 kernel.request_tx.try_send(message).ok();
417 }
418 _ => {}
419 }
420
421 anyhow::Ok(())
422 }
423
424 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
425 let blocks_to_remove: HashSet<CustomBlockId> =
426 self.blocks.values().map(|block| block.block_id).collect();
427
428 self.editor
429 .update(cx, |editor, cx| {
430 editor.remove_blocks(blocks_to_remove, None, cx);
431 })
432 .ok();
433
434 self.blocks.clear();
435 }
436
437 pub fn execute(
438 &mut self,
439 code: String,
440 anchor_range: Range<Anchor>,
441 next_cell: Option<Anchor>,
442 move_down: bool,
443 cx: &mut ViewContext<Self>,
444 ) {
445 let Some(editor) = self.editor.upgrade() else {
446 return;
447 };
448
449 if code.is_empty() {
450 return;
451 }
452
453 let execute_request = ExecuteRequest {
454 code,
455 ..ExecuteRequest::default()
456 };
457
458 let message: JupyterMessage = execute_request.into();
459
460 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
461
462 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
463
464 self.blocks.retain(|_key, block| {
465 if anchor_range.overlaps(&block.code_range, &buffer) {
466 blocks_to_remove.insert(block.block_id);
467 false
468 } else {
469 true
470 }
471 });
472
473 self.editor
474 .update(cx, |editor, cx| {
475 editor.remove_blocks(blocks_to_remove, None, cx);
476 })
477 .ok();
478
479 let status = match &self.kernel {
480 Kernel::Restarting => ExecutionStatus::Restarting,
481 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
482 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
483 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
484 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
485 Kernel::Shutdown => ExecutionStatus::Shutdown,
486 };
487
488 let parent_message_id = message.header.msg_id.clone();
489 let session_view = cx.view().downgrade();
490 let weak_editor = self.editor.clone();
491
492 let on_close: CloseBlockFn =
493 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
494 if let Some(session) = session_view.upgrade() {
495 session.update(cx, |session, cx| {
496 session.blocks.remove(&parent_message_id);
497 cx.notify();
498 });
499 }
500
501 if let Some(editor) = weak_editor.upgrade() {
502 editor.update(cx, |editor, cx| {
503 let mut block_ids = HashSet::default();
504 block_ids.insert(block_id);
505 editor.remove_blocks(block_ids, None, cx);
506 });
507 }
508 });
509
510 let Ok(editor_block) =
511 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
512 else {
513 return;
514 };
515
516 let new_cursor_pos = if let Some(next_cursor) = next_cell {
517 next_cursor
518 } else {
519 editor_block.invalidation_anchor
520 };
521
522 self.blocks
523 .insert(message.header.msg_id.clone(), editor_block);
524
525 match &self.kernel {
526 Kernel::RunningKernel(_) => {
527 self.send(message, cx).ok();
528 }
529 Kernel::StartingKernel(task) => {
530 // Queue up the execution as a task to run after the kernel starts
531 let task = task.clone();
532 let message = message.clone();
533
534 cx.spawn(|this, mut cx| async move {
535 task.await;
536 this.update(&mut cx, |session, cx| {
537 session.send(message, cx).ok();
538 })
539 .ok();
540 })
541 .detach();
542 }
543 _ => {}
544 }
545
546 if move_down {
547 editor.update(cx, move |editor, cx| {
548 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
549 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
550 });
551 });
552 }
553 }
554
555 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
556 let parent_message_id = match message.parent_header.as_ref() {
557 Some(header) => &header.msg_id,
558 None => return,
559 };
560
561 match &message.content {
562 JupyterMessageContent::Status(status) => {
563 self.kernel.set_execution_state(&status.execution_state);
564
565 self.telemetry.report_repl_event(
566 self.kernel_specification.kernelspec.language.clone(),
567 KernelStatus::from(&self.kernel).to_string(),
568 cx.entity_id().to_string(),
569 );
570
571 cx.notify();
572 }
573 JupyterMessageContent::KernelInfoReply(reply) => {
574 self.kernel.set_kernel_info(&reply);
575 cx.notify();
576 }
577 JupyterMessageContent::UpdateDisplayData(update) => {
578 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
579 display_id
580 } else {
581 return;
582 };
583
584 self.blocks.iter_mut().for_each(|(_, block)| {
585 block.execution_view.update(cx, |execution_view, cx| {
586 execution_view.update_display_data(&update.data, &display_id, cx);
587 });
588 });
589 return;
590 }
591 _ => {}
592 }
593
594 if let Some(block) = self.blocks.get_mut(parent_message_id) {
595 block.handle_message(&message, cx);
596 return;
597 }
598 }
599
600 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
601 match &mut self.kernel {
602 Kernel::RunningKernel(_kernel) => {
603 self.send(InterruptRequest {}.into(), cx).ok();
604 }
605 Kernel::StartingKernel(_task) => {
606 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
607 }
608 _ => {}
609 }
610 }
611
612 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
613 if let Kernel::Shutdown = kernel {
614 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
615 }
616
617 let kernel_status = KernelStatus::from(&kernel).to_string();
618 let kernel_language = self.kernel_specification.kernelspec.language.clone();
619
620 self.telemetry.report_repl_event(
621 kernel_language,
622 kernel_status,
623 cx.entity_id().to_string(),
624 );
625
626 self.kernel = kernel;
627 }
628
629 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
630 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
631
632 match kernel {
633 Kernel::RunningKernel(mut kernel) => {
634 let mut request_tx = kernel.request_tx.clone();
635
636 cx.spawn(|this, mut cx| async move {
637 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
638 request_tx.try_send(message).ok();
639
640 // Give the kernel a bit of time to clean up
641 cx.background_executor().timer(Duration::from_secs(3)).await;
642
643 this.update(&mut cx, |session, _cx| {
644 session.messaging_task.take();
645 session.process_status_task.take();
646 })
647 .ok();
648
649 kernel.process.kill().ok();
650
651 this.update(&mut cx, |session, cx| {
652 session.clear_outputs(cx);
653 session.kernel(Kernel::Shutdown, cx);
654 cx.notify();
655 })
656 .ok();
657 })
658 .detach();
659 }
660 _ => {
661 self.messaging_task.take();
662 self.process_status_task.take();
663 self.kernel(Kernel::Shutdown, cx);
664 }
665 }
666 cx.notify();
667 }
668
669 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
670 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
671
672 match kernel {
673 Kernel::Restarting => {
674 // Do nothing if already restarting
675 }
676 Kernel::RunningKernel(mut kernel) => {
677 let mut request_tx = kernel.request_tx.clone();
678
679 cx.spawn(|this, mut cx| async move {
680 // Send shutdown request with restart flag
681 log::debug!("restarting kernel");
682 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
683 request_tx.try_send(message).ok();
684
685 this.update(&mut cx, |session, _cx| {
686 session.messaging_task.take();
687 session.process_status_task.take();
688 })
689 .ok();
690
691 // Wait for kernel to shutdown
692 cx.background_executor().timer(Duration::from_secs(1)).await;
693
694 // Force kill the kernel if it hasn't shut down
695 kernel.process.kill().ok();
696
697 // Start a new kernel
698 this.update(&mut cx, |session, cx| {
699 // todo!(): Differentiate between restart and restart+clear-outputs
700 session.clear_outputs(cx);
701 session.start_kernel(cx);
702 })
703 .ok();
704 })
705 .detach();
706 }
707 _ => {
708 // If it's not already running, we can just clean up and start a new kernel
709 self.messaging_task.take();
710 self.process_status_task.take();
711 self.clear_outputs(cx);
712 self.start_kernel(cx);
713 }
714 }
715 cx.notify();
716 }
717}
718
719pub enum SessionEvent {
720 Shutdown(WeakView<Editor>),
721}
722
723impl EventEmitter<SessionEvent> for Session {}
724
725impl Render for Session {
726 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
727 let (status_text, interrupt_button) = match &self.kernel {
728 Kernel::RunningKernel(kernel) => (
729 kernel
730 .kernel_info
731 .as_ref()
732 .map(|info| info.language_info.name.clone()),
733 Some(
734 Button::new("interrupt", "Interrupt")
735 .style(ButtonStyle::Subtle)
736 .on_click(cx.listener(move |session, _, cx| {
737 session.interrupt(cx);
738 })),
739 ),
740 ),
741 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
742 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
743 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
744 Kernel::Shutdown => (Some("Shutdown".into()), None),
745 Kernel::Restarting => (Some("Restarting".into()), None),
746 };
747
748 KernelListItem::new(self.kernel_specification.clone())
749 .status_color(match &self.kernel {
750 Kernel::RunningKernel(kernel) => match kernel.execution_state {
751 ExecutionState::Idle => Color::Success,
752 ExecutionState::Busy => Color::Modified,
753 },
754 Kernel::StartingKernel(_) => Color::Modified,
755 Kernel::ErroredLaunch(_) => Color::Error,
756 Kernel::ShuttingDown => Color::Modified,
757 Kernel::Shutdown => Color::Disabled,
758 Kernel::Restarting => Color::Modified,
759 })
760 .child(Label::new(self.kernel_specification.name.clone()))
761 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
762 .button(
763 Button::new("shutdown", "Shutdown")
764 .style(ButtonStyle::Subtle)
765 .disabled(self.kernel.is_shutting_down())
766 .on_click(cx.listener(move |session, _, cx| {
767 session.shutdown(cx);
768 })),
769 )
770 .buttons(interrupt_button)
771 }
772}