1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
21};
22use language::Point;
23use project::Fs;
24use runtimelib::{
25 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
26 ShutdownRequest,
27};
28use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
29use theme::ActiveTheme;
30use ui::{prelude::*, IconButtonShape, Tooltip};
31
32pub struct Session {
33 fs: Arc<dyn Fs>,
34 editor: WeakView<Editor>,
35 pub kernel: Kernel,
36 blocks: HashMap<String, EditorBlock>,
37 messaging_task: Option<Task<()>>,
38 process_status_task: Option<Task<()>>,
39 pub kernel_specification: KernelSpecification,
40 telemetry: Arc<Telemetry>,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 code_range: Range<Anchor>,
46 invalidation_anchor: Anchor,
47 block_id: CustomBlockId,
48 execution_view: View<ExecutionView>,
49}
50
51type CloseBlockFn =
52 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakView<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut ViewContext<Session>,
61 ) -> anyhow::Result<Self> {
62 let editor = editor
63 .upgrade()
64 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
65 let workspace = editor
66 .read(cx)
67 .workspace()
68 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
69
70 let execution_view =
71 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
72
73 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
74 let buffer = editor.buffer().clone();
75 let buffer_snapshot = buffer.read(cx).snapshot(cx);
76 let end_point = code_range.end.to_point(&buffer_snapshot);
77 let next_row_start = end_point + Point::new(1, 0);
78 if next_row_start > buffer_snapshot.max_point() {
79 buffer.update(cx, |buffer, cx| {
80 buffer.edit(
81 [(
82 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
83 "\n",
84 )],
85 None,
86 cx,
87 )
88 });
89 }
90
91 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
92 let block = BlockProperties {
93 position: code_range.end,
94 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
95 height: 1,
96 style: BlockStyle::Sticky,
97 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
98 disposition: BlockDisposition::Below,
99 priority: 0,
100 };
101
102 let block_id = editor.insert_blocks([block], None, cx)[0];
103 (block_id, invalidation_anchor)
104 });
105
106 anyhow::Ok(Self {
107 code_range,
108 invalidation_anchor,
109 block_id,
110 execution_view,
111 })
112 }
113
114 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
115 self.execution_view.update(cx, |execution_view, cx| {
116 execution_view.push_message(&message.content, cx);
117 });
118 }
119
120 fn create_output_area_renderer(
121 execution_view: View<ExecutionView>,
122 on_close: CloseBlockFn,
123 ) -> RenderBlock {
124 let render = move |cx: &mut BlockContext| {
125 let execution_view = execution_view.clone();
126 let text_style = crate::outputs::plain::text_style(cx);
127
128 let gutter = cx.gutter_dimensions;
129
130 let block_id = cx.block_id;
131 let on_close = on_close.clone();
132
133 let rem_size = cx.rem_size();
134
135 let text_line_height = text_style.line_height_in_pixels(rem_size);
136
137 let close_button = h_flex()
138 .flex_none()
139 .items_center()
140 .justify_center()
141 .absolute()
142 .top(text_line_height / 2.)
143 .right(
144 // 2px is a magic number to nudge the button just a bit closer to
145 // the line number start
146 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
147 )
148 .w(text_line_height)
149 .h(text_line_height)
150 .child(
151 IconButton::new("close_output_area", IconName::Close)
152 .icon_size(IconSize::Small)
153 .icon_color(Color::Muted)
154 .size(ButtonSize::Compact)
155 .shape(IconButtonShape::Square)
156 .tooltip(|cx| Tooltip::text("Close output area", cx))
157 .on_click(move |_, cx| {
158 if let BlockId::Custom(block_id) = block_id {
159 (on_close)(block_id, cx)
160 }
161 }),
162 );
163
164 div()
165 .id(cx.block_id)
166 .flex()
167 .items_start()
168 .min_h(text_line_height)
169 .w_full()
170 .border_y_1()
171 .border_color(cx.theme().colors().border)
172 .bg(cx.theme().colors().background)
173 .child(
174 div()
175 .relative()
176 .w(gutter.full_width())
177 .h(text_line_height * 2)
178 .child(close_button),
179 )
180 .child(
181 div()
182 .flex_1()
183 .size_full()
184 .py(text_line_height / 2.)
185 .mr(gutter.width)
186 .child(execution_view),
187 )
188 .into_any_element()
189 };
190
191 Box::new(render)
192 }
193}
194
195impl Session {
196 pub fn new(
197 editor: WeakView<Editor>,
198 fs: Arc<dyn Fs>,
199 telemetry: Arc<Telemetry>,
200 kernel_specification: KernelSpecification,
201 cx: &mut ViewContext<Self>,
202 ) -> Self {
203 let subscription = match editor.upgrade() {
204 Some(editor) => {
205 let buffer = editor.read(cx).buffer().clone();
206 cx.subscribe(&buffer, Self::on_buffer_event)
207 }
208 None => Subscription::new(|| {}),
209 };
210
211 let mut session = Self {
212 fs,
213 editor,
214 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
215 messaging_task: None,
216 process_status_task: None,
217 blocks: HashMap::default(),
218 kernel_specification,
219 _buffer_subscription: subscription,
220 telemetry,
221 };
222
223 session.start_kernel(cx);
224 session
225 }
226
227 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
228 let kernel_language = self.kernel_specification.kernelspec.language.clone();
229 let entity_id = self.editor.entity_id();
230 let working_directory = self
231 .editor
232 .upgrade()
233 .and_then(|editor| editor.read(cx).working_directory(cx))
234 .unwrap_or_else(temp_dir);
235
236 self.telemetry.report_repl_event(
237 kernel_language.clone(),
238 KernelStatus::Starting.to_string(),
239 cx.entity_id().to_string(),
240 );
241
242 let kernel = RunningKernel::new(
243 self.kernel_specification.clone(),
244 entity_id,
245 working_directory,
246 self.fs.clone(),
247 cx,
248 );
249
250 let pending_kernel = cx
251 .spawn(|this, mut cx| async move {
252 let kernel = kernel.await;
253
254 match kernel {
255 Ok((mut kernel, mut messages_rx)) => {
256 this.update(&mut cx, |session, cx| {
257 let stderr = kernel.process.stderr.take();
258
259 cx.spawn(|_session, mut _cx| async move {
260 if stderr.is_none() {
261 return;
262 }
263 let reader = BufReader::new(stderr.unwrap());
264 let mut lines = reader.lines();
265 while let Some(Ok(line)) = lines.next().await {
266 // todo!(): Log stdout and stderr to something the session can show
267 log::error!("kernel: {}", line);
268 }
269 })
270 .detach();
271
272 let stdout = kernel.process.stdout.take();
273
274 cx.spawn(|_session, mut _cx| async move {
275 if stdout.is_none() {
276 return;
277 }
278 let reader = BufReader::new(stdout.unwrap());
279 let mut lines = reader.lines();
280 while let Some(Ok(line)) = lines.next().await {
281 log::info!("kernel: {}", line);
282 }
283 })
284 .detach();
285
286 let status = kernel.process.status();
287 session.kernel(Kernel::RunningKernel(kernel), cx);
288
289 let process_status_task = cx.spawn(|session, mut cx| async move {
290 let error_message = match status.await {
291 Ok(status) => {
292 if status.success() {
293 log::info!("kernel process exited successfully");
294 return;
295 }
296
297 format!("kernel process exited with status: {:?}", status)
298 }
299 Err(err) => {
300 format!("kernel process exited with error: {:?}", err)
301 }
302 };
303
304 log::error!("{}", error_message);
305
306 session
307 .update(&mut cx, |session, cx| {
308 session.kernel(
309 Kernel::ErroredLaunch(error_message.clone()),
310 cx,
311 );
312
313 session.blocks.values().for_each(|block| {
314 block.execution_view.update(
315 cx,
316 |execution_view, cx| {
317 match execution_view.status {
318 ExecutionStatus::Finished => {
319 // Do nothing when the output was good
320 }
321 _ => {
322 // All other cases, set the status to errored
323 execution_view.status =
324 ExecutionStatus::KernelErrored(
325 error_message.clone(),
326 )
327 }
328 }
329 cx.notify();
330 },
331 );
332 });
333
334 cx.notify();
335 })
336 .ok();
337 });
338
339 session.process_status_task = Some(process_status_task);
340
341 session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
342 while let Some(message) = messages_rx.next().await {
343 session
344 .update(&mut cx, |session, cx| {
345 session.route(&message, cx);
346 })
347 .ok();
348 }
349 }));
350
351 // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
352 // cx.spawn(|this, mut cx| async move {
353 // cx.background_executor()
354 // .timer(Duration::from_millis(120))
355 // .await;
356 // this.update(&mut cx, |this, cx| {
357 // this.send(KernelInfoRequest {}.into(), cx).ok();
358 // })
359 // .ok();
360 // })
361 // .detach();
362 })
363 .ok();
364 }
365 Err(err) => {
366 this.update(&mut cx, |session, cx| {
367 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
368 })
369 .ok();
370 }
371 }
372 })
373 .shared();
374
375 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
376 cx.notify();
377 }
378
379 fn on_buffer_event(
380 &mut self,
381 buffer: Model<MultiBuffer>,
382 event: &multi_buffer::Event,
383 cx: &mut ViewContext<Self>,
384 ) {
385 if let multi_buffer::Event::Edited { .. } = event {
386 let snapshot = buffer.read(cx).snapshot(cx);
387
388 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
389
390 self.blocks.retain(|_id, block| {
391 if block.invalidation_anchor.is_valid(&snapshot) {
392 true
393 } else {
394 blocks_to_remove.insert(block.block_id);
395 false
396 }
397 });
398
399 if !blocks_to_remove.is_empty() {
400 self.editor
401 .update(cx, |editor, cx| {
402 editor.remove_blocks(blocks_to_remove, None, cx);
403 })
404 .ok();
405 cx.notify();
406 }
407 }
408 }
409
410 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
411 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
412 kernel.request_tx.try_send(message).ok();
413 }
414
415 anyhow::Ok(())
416 }
417
418 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
419 let blocks_to_remove: HashSet<CustomBlockId> =
420 self.blocks.values().map(|block| block.block_id).collect();
421
422 self.editor
423 .update(cx, |editor, cx| {
424 editor.remove_blocks(blocks_to_remove, None, cx);
425 })
426 .ok();
427
428 self.blocks.clear();
429 }
430
431 pub fn execute(
432 &mut self,
433 code: String,
434 anchor_range: Range<Anchor>,
435 next_cell: Option<Anchor>,
436 move_down: bool,
437 cx: &mut ViewContext<Self>,
438 ) {
439 let Some(editor) = self.editor.upgrade() else {
440 return;
441 };
442
443 if code.is_empty() {
444 return;
445 }
446
447 let execute_request = ExecuteRequest {
448 code,
449 ..ExecuteRequest::default()
450 };
451
452 let message: JupyterMessage = execute_request.into();
453
454 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
455
456 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
457
458 self.blocks.retain(|_key, block| {
459 if anchor_range.overlaps(&block.code_range, &buffer) {
460 blocks_to_remove.insert(block.block_id);
461 false
462 } else {
463 true
464 }
465 });
466
467 self.editor
468 .update(cx, |editor, cx| {
469 editor.remove_blocks(blocks_to_remove, None, cx);
470 })
471 .ok();
472
473 let status = match &self.kernel {
474 Kernel::Restarting => ExecutionStatus::Restarting,
475 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
476 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
477 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
478 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
479 Kernel::Shutdown => ExecutionStatus::Shutdown,
480 };
481
482 let parent_message_id = message.header.msg_id.clone();
483 let session_view = cx.view().downgrade();
484 let weak_editor = self.editor.clone();
485
486 let on_close: CloseBlockFn =
487 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
488 if let Some(session) = session_view.upgrade() {
489 session.update(cx, |session, cx| {
490 session.blocks.remove(&parent_message_id);
491 cx.notify();
492 });
493 }
494
495 if let Some(editor) = weak_editor.upgrade() {
496 editor.update(cx, |editor, cx| {
497 let mut block_ids = HashSet::default();
498 block_ids.insert(block_id);
499 editor.remove_blocks(block_ids, None, cx);
500 });
501 }
502 });
503
504 let Ok(editor_block) =
505 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
506 else {
507 return;
508 };
509
510 let new_cursor_pos = if let Some(next_cursor) = next_cell {
511 next_cursor
512 } else {
513 editor_block.invalidation_anchor
514 };
515
516 self.blocks
517 .insert(message.header.msg_id.clone(), editor_block);
518
519 match &self.kernel {
520 Kernel::RunningKernel(_) => {
521 self.send(message, cx).ok();
522 }
523 Kernel::StartingKernel(task) => {
524 // Queue up the execution as a task to run after the kernel starts
525 let task = task.clone();
526 let message = message.clone();
527
528 cx.spawn(|this, mut cx| async move {
529 task.await;
530 this.update(&mut cx, |session, cx| {
531 session.send(message, cx).ok();
532 })
533 .ok();
534 })
535 .detach();
536 }
537 _ => {}
538 }
539
540 if move_down {
541 editor.update(cx, move |editor, cx| {
542 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
543 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
544 });
545 });
546 }
547 }
548
549 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
550 let parent_message_id = match message.parent_header.as_ref() {
551 Some(header) => &header.msg_id,
552 None => return,
553 };
554
555 match &message.content {
556 JupyterMessageContent::Status(status) => {
557 self.kernel.set_execution_state(&status.execution_state);
558
559 self.telemetry.report_repl_event(
560 self.kernel_specification.kernelspec.language.clone(),
561 KernelStatus::from(&self.kernel).to_string(),
562 cx.entity_id().to_string(),
563 );
564
565 cx.notify();
566 }
567 JupyterMessageContent::KernelInfoReply(reply) => {
568 self.kernel.set_kernel_info(reply);
569 cx.notify();
570 }
571 JupyterMessageContent::UpdateDisplayData(update) => {
572 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
573 display_id
574 } else {
575 return;
576 };
577
578 self.blocks.iter_mut().for_each(|(_, block)| {
579 block.execution_view.update(cx, |execution_view, cx| {
580 execution_view.update_display_data(&update.data, &display_id, cx);
581 });
582 });
583 return;
584 }
585 _ => {}
586 }
587
588 if let Some(block) = self.blocks.get_mut(parent_message_id) {
589 block.handle_message(message, cx);
590 }
591 }
592
593 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
594 match &mut self.kernel {
595 Kernel::RunningKernel(_kernel) => {
596 self.send(InterruptRequest {}.into(), cx).ok();
597 }
598 Kernel::StartingKernel(_task) => {
599 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
600 }
601 _ => {}
602 }
603 }
604
605 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
606 if let Kernel::Shutdown = kernel {
607 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
608 }
609
610 let kernel_status = KernelStatus::from(&kernel).to_string();
611 let kernel_language = self.kernel_specification.kernelspec.language.clone();
612
613 self.telemetry.report_repl_event(
614 kernel_language,
615 kernel_status,
616 cx.entity_id().to_string(),
617 );
618
619 self.kernel = kernel;
620 }
621
622 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
623 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
624
625 match kernel {
626 Kernel::RunningKernel(mut kernel) => {
627 let mut request_tx = kernel.request_tx.clone();
628
629 cx.spawn(|this, mut cx| async move {
630 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
631 request_tx.try_send(message).ok();
632
633 // Give the kernel a bit of time to clean up
634 cx.background_executor().timer(Duration::from_secs(3)).await;
635
636 this.update(&mut cx, |session, _cx| {
637 session.messaging_task.take();
638 session.process_status_task.take();
639 })
640 .ok();
641
642 kernel.process.kill().ok();
643
644 this.update(&mut cx, |session, cx| {
645 session.clear_outputs(cx);
646 session.kernel(Kernel::Shutdown, cx);
647 cx.notify();
648 })
649 .ok();
650 })
651 .detach();
652 }
653 _ => {
654 self.messaging_task.take();
655 self.process_status_task.take();
656 self.kernel(Kernel::Shutdown, cx);
657 }
658 }
659 cx.notify();
660 }
661
662 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
663 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
664
665 match kernel {
666 Kernel::Restarting => {
667 // Do nothing if already restarting
668 }
669 Kernel::RunningKernel(mut kernel) => {
670 let mut request_tx = kernel.request_tx.clone();
671
672 cx.spawn(|this, mut cx| async move {
673 // Send shutdown request with restart flag
674 log::debug!("restarting kernel");
675 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
676 request_tx.try_send(message).ok();
677
678 this.update(&mut cx, |session, _cx| {
679 session.messaging_task.take();
680 session.process_status_task.take();
681 })
682 .ok();
683
684 // Wait for kernel to shutdown
685 cx.background_executor().timer(Duration::from_secs(1)).await;
686
687 // Force kill the kernel if it hasn't shut down
688 kernel.process.kill().ok();
689
690 // Start a new kernel
691 this.update(&mut cx, |session, cx| {
692 // todo!(): Differentiate between restart and restart+clear-outputs
693 session.clear_outputs(cx);
694 session.start_kernel(cx);
695 })
696 .ok();
697 })
698 .detach();
699 }
700 _ => {
701 // If it's not already running, we can just clean up and start a new kernel
702 self.messaging_task.take();
703 self.process_status_task.take();
704 self.clear_outputs(cx);
705 self.start_kernel(cx);
706 }
707 }
708 cx.notify();
709 }
710}
711
712pub enum SessionEvent {
713 Shutdown(WeakView<Editor>),
714}
715
716impl EventEmitter<SessionEvent> for Session {}
717
718impl Render for Session {
719 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
720 let (status_text, interrupt_button) = match &self.kernel {
721 Kernel::RunningKernel(kernel) => (
722 kernel
723 .kernel_info
724 .as_ref()
725 .map(|info| info.language_info.name.clone()),
726 Some(
727 Button::new("interrupt", "Interrupt")
728 .style(ButtonStyle::Subtle)
729 .on_click(cx.listener(move |session, _, cx| {
730 session.interrupt(cx);
731 })),
732 ),
733 ),
734 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
735 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
736 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
737 Kernel::Shutdown => (Some("Shutdown".into()), None),
738 Kernel::Restarting => (Some("Restarting".into()), None),
739 };
740
741 KernelListItem::new(self.kernel_specification.clone())
742 .status_color(match &self.kernel {
743 Kernel::RunningKernel(kernel) => match kernel.execution_state {
744 ExecutionState::Idle => Color::Success,
745 ExecutionState::Busy => Color::Modified,
746 },
747 Kernel::StartingKernel(_) => Color::Modified,
748 Kernel::ErroredLaunch(_) => Color::Error,
749 Kernel::ShuttingDown => Color::Modified,
750 Kernel::Shutdown => Color::Disabled,
751 Kernel::Restarting => Color::Modified,
752 })
753 .child(Label::new(self.kernel_specification.name.clone()))
754 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
755 .button(
756 Button::new("shutdown", "Shutdown")
757 .style(ButtonStyle::Subtle)
758 .disabled(self.kernel.is_shutting_down())
759 .on_click(cx.listener(move |session, _, cx| {
760 session.shutdown(cx);
761 })),
762 )
763 .buttons(interrupt_button)
764 }
765}