session.rs

  1use crate::components::KernelListItem;
  2use crate::kernels::RemoteRunningKernel;
  3use crate::setup_editor_session_actions;
  4use crate::{
  5    KernelStatus,
  6    kernels::{Kernel, KernelSpecification, NativeRunningKernel},
  7    outputs::{ExecutionStatus, ExecutionView},
  8};
  9use collections::{HashMap, HashSet};
 10use editor::{
 11    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 12    display_map::{
 13        BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
 14        RenderBlock,
 15    },
 16    scroll::Autoscroll,
 17};
 18use futures::FutureExt as _;
 19use gpui::{
 20    Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
 21};
 22use language::Point;
 23use project::Fs;
 24use runtimelib::{
 25    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 26    ShutdownRequest,
 27};
 28use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 29use theme::ActiveTheme;
 30use ui::{IconButtonShape, Tooltip, prelude::*};
 31use util::ResultExt as _;
 32
 33pub struct Session {
 34    fs: Arc<dyn Fs>,
 35    editor: WeakEntity<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    pub kernel_specification: KernelSpecification,
 39    _buffer_subscription: Subscription,
 40}
 41
 42struct EditorBlock {
 43    code_range: Range<Anchor>,
 44    invalidation_anchor: Anchor,
 45    block_id: CustomBlockId,
 46    execution_view: Entity<ExecutionView>,
 47}
 48
 49type CloseBlockFn =
 50    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
 51
 52impl EditorBlock {
 53    fn new(
 54        editor: WeakEntity<Editor>,
 55        code_range: Range<Anchor>,
 56        status: ExecutionStatus,
 57        on_close: CloseBlockFn,
 58        cx: &mut Context<Session>,
 59    ) -> anyhow::Result<Self> {
 60        let editor = editor
 61            .upgrade()
 62            .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
 63        let workspace = editor
 64            .read(cx)
 65            .workspace()
 66            .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
 67
 68        let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
 69
 70        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 71            let buffer = editor.buffer().clone();
 72            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 73            let end_point = code_range.end.to_point(&buffer_snapshot);
 74            let next_row_start = end_point + Point::new(1, 0);
 75            if next_row_start > buffer_snapshot.max_point() {
 76                buffer.update(cx, |buffer, cx| {
 77                    buffer.edit(
 78                        [(
 79                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 80                            "\n",
 81                        )],
 82                        None,
 83                        cx,
 84                    )
 85                });
 86            }
 87
 88            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 89            let block = BlockProperties {
 90                placement: BlockPlacement::Below(code_range.end),
 91                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 92                height: Some(1),
 93                style: BlockStyle::Sticky,
 94                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 95                priority: 0,
 96                render_in_minimap: false,
 97            };
 98
 99            let block_id = editor.insert_blocks([block], None, cx)[0];
100            (block_id, invalidation_anchor)
101        });
102
103        anyhow::Ok(Self {
104            code_range,
105            invalidation_anchor,
106            block_id,
107            execution_view,
108        })
109    }
110
111    fn handle_message(
112        &mut self,
113        message: &JupyterMessage,
114        window: &mut Window,
115        cx: &mut Context<Session>,
116    ) {
117        self.execution_view.update(cx, |execution_view, cx| {
118            execution_view.push_message(&message.content, window, cx);
119        });
120    }
121
122    fn create_output_area_renderer(
123        execution_view: Entity<ExecutionView>,
124        on_close: CloseBlockFn,
125    ) -> RenderBlock {
126        Arc::new(move |cx: &mut BlockContext| {
127            let execution_view = execution_view.clone();
128            let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
129
130            let editor_margins = cx.margins;
131            let gutter = editor_margins.gutter;
132
133            let block_id = cx.block_id;
134            let on_close = on_close.clone();
135
136            let rem_size = cx.window.rem_size();
137
138            let text_line_height = text_style.line_height_in_pixels(rem_size);
139
140            let close_button = h_flex()
141                .flex_none()
142                .items_center()
143                .justify_center()
144                .absolute()
145                .top(text_line_height / 2.)
146                .right(
147                    // 2px is a magic number to nudge the button just a bit closer to
148                    // the line number start
149                    gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
150                )
151                .w(text_line_height)
152                .h(text_line_height)
153                .child(
154                    IconButton::new("close_output_area", IconName::Close)
155                        .icon_size(IconSize::Small)
156                        .icon_color(Color::Muted)
157                        .size(ButtonSize::Compact)
158                        .shape(IconButtonShape::Square)
159                        .tooltip(Tooltip::text("Close output area"))
160                        .on_click(move |_, window, cx| {
161                            if let BlockId::Custom(block_id) = block_id {
162                                (on_close)(block_id, window, cx)
163                            }
164                        }),
165                );
166
167            div()
168                .id(cx.block_id)
169                .block_mouse_down()
170                .flex()
171                .items_start()
172                .min_h(text_line_height)
173                .w_full()
174                .border_y_1()
175                .border_color(cx.theme().colors().border)
176                .bg(cx.theme().colors().background)
177                .child(
178                    div()
179                        .relative()
180                        .w(gutter.full_width())
181                        .h(text_line_height * 2)
182                        .child(close_button),
183                )
184                .child(
185                    div()
186                        .flex_1()
187                        .size_full()
188                        .py(text_line_height / 2.)
189                        .mr(editor_margins.right)
190                        .pr_2()
191                        .child(execution_view),
192                )
193                .into_any_element()
194        })
195    }
196}
197
198impl Session {
199    pub fn new(
200        editor: WeakEntity<Editor>,
201        fs: Arc<dyn Fs>,
202        kernel_specification: KernelSpecification,
203        window: &mut Window,
204        cx: &mut Context<Self>,
205    ) -> Self {
206        let subscription = match editor.upgrade() {
207            Some(editor) => {
208                let buffer = editor.read(cx).buffer().clone();
209                cx.subscribe(&buffer, Self::on_buffer_event)
210            }
211            None => Subscription::new(|| {}),
212        };
213
214        let editor_handle = editor.clone();
215
216        editor
217            .update(cx, |editor, _cx| {
218                setup_editor_session_actions(editor, editor_handle);
219            })
220            .ok();
221
222        let mut session = Self {
223            fs,
224            editor,
225            kernel: Kernel::StartingKernel(Task::ready(()).shared()),
226            blocks: HashMap::default(),
227            kernel_specification,
228            _buffer_subscription: subscription,
229        };
230
231        session.start_kernel(window, cx);
232        session
233    }
234
235    fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
236        let kernel_language = self.kernel_specification.language();
237        let entity_id = self.editor.entity_id();
238        let working_directory = self
239            .editor
240            .upgrade()
241            .and_then(|editor| editor.read(cx).working_directory(cx))
242            .unwrap_or_else(temp_dir);
243
244        telemetry::event!(
245            "Kernel Status Changed",
246            kernel_language,
247            kernel_status = KernelStatus::Starting.to_string(),
248            repl_session_id = cx.entity_id().to_string(),
249        );
250
251        let session_view = cx.entity().clone();
252
253        let kernel = match self.kernel_specification.clone() {
254            KernelSpecification::Jupyter(kernel_specification)
255            | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
256                kernel_specification,
257                entity_id,
258                working_directory,
259                self.fs.clone(),
260                session_view,
261                window,
262                cx,
263            ),
264            KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
265                remote_kernel_specification,
266                working_directory,
267                session_view,
268                window,
269                cx,
270            ),
271        };
272
273        let pending_kernel = cx
274            .spawn(async move |this, cx| {
275                let kernel = kernel.await;
276
277                match kernel {
278                    Ok(kernel) => {
279                        this.update(cx, |session, cx| {
280                            session.kernel(Kernel::RunningKernel(kernel), cx);
281                        })
282                        .ok();
283                    }
284                    Err(err) => {
285                        this.update(cx, |session, cx| {
286                            session.kernel_errored(err.to_string(), cx);
287                        })
288                        .ok();
289                    }
290                }
291            })
292            .shared();
293
294        self.kernel(Kernel::StartingKernel(pending_kernel), cx);
295        cx.notify();
296    }
297
298    pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
299        self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
300
301        self.blocks.values().for_each(|block| {
302            block.execution_view.update(cx, |execution_view, cx| {
303                match execution_view.status {
304                    ExecutionStatus::Finished => {
305                        // Do nothing when the output was good
306                    }
307                    _ => {
308                        // All other cases, set the status to errored
309                        execution_view.status =
310                            ExecutionStatus::KernelErrored(error_message.clone())
311                    }
312                }
313                cx.notify();
314            });
315        });
316    }
317
318    fn on_buffer_event(
319        &mut self,
320        buffer: Entity<MultiBuffer>,
321        event: &multi_buffer::Event,
322        cx: &mut Context<Self>,
323    ) {
324        if let multi_buffer::Event::Edited { .. } = event {
325            let snapshot = buffer.read(cx).snapshot(cx);
326
327            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
328
329            self.blocks.retain(|_id, block| {
330                if block.invalidation_anchor.is_valid(&snapshot) {
331                    true
332                } else {
333                    blocks_to_remove.insert(block.block_id);
334                    false
335                }
336            });
337
338            if !blocks_to_remove.is_empty() {
339                self.editor
340                    .update(cx, |editor, cx| {
341                        editor.remove_blocks(blocks_to_remove, None, cx);
342                    })
343                    .ok();
344                cx.notify();
345            }
346        }
347    }
348
349    fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
350        if let Kernel::RunningKernel(kernel) = &mut self.kernel {
351            kernel.request_tx().try_send(message).ok();
352        }
353
354        anyhow::Ok(())
355    }
356
357    pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
358        let blocks_to_remove: HashSet<CustomBlockId> =
359            self.blocks.values().map(|block| block.block_id).collect();
360
361        self.editor
362            .update(cx, |editor, cx| {
363                editor.remove_blocks(blocks_to_remove, None, cx);
364            })
365            .ok();
366
367        self.blocks.clear();
368    }
369
370    pub fn execute(
371        &mut self,
372        code: String,
373        anchor_range: Range<Anchor>,
374        next_cell: Option<Anchor>,
375        move_down: bool,
376        window: &mut Window,
377        cx: &mut Context<Self>,
378    ) {
379        let Some(editor) = self.editor.upgrade() else {
380            return;
381        };
382
383        if code.is_empty() {
384            return;
385        }
386
387        let execute_request = ExecuteRequest {
388            code,
389            ..ExecuteRequest::default()
390        };
391
392        let message: JupyterMessage = execute_request.into();
393
394        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
395
396        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
397
398        self.blocks.retain(|_key, block| {
399            if anchor_range.overlaps(&block.code_range, &buffer) {
400                blocks_to_remove.insert(block.block_id);
401                false
402            } else {
403                true
404            }
405        });
406
407        self.editor
408            .update(cx, |editor, cx| {
409                editor.remove_blocks(blocks_to_remove, None, cx);
410            })
411            .ok();
412
413        let status = match &self.kernel {
414            Kernel::Restarting => ExecutionStatus::Restarting,
415            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
416            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
417            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
418            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
419            Kernel::Shutdown => ExecutionStatus::Shutdown,
420        };
421
422        let parent_message_id = message.header.msg_id.clone();
423        let session_view = cx.entity().downgrade();
424        let weak_editor = self.editor.clone();
425
426        let on_close: CloseBlockFn = Arc::new(
427            move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
428                if let Some(session) = session_view.upgrade() {
429                    session.update(cx, |session, cx| {
430                        session.blocks.remove(&parent_message_id);
431                        cx.notify();
432                    });
433                }
434
435                if let Some(editor) = weak_editor.upgrade() {
436                    editor.update(cx, |editor, cx| {
437                        let mut block_ids = HashSet::default();
438                        block_ids.insert(block_id);
439                        editor.remove_blocks(block_ids, None, cx);
440                    });
441                }
442            },
443        );
444
445        let Ok(editor_block) =
446            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
447        else {
448            return;
449        };
450
451        let new_cursor_pos = if let Some(next_cursor) = next_cell {
452            next_cursor
453        } else {
454            editor_block.invalidation_anchor
455        };
456
457        self.blocks
458            .insert(message.header.msg_id.clone(), editor_block);
459
460        match &self.kernel {
461            Kernel::RunningKernel(_) => {
462                self.send(message, cx).ok();
463            }
464            Kernel::StartingKernel(task) => {
465                // Queue up the execution as a task to run after the kernel starts
466                let task = task.clone();
467                let message = message.clone();
468
469                cx.spawn(async move |this, cx| {
470                    task.await;
471                    this.update(cx, |session, cx| {
472                        session.send(message, cx).ok();
473                    })
474                    .ok();
475                })
476                .detach();
477            }
478            _ => {}
479        }
480
481        if move_down {
482            editor.update(cx, move |editor, cx| {
483                editor.change_selections(
484                    Some(Autoscroll::top_relative(8)),
485                    window,
486                    cx,
487                    |selections| {
488                        selections.select_ranges([new_cursor_pos..new_cursor_pos]);
489                    },
490                );
491            });
492        }
493    }
494
495    pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
496        let parent_message_id = match message.parent_header.as_ref() {
497            Some(header) => &header.msg_id,
498            None => return,
499        };
500
501        match &message.content {
502            JupyterMessageContent::Status(status) => {
503                self.kernel.set_execution_state(&status.execution_state);
504
505                telemetry::event!(
506                    "Kernel Status Changed",
507                    kernel_language = self.kernel_specification.language(),
508                    kernel_status = KernelStatus::from(&self.kernel).to_string(),
509                    repl_session_id = cx.entity_id().to_string(),
510                );
511
512                cx.notify();
513            }
514            JupyterMessageContent::KernelInfoReply(reply) => {
515                self.kernel.set_kernel_info(reply);
516                cx.notify();
517            }
518            JupyterMessageContent::UpdateDisplayData(update) => {
519                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
520                    display_id
521                } else {
522                    return;
523                };
524
525                self.blocks.iter_mut().for_each(|(_, block)| {
526                    block.execution_view.update(cx, |execution_view, cx| {
527                        execution_view.update_display_data(&update.data, &display_id, window, cx);
528                    });
529                });
530                return;
531            }
532            _ => {}
533        }
534
535        if let Some(block) = self.blocks.get_mut(parent_message_id) {
536            block.handle_message(message, window, cx);
537        }
538    }
539
540    pub fn interrupt(&mut self, cx: &mut Context<Self>) {
541        match &mut self.kernel {
542            Kernel::RunningKernel(_kernel) => {
543                self.send(InterruptRequest {}.into(), cx).ok();
544            }
545            Kernel::StartingKernel(_task) => {
546                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
547            }
548            _ => {}
549        }
550    }
551
552    pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
553        if let Kernel::Shutdown = kernel {
554            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
555        }
556
557        let kernel_status = KernelStatus::from(&kernel).to_string();
558        let kernel_language = self.kernel_specification.language();
559
560        telemetry::event!(
561            "Kernel Status Changed",
562            kernel_language,
563            kernel_status,
564            repl_session_id = cx.entity_id().to_string(),
565        );
566
567        self.kernel = kernel;
568    }
569
570    pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
571        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
572
573        match kernel {
574            Kernel::RunningKernel(mut kernel) => {
575                let mut request_tx = kernel.request_tx().clone();
576
577                let forced = kernel.force_shutdown(window, cx);
578
579                cx.spawn(async move |this, cx| {
580                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
581                    request_tx.try_send(message).ok();
582
583                    forced.await.log_err();
584
585                    // Give the kernel a bit of time to clean up
586                    cx.background_executor().timer(Duration::from_secs(3)).await;
587
588                    this.update(cx, |session, cx| {
589                        session.clear_outputs(cx);
590                        session.kernel(Kernel::Shutdown, cx);
591                        cx.notify();
592                    })
593                    .ok();
594                })
595                .detach();
596            }
597            _ => {
598                self.kernel(Kernel::Shutdown, cx);
599            }
600        }
601        cx.notify();
602    }
603
604    pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
605        let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
606
607        match kernel {
608            Kernel::Restarting => {
609                // Do nothing if already restarting
610            }
611            Kernel::RunningKernel(mut kernel) => {
612                let mut request_tx = kernel.request_tx().clone();
613
614                let forced = kernel.force_shutdown(window, cx);
615
616                cx.spawn_in(window, async move |this, cx| {
617                    // Send shutdown request with restart flag
618                    log::debug!("restarting kernel");
619                    let message: JupyterMessage = ShutdownRequest { restart: true }.into();
620                    request_tx.try_send(message).ok();
621
622                    // Wait for kernel to shutdown
623                    cx.background_executor().timer(Duration::from_secs(1)).await;
624
625                    // Force kill the kernel if it hasn't shut down
626                    forced.await.log_err();
627
628                    // Start a new kernel
629                    this.update_in(cx, |session, window, cx| {
630                        // TODO: Differentiate between restart and restart+clear-outputs
631                        session.clear_outputs(cx);
632                        session.start_kernel(window, cx);
633                    })
634                    .ok();
635                })
636                .detach();
637            }
638            _ => {
639                self.clear_outputs(cx);
640                self.start_kernel(window, cx);
641            }
642        }
643        cx.notify();
644    }
645}
646
647pub enum SessionEvent {
648    Shutdown(WeakEntity<Editor>),
649}
650
651impl EventEmitter<SessionEvent> for Session {}
652
653impl Render for Session {
654    fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
655        let (status_text, interrupt_button) = match &self.kernel {
656            Kernel::RunningKernel(kernel) => (
657                kernel
658                    .kernel_info()
659                    .as_ref()
660                    .map(|info| info.language_info.name.clone()),
661                Some(
662                    Button::new("interrupt", "Interrupt")
663                        .style(ButtonStyle::Subtle)
664                        .on_click(cx.listener(move |session, _, _, cx| {
665                            session.interrupt(cx);
666                        })),
667                ),
668            ),
669            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
670            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
671            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
672            Kernel::Shutdown => (Some("Shutdown".into()), None),
673            Kernel::Restarting => (Some("Restarting".into()), None),
674        };
675
676        KernelListItem::new(self.kernel_specification.clone())
677            .status_color(match &self.kernel {
678                Kernel::RunningKernel(kernel) => match kernel.execution_state() {
679                    ExecutionState::Idle => Color::Success,
680                    ExecutionState::Busy => Color::Modified,
681                },
682                Kernel::StartingKernel(_) => Color::Modified,
683                Kernel::ErroredLaunch(_) => Color::Error,
684                Kernel::ShuttingDown => Color::Modified,
685                Kernel::Shutdown => Color::Disabled,
686                Kernel::Restarting => Color::Modified,
687            })
688            .child(Label::new(self.kernel_specification.name()))
689            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
690            .button(
691                Button::new("shutdown", "Shutdown")
692                    .style(ButtonStyle::Subtle)
693                    .disabled(self.kernel.is_shutting_down())
694                    .on_click(cx.listener(move |session, _, window, cx| {
695                        session.shutdown(window, cx);
696                    })),
697            )
698            .buttons(interrupt_button)
699    }
700}