1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
21};
22use language::Point;
23use project::Fs;
24use runtimelib::{
25 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
26 ShutdownRequest,
27};
28use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
29use theme::ActiveTheme;
30use ui::{prelude::*, IconButtonShape, Tooltip};
31
32pub struct Session {
33 fs: Arc<dyn Fs>,
34 editor: WeakView<Editor>,
35 pub kernel: Kernel,
36 blocks: HashMap<String, EditorBlock>,
37 messaging_task: Option<Task<()>>,
38 process_status_task: Option<Task<()>>,
39 pub kernel_specification: KernelSpecification,
40 telemetry: Arc<Telemetry>,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 code_range: Range<Anchor>,
46 invalidation_anchor: Anchor,
47 block_id: CustomBlockId,
48 execution_view: View<ExecutionView>,
49}
50
51type CloseBlockFn =
52 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakView<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut ViewContext<Session>,
61 ) -> anyhow::Result<Self> {
62 let editor = editor
63 .upgrade()
64 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
65 let workspace = editor
66 .read(cx)
67 .workspace()
68 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
69
70 let execution_view =
71 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
72
73 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
74 let buffer = editor.buffer().clone();
75 let buffer_snapshot = buffer.read(cx).snapshot(cx);
76 let end_point = code_range.end.to_point(&buffer_snapshot);
77 let next_row_start = end_point + Point::new(1, 0);
78 if next_row_start > buffer_snapshot.max_point() {
79 buffer.update(cx, |buffer, cx| {
80 buffer.edit(
81 [(
82 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
83 "\n",
84 )],
85 None,
86 cx,
87 )
88 });
89 }
90
91 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
92 let block = BlockProperties {
93 placement: BlockPlacement::Below(code_range.end),
94 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
95 height: 1,
96 style: BlockStyle::Sticky,
97 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
98 priority: 0,
99 };
100
101 let block_id = editor.insert_blocks([block], None, cx)[0];
102 (block_id, invalidation_anchor)
103 });
104
105 anyhow::Ok(Self {
106 code_range,
107 invalidation_anchor,
108 block_id,
109 execution_view,
110 })
111 }
112
113 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
114 self.execution_view.update(cx, |execution_view, cx| {
115 execution_view.push_message(&message.content, cx);
116 });
117 }
118
119 fn create_output_area_renderer(
120 execution_view: View<ExecutionView>,
121 on_close: CloseBlockFn,
122 ) -> RenderBlock {
123 let render = move |cx: &mut BlockContext| {
124 let execution_view = execution_view.clone();
125 let text_style = crate::outputs::plain::text_style(cx);
126
127 let gutter = cx.gutter_dimensions;
128
129 let block_id = cx.block_id;
130 let on_close = on_close.clone();
131
132 let rem_size = cx.rem_size();
133
134 let text_line_height = text_style.line_height_in_pixels(rem_size);
135
136 let close_button = h_flex()
137 .flex_none()
138 .items_center()
139 .justify_center()
140 .absolute()
141 .top(text_line_height / 2.)
142 .right(
143 // 2px is a magic number to nudge the button just a bit closer to
144 // the line number start
145 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
146 )
147 .w(text_line_height)
148 .h(text_line_height)
149 .child(
150 IconButton::new("close_output_area", IconName::Close)
151 .icon_size(IconSize::Small)
152 .icon_color(Color::Muted)
153 .size(ButtonSize::Compact)
154 .shape(IconButtonShape::Square)
155 .tooltip(|cx| Tooltip::text("Close output area", cx))
156 .on_click(move |_, cx| {
157 if let BlockId::Custom(block_id) = block_id {
158 (on_close)(block_id, cx)
159 }
160 }),
161 );
162
163 div()
164 .id(cx.block_id)
165 .flex()
166 .items_start()
167 .min_h(text_line_height)
168 .w_full()
169 .border_y_1()
170 .border_color(cx.theme().colors().border)
171 .bg(cx.theme().colors().background)
172 .child(
173 div()
174 .relative()
175 .w(gutter.full_width())
176 .h(text_line_height * 2)
177 .child(close_button),
178 )
179 .child(
180 div()
181 .flex_1()
182 .size_full()
183 .py(text_line_height / 2.)
184 .mr(gutter.width)
185 .child(execution_view),
186 )
187 .into_any_element()
188 };
189
190 Box::new(render)
191 }
192}
193
194impl Session {
195 pub fn new(
196 editor: WeakView<Editor>,
197 fs: Arc<dyn Fs>,
198 telemetry: Arc<Telemetry>,
199 kernel_specification: KernelSpecification,
200 cx: &mut ViewContext<Self>,
201 ) -> Self {
202 let subscription = match editor.upgrade() {
203 Some(editor) => {
204 let buffer = editor.read(cx).buffer().clone();
205 cx.subscribe(&buffer, Self::on_buffer_event)
206 }
207 None => Subscription::new(|| {}),
208 };
209
210 let mut session = Self {
211 fs,
212 editor,
213 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
214 messaging_task: None,
215 process_status_task: None,
216 blocks: HashMap::default(),
217 kernel_specification,
218 _buffer_subscription: subscription,
219 telemetry,
220 };
221
222 session.start_kernel(cx);
223 session
224 }
225
226 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
227 let kernel_language = self.kernel_specification.kernelspec.language.clone();
228 let entity_id = self.editor.entity_id();
229 let working_directory = self
230 .editor
231 .upgrade()
232 .and_then(|editor| editor.read(cx).working_directory(cx))
233 .unwrap_or_else(temp_dir);
234
235 self.telemetry.report_repl_event(
236 kernel_language.clone(),
237 KernelStatus::Starting.to_string(),
238 cx.entity_id().to_string(),
239 );
240
241 let kernel = RunningKernel::new(
242 self.kernel_specification.clone(),
243 entity_id,
244 working_directory,
245 self.fs.clone(),
246 cx,
247 );
248
249 let pending_kernel = cx
250 .spawn(|this, mut cx| async move {
251 let kernel = kernel.await;
252
253 match kernel {
254 Ok((mut kernel, mut messages_rx)) => {
255 this.update(&mut cx, |session, cx| {
256 let stderr = kernel.process.stderr.take();
257
258 cx.spawn(|_session, mut _cx| async move {
259 if stderr.is_none() {
260 return;
261 }
262 let reader = BufReader::new(stderr.unwrap());
263 let mut lines = reader.lines();
264 while let Some(Ok(line)) = lines.next().await {
265 // todo!(): Log stdout and stderr to something the session can show
266 log::error!("kernel: {}", line);
267 }
268 })
269 .detach();
270
271 let stdout = kernel.process.stdout.take();
272
273 cx.spawn(|_session, mut _cx| async move {
274 if stdout.is_none() {
275 return;
276 }
277 let reader = BufReader::new(stdout.unwrap());
278 let mut lines = reader.lines();
279 while let Some(Ok(line)) = lines.next().await {
280 log::info!("kernel: {}", line);
281 }
282 })
283 .detach();
284
285 let status = kernel.process.status();
286 session.kernel(Kernel::RunningKernel(kernel), cx);
287
288 let process_status_task = cx.spawn(|session, mut cx| async move {
289 let error_message = match status.await {
290 Ok(status) => {
291 if status.success() {
292 log::info!("kernel process exited successfully");
293 return;
294 }
295
296 format!("kernel process exited with status: {:?}", status)
297 }
298 Err(err) => {
299 format!("kernel process exited with error: {:?}", err)
300 }
301 };
302
303 log::error!("{}", error_message);
304
305 session
306 .update(&mut cx, |session, cx| {
307 session.kernel(
308 Kernel::ErroredLaunch(error_message.clone()),
309 cx,
310 );
311
312 session.blocks.values().for_each(|block| {
313 block.execution_view.update(
314 cx,
315 |execution_view, cx| {
316 match execution_view.status {
317 ExecutionStatus::Finished => {
318 // Do nothing when the output was good
319 }
320 _ => {
321 // All other cases, set the status to errored
322 execution_view.status =
323 ExecutionStatus::KernelErrored(
324 error_message.clone(),
325 )
326 }
327 }
328 cx.notify();
329 },
330 );
331 });
332
333 cx.notify();
334 })
335 .ok();
336 });
337
338 session.process_status_task = Some(process_status_task);
339
340 session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
341 while let Some(message) = messages_rx.next().await {
342 session
343 .update(&mut cx, |session, cx| {
344 session.route(&message, cx);
345 })
346 .ok();
347 }
348 }));
349
350 // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
351 // cx.spawn(|this, mut cx| async move {
352 // cx.background_executor()
353 // .timer(Duration::from_millis(120))
354 // .await;
355 // this.update(&mut cx, |this, cx| {
356 // this.send(KernelInfoRequest {}.into(), cx).ok();
357 // })
358 // .ok();
359 // })
360 // .detach();
361 })
362 .ok();
363 }
364 Err(err) => {
365 this.update(&mut cx, |session, cx| {
366 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
367 })
368 .ok();
369 }
370 }
371 })
372 .shared();
373
374 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
375 cx.notify();
376 }
377
378 fn on_buffer_event(
379 &mut self,
380 buffer: Model<MultiBuffer>,
381 event: &multi_buffer::Event,
382 cx: &mut ViewContext<Self>,
383 ) {
384 if let multi_buffer::Event::Edited { .. } = event {
385 let snapshot = buffer.read(cx).snapshot(cx);
386
387 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
388
389 self.blocks.retain(|_id, block| {
390 if block.invalidation_anchor.is_valid(&snapshot) {
391 true
392 } else {
393 blocks_to_remove.insert(block.block_id);
394 false
395 }
396 });
397
398 if !blocks_to_remove.is_empty() {
399 self.editor
400 .update(cx, |editor, cx| {
401 editor.remove_blocks(blocks_to_remove, None, cx);
402 })
403 .ok();
404 cx.notify();
405 }
406 }
407 }
408
409 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
410 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
411 kernel.request_tx.try_send(message).ok();
412 }
413
414 anyhow::Ok(())
415 }
416
417 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
418 let blocks_to_remove: HashSet<CustomBlockId> =
419 self.blocks.values().map(|block| block.block_id).collect();
420
421 self.editor
422 .update(cx, |editor, cx| {
423 editor.remove_blocks(blocks_to_remove, None, cx);
424 })
425 .ok();
426
427 self.blocks.clear();
428 }
429
430 pub fn execute(
431 &mut self,
432 code: String,
433 anchor_range: Range<Anchor>,
434 next_cell: Option<Anchor>,
435 move_down: bool,
436 cx: &mut ViewContext<Self>,
437 ) {
438 let Some(editor) = self.editor.upgrade() else {
439 return;
440 };
441
442 if code.is_empty() {
443 return;
444 }
445
446 let execute_request = ExecuteRequest {
447 code,
448 ..ExecuteRequest::default()
449 };
450
451 let message: JupyterMessage = execute_request.into();
452
453 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
454
455 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
456
457 self.blocks.retain(|_key, block| {
458 if anchor_range.overlaps(&block.code_range, &buffer) {
459 blocks_to_remove.insert(block.block_id);
460 false
461 } else {
462 true
463 }
464 });
465
466 self.editor
467 .update(cx, |editor, cx| {
468 editor.remove_blocks(blocks_to_remove, None, cx);
469 })
470 .ok();
471
472 let status = match &self.kernel {
473 Kernel::Restarting => ExecutionStatus::Restarting,
474 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
475 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
476 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
477 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
478 Kernel::Shutdown => ExecutionStatus::Shutdown,
479 };
480
481 let parent_message_id = message.header.msg_id.clone();
482 let session_view = cx.view().downgrade();
483 let weak_editor = self.editor.clone();
484
485 let on_close: CloseBlockFn =
486 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
487 if let Some(session) = session_view.upgrade() {
488 session.update(cx, |session, cx| {
489 session.blocks.remove(&parent_message_id);
490 cx.notify();
491 });
492 }
493
494 if let Some(editor) = weak_editor.upgrade() {
495 editor.update(cx, |editor, cx| {
496 let mut block_ids = HashSet::default();
497 block_ids.insert(block_id);
498 editor.remove_blocks(block_ids, None, cx);
499 });
500 }
501 });
502
503 let Ok(editor_block) =
504 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
505 else {
506 return;
507 };
508
509 let new_cursor_pos = if let Some(next_cursor) = next_cell {
510 next_cursor
511 } else {
512 editor_block.invalidation_anchor
513 };
514
515 self.blocks
516 .insert(message.header.msg_id.clone(), editor_block);
517
518 match &self.kernel {
519 Kernel::RunningKernel(_) => {
520 self.send(message, cx).ok();
521 }
522 Kernel::StartingKernel(task) => {
523 // Queue up the execution as a task to run after the kernel starts
524 let task = task.clone();
525 let message = message.clone();
526
527 cx.spawn(|this, mut cx| async move {
528 task.await;
529 this.update(&mut cx, |session, cx| {
530 session.send(message, cx).ok();
531 })
532 .ok();
533 })
534 .detach();
535 }
536 _ => {}
537 }
538
539 if move_down {
540 editor.update(cx, move |editor, cx| {
541 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
542 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
543 });
544 });
545 }
546 }
547
548 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
549 let parent_message_id = match message.parent_header.as_ref() {
550 Some(header) => &header.msg_id,
551 None => return,
552 };
553
554 match &message.content {
555 JupyterMessageContent::Status(status) => {
556 self.kernel.set_execution_state(&status.execution_state);
557
558 self.telemetry.report_repl_event(
559 self.kernel_specification.kernelspec.language.clone(),
560 KernelStatus::from(&self.kernel).to_string(),
561 cx.entity_id().to_string(),
562 );
563
564 cx.notify();
565 }
566 JupyterMessageContent::KernelInfoReply(reply) => {
567 self.kernel.set_kernel_info(reply);
568 cx.notify();
569 }
570 JupyterMessageContent::UpdateDisplayData(update) => {
571 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
572 display_id
573 } else {
574 return;
575 };
576
577 self.blocks.iter_mut().for_each(|(_, block)| {
578 block.execution_view.update(cx, |execution_view, cx| {
579 execution_view.update_display_data(&update.data, &display_id, cx);
580 });
581 });
582 return;
583 }
584 _ => {}
585 }
586
587 if let Some(block) = self.blocks.get_mut(parent_message_id) {
588 block.handle_message(message, cx);
589 }
590 }
591
592 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
593 match &mut self.kernel {
594 Kernel::RunningKernel(_kernel) => {
595 self.send(InterruptRequest {}.into(), cx).ok();
596 }
597 Kernel::StartingKernel(_task) => {
598 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
599 }
600 _ => {}
601 }
602 }
603
604 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
605 if let Kernel::Shutdown = kernel {
606 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
607 }
608
609 let kernel_status = KernelStatus::from(&kernel).to_string();
610 let kernel_language = self.kernel_specification.kernelspec.language.clone();
611
612 self.telemetry.report_repl_event(
613 kernel_language,
614 kernel_status,
615 cx.entity_id().to_string(),
616 );
617
618 self.kernel = kernel;
619 }
620
621 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
622 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
623
624 match kernel {
625 Kernel::RunningKernel(mut kernel) => {
626 let mut request_tx = kernel.request_tx.clone();
627
628 cx.spawn(|this, mut cx| async move {
629 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
630 request_tx.try_send(message).ok();
631
632 // Give the kernel a bit of time to clean up
633 cx.background_executor().timer(Duration::from_secs(3)).await;
634
635 this.update(&mut cx, |session, _cx| {
636 session.messaging_task.take();
637 session.process_status_task.take();
638 })
639 .ok();
640
641 kernel.process.kill().ok();
642
643 this.update(&mut cx, |session, cx| {
644 session.clear_outputs(cx);
645 session.kernel(Kernel::Shutdown, cx);
646 cx.notify();
647 })
648 .ok();
649 })
650 .detach();
651 }
652 _ => {
653 self.messaging_task.take();
654 self.process_status_task.take();
655 self.kernel(Kernel::Shutdown, cx);
656 }
657 }
658 cx.notify();
659 }
660
661 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
662 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
663
664 match kernel {
665 Kernel::Restarting => {
666 // Do nothing if already restarting
667 }
668 Kernel::RunningKernel(mut kernel) => {
669 let mut request_tx = kernel.request_tx.clone();
670
671 cx.spawn(|this, mut cx| async move {
672 // Send shutdown request with restart flag
673 log::debug!("restarting kernel");
674 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
675 request_tx.try_send(message).ok();
676
677 this.update(&mut cx, |session, _cx| {
678 session.messaging_task.take();
679 session.process_status_task.take();
680 })
681 .ok();
682
683 // Wait for kernel to shutdown
684 cx.background_executor().timer(Duration::from_secs(1)).await;
685
686 // Force kill the kernel if it hasn't shut down
687 kernel.process.kill().ok();
688
689 // Start a new kernel
690 this.update(&mut cx, |session, cx| {
691 // todo!(): Differentiate between restart and restart+clear-outputs
692 session.clear_outputs(cx);
693 session.start_kernel(cx);
694 })
695 .ok();
696 })
697 .detach();
698 }
699 _ => {
700 // If it's not already running, we can just clean up and start a new kernel
701 self.messaging_task.take();
702 self.process_status_task.take();
703 self.clear_outputs(cx);
704 self.start_kernel(cx);
705 }
706 }
707 cx.notify();
708 }
709}
710
711pub enum SessionEvent {
712 Shutdown(WeakView<Editor>),
713}
714
715impl EventEmitter<SessionEvent> for Session {}
716
717impl Render for Session {
718 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
719 let (status_text, interrupt_button) = match &self.kernel {
720 Kernel::RunningKernel(kernel) => (
721 kernel
722 .kernel_info
723 .as_ref()
724 .map(|info| info.language_info.name.clone()),
725 Some(
726 Button::new("interrupt", "Interrupt")
727 .style(ButtonStyle::Subtle)
728 .on_click(cx.listener(move |session, _, cx| {
729 session.interrupt(cx);
730 })),
731 ),
732 ),
733 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
734 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
735 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
736 Kernel::Shutdown => (Some("Shutdown".into()), None),
737 Kernel::Restarting => (Some("Restarting".into()), None),
738 };
739
740 KernelListItem::new(self.kernel_specification.clone())
741 .status_color(match &self.kernel {
742 Kernel::RunningKernel(kernel) => match kernel.execution_state {
743 ExecutionState::Idle => Color::Success,
744 ExecutionState::Busy => Color::Modified,
745 },
746 Kernel::StartingKernel(_) => Color::Modified,
747 Kernel::ErroredLaunch(_) => Color::Error,
748 Kernel::ShuttingDown => Color::Modified,
749 Kernel::Shutdown => Color::Disabled,
750 Kernel::Restarting => Color::Modified,
751 })
752 .child(Label::new(self.kernel_specification.name.clone()))
753 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
754 .button(
755 Button::new("shutdown", "Shutdown")
756 .style(ButtonStyle::Subtle)
757 .disabled(self.kernel.is_shutting_down())
758 .on_click(cx.listener(move |session, _, cx| {
759 session.shutdown(cx);
760 })),
761 )
762 .buttons(interrupt_button)
763 }
764}