session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::{FutureExt as _, StreamExt as _};
 18use gpui::{
 19    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 20    WeakView,
 21};
 22use language::Point;
 23use project::Fs;
 24use runtimelib::{
 25    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 26    ShutdownRequest,
 27};
 28use settings::Settings as _;
 29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 30use theme::{ActiveTheme, ThemeSettings};
 31use ui::{prelude::*, IconButtonShape, Tooltip};
 32
/// A REPL session: pairs one editor with one kernel and tracks the output
/// blocks that executions have inserted into that editor.
pub struct Session {
    /// Weak handle to the editor that output blocks are inserted into.
    editor: WeakView<Editor>,
    /// Current kernel state; replaced via `Session::kernel` and `Session::shutdown`.
    pub kernel: Kernel,
    /// Output blocks keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Task that forwards messages received from the kernel to `Session::route`.
    messaging_task: Task<()>,
    /// Specification of the kernel this session was created for.
    pub kernel_specification: KernelSpecification,
    /// Telemetry handle used to report kernel status changes.
    telemetry: Arc<Telemetry>,
    /// Subscription to the editor's buffer events (see `Session::on_buffer_event`);
    /// stored here, presumably so it stays active for the lifetime of the session.
    _buffer_subscription: Subscription,
}
 42
/// An output area inserted into the editor below a range of executed code.
struct EditorBlock {
    /// Weak handle to the editor that hosts the block.
    editor: WeakView<Editor>,
    /// Range of the code whose execution produced this block.
    code_range: Range<Anchor>,
    /// Anchor at the start of the row following the code; the block is removed
    /// once this anchor is no longer valid (see `Session::on_buffer_event`).
    invalidation_anchor: Anchor,
    /// Id of the custom block inserted into the editor.
    block_id: CustomBlockId,
    /// View that accumulates and renders the kernel's output.
    execution_view: View<ExecutionView>,
    /// Callback invoked when the block's close button is clicked.
    on_close: CloseBlockFn,
}
 51
/// Shared callback invoked with a block's id when its close button is clicked.
type CloseBlockFn =
    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 54
 55impl EditorBlock {
 56    fn new(
 57        editor: WeakView<Editor>,
 58        code_range: Range<Anchor>,
 59        status: ExecutionStatus,
 60        on_close: CloseBlockFn,
 61        cx: &mut ViewContext<Session>,
 62    ) -> anyhow::Result<Self> {
 63        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 64
 65        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 66            let buffer = editor.buffer().clone();
 67            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 68            let end_point = code_range.end.to_point(&buffer_snapshot);
 69            let next_row_start = end_point + Point::new(1, 0);
 70            if next_row_start > buffer_snapshot.max_point() {
 71                buffer.update(cx, |buffer, cx| {
 72                    buffer.edit(
 73                        [(
 74                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 75                            "\n",
 76                        )],
 77                        None,
 78                        cx,
 79                    )
 80                });
 81            }
 82
 83            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 84            let block = BlockProperties {
 85                position: code_range.end,
 86                height: execution_view.num_lines(cx).saturating_add(1),
 87                style: BlockStyle::Sticky,
 88                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 89                disposition: BlockDisposition::Below,
 90            };
 91
 92            let block_id = editor.insert_blocks([block], None, cx)[0];
 93            (block_id, invalidation_anchor)
 94        })?;
 95
 96        anyhow::Ok(Self {
 97            editor,
 98            code_range,
 99            invalidation_anchor,
100            block_id,
101            execution_view,
102            on_close,
103        })
104    }
105
106    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
107        self.execution_view.update(cx, |execution_view, cx| {
108            execution_view.push_message(&message.content, cx);
109        });
110
111        self.editor
112            .update(cx, |editor, cx| {
113                let mut replacements = HashMap::default();
114
115                replacements.insert(
116                    self.block_id,
117                    (
118                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
119                        Self::create_output_area_renderer(
120                            self.execution_view.clone(),
121                            self.on_close.clone(),
122                        ),
123                    ),
124                );
125                editor.replace_blocks(replacements, None, cx);
126            })
127            .ok();
128    }
129
130    fn create_output_area_renderer(
131        execution_view: View<ExecutionView>,
132        on_close: CloseBlockFn,
133    ) -> RenderBlock {
134        let render = move |cx: &mut BlockContext| {
135            let execution_view = execution_view.clone();
136            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
137            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
138
139            let gutter = cx.gutter_dimensions;
140            let close_button_size = IconSize::XSmall;
141
142            let block_id = cx.block_id;
143            let on_close = on_close.clone();
144
145            let rem_size = cx.rem_size();
146            let line_height = cx.text_style().line_height_in_pixels(rem_size);
147
148            let (close_button_width, close_button_padding) =
149                close_button_size.square_components(cx);
150
151            div()
152                .min_h(line_height)
153                .flex()
154                .flex_row()
155                .items_start()
156                .w_full()
157                .bg(cx.theme().colors().background)
158                .border_y_1()
159                .border_color(cx.theme().colors().border)
160                .child(
161                    v_flex().min_h(cx.line_height()).justify_center().child(
162                        h_flex()
163                            .w(gutter.full_width())
164                            .justify_end()
165                            .pt(line_height / 2.)
166                            .child(
167                                h_flex()
168                                    .pr(gutter.width / 2. - close_button_width
169                                        + close_button_padding / 2.)
170                                    .child(
171                                        IconButton::new(
172                                            ("close_output_area", EntityId::from(cx.block_id)),
173                                            IconName::Close,
174                                        )
175                                        .shape(IconButtonShape::Square)
176                                        .icon_size(close_button_size)
177                                        .icon_color(Color::Muted)
178                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
179                                        .on_click(
180                                            move |_, cx| {
181                                                if let BlockId::Custom(block_id) = block_id {
182                                                    (on_close)(block_id, cx)
183                                                }
184                                            },
185                                        ),
186                                    ),
187                            ),
188                    ),
189                )
190                .child(
191                    div()
192                        .flex_1()
193                        .size_full()
194                        .my_2()
195                        .mr(gutter.width)
196                        .text_size(text_font_size)
197                        .font_family(text_font)
198                        .child(execution_view),
199                )
200                .into_any_element()
201        };
202
203        Box::new(render)
204    }
205}
206
207impl Session {
208    pub fn new(
209        editor: WeakView<Editor>,
210        fs: Arc<dyn Fs>,
211        telemetry: Arc<Telemetry>,
212        kernel_specification: KernelSpecification,
213        cx: &mut ViewContext<Self>,
214    ) -> Self {
215        let kernel_language = kernel_specification.kernelspec.language.clone();
216
217        telemetry.report_repl_event(
218            kernel_language.clone(),
219            KernelStatus::Starting.to_string(),
220            cx.entity_id().to_string(),
221        );
222
223        let entity_id = editor.entity_id();
224        let working_directory = editor
225            .upgrade()
226            .and_then(|editor| editor.read(cx).working_directory(cx))
227            .unwrap_or_else(temp_dir);
228        let kernel = RunningKernel::new(
229            kernel_specification.clone(),
230            entity_id,
231            working_directory,
232            fs.clone(),
233            cx,
234        );
235
236        let pending_kernel = cx
237            .spawn(|this, mut cx| async move {
238                let kernel = kernel.await;
239
240                match kernel {
241                    Ok((mut kernel, mut messages_rx)) => {
242                        this.update(&mut cx, |session, cx| {
243                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
244
245                            let status = kernel.process.status();
246                            session.kernel(Kernel::RunningKernel(kernel), cx);
247
248                            cx.spawn(|session, mut cx| async move {
249                                let error_message = match status.await {
250                                    Ok(status) => {
251                                        if status.success() {
252                                            log::info!("kernel process exited successfully");
253                                            return;
254                                        }
255
256                                        format!("kernel process exited with status: {:?}", status)
257                                    }
258                                    Err(err) => {
259                                        format!("kernel process exited with error: {:?}", err)
260                                    }
261                                };
262
263                                log::error!("{}", error_message);
264
265                                session
266                                    .update(&mut cx, |session, cx| {
267                                        session.kernel(
268                                            Kernel::ErroredLaunch(error_message.clone()),
269                                            cx,
270                                        );
271
272                                        session.blocks.values().for_each(|block| {
273                                            block.execution_view.update(
274                                                cx,
275                                                |execution_view, cx| {
276                                                    match execution_view.status {
277                                                        ExecutionStatus::Finished => {
278                                                            // Do nothing when the output was good
279                                                        }
280                                                        _ => {
281                                                            // All other cases, set the status to errored
282                                                            execution_view.status =
283                                                                ExecutionStatus::KernelErrored(
284                                                                    error_message.clone(),
285                                                                )
286                                                        }
287                                                    }
288                                                    cx.notify();
289                                                },
290                                            );
291                                        });
292
293                                        cx.notify();
294                                    })
295                                    .ok();
296                            })
297                            .detach();
298
299                            session.messaging_task = cx.spawn(|session, mut cx| async move {
300                                while let Some(message) = messages_rx.next().await {
301                                    session
302                                        .update(&mut cx, |session, cx| {
303                                            session.route(&message, cx);
304                                        })
305                                        .ok();
306                                }
307                            });
308                        })
309                        .ok();
310                    }
311                    Err(err) => {
312                        this.update(&mut cx, |session, cx| {
313                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
314                        })
315                        .ok();
316                    }
317                }
318            })
319            .shared();
320
321        let subscription = match editor.upgrade() {
322            Some(editor) => {
323                let buffer = editor.read(cx).buffer().clone();
324                cx.subscribe(&buffer, Self::on_buffer_event)
325            }
326            None => Subscription::new(|| {}),
327        };
328
329        return Self {
330            editor,
331            kernel: Kernel::StartingKernel(pending_kernel),
332            messaging_task: Task::ready(()),
333            blocks: HashMap::default(),
334            kernel_specification,
335            _buffer_subscription: subscription,
336            telemetry,
337        };
338    }
339
340    fn on_buffer_event(
341        &mut self,
342        buffer: Model<MultiBuffer>,
343        event: &multi_buffer::Event,
344        cx: &mut ViewContext<Self>,
345    ) {
346        if let multi_buffer::Event::Edited { .. } = event {
347            let snapshot = buffer.read(cx).snapshot(cx);
348
349            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
350
351            self.blocks.retain(|_id, block| {
352                if block.invalidation_anchor.is_valid(&snapshot) {
353                    true
354                } else {
355                    blocks_to_remove.insert(block.block_id);
356                    false
357                }
358            });
359
360            if !blocks_to_remove.is_empty() {
361                self.editor
362                    .update(cx, |editor, cx| {
363                        editor.remove_blocks(blocks_to_remove, None, cx);
364                    })
365                    .ok();
366                cx.notify();
367            }
368        }
369    }
370
371    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
372        match &mut self.kernel {
373            Kernel::RunningKernel(kernel) => {
374                kernel.request_tx.try_send(message).ok();
375            }
376            _ => {}
377        }
378
379        anyhow::Ok(())
380    }
381
382    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
383        let blocks_to_remove: HashSet<CustomBlockId> =
384            self.blocks.values().map(|block| block.block_id).collect();
385
386        self.editor
387            .update(cx, |editor, cx| {
388                editor.remove_blocks(blocks_to_remove, None, cx);
389            })
390            .ok();
391
392        self.blocks.clear();
393    }
394
395    pub fn execute(
396        &mut self,
397        code: String,
398        anchor_range: Range<Anchor>,
399        next_cell: Option<Anchor>,
400        cx: &mut ViewContext<Self>,
401    ) {
402        let Some(editor) = self.editor.upgrade() else {
403            return;
404        };
405
406        if code.is_empty() {
407            return;
408        }
409
410        let execute_request = ExecuteRequest {
411            code,
412            ..ExecuteRequest::default()
413        };
414
415        let message: JupyterMessage = execute_request.into();
416
417        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
418
419        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
420
421        self.blocks.retain(|_key, block| {
422            if anchor_range.overlaps(&block.code_range, &buffer) {
423                blocks_to_remove.insert(block.block_id);
424                false
425            } else {
426                true
427            }
428        });
429
430        self.editor
431            .update(cx, |editor, cx| {
432                editor.remove_blocks(blocks_to_remove, None, cx);
433            })
434            .ok();
435
436        let status = match &self.kernel {
437            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
438            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
439            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
440            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
441            Kernel::Shutdown => ExecutionStatus::Shutdown,
442        };
443
444        let parent_message_id = message.header.msg_id.clone();
445        let session_view = cx.view().downgrade();
446        let weak_editor = self.editor.clone();
447
448        let on_close: CloseBlockFn =
449            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
450                if let Some(session) = session_view.upgrade() {
451                    session.update(cx, |session, cx| {
452                        session.blocks.remove(&parent_message_id);
453                        cx.notify();
454                    });
455                }
456
457                if let Some(editor) = weak_editor.upgrade() {
458                    editor.update(cx, |editor, cx| {
459                        let mut block_ids = HashSet::default();
460                        block_ids.insert(block_id);
461                        editor.remove_blocks(block_ids, None, cx);
462                    });
463                }
464            });
465
466        let Ok(editor_block) =
467            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
468        else {
469            return;
470        };
471
472        let new_cursor_pos = if let Some(next_cursor) = next_cell {
473            next_cursor
474        } else {
475            editor_block.invalidation_anchor
476        };
477
478        self.blocks
479            .insert(message.header.msg_id.clone(), editor_block);
480
481        match &self.kernel {
482            Kernel::RunningKernel(_) => {
483                self.send(message, cx).ok();
484            }
485            Kernel::StartingKernel(task) => {
486                // Queue up the execution as a task to run after the kernel starts
487                let task = task.clone();
488                let message = message.clone();
489
490                cx.spawn(|this, mut cx| async move {
491                    task.await;
492                    this.update(&mut cx, |session, cx| {
493                        session.send(message, cx).ok();
494                    })
495                    .ok();
496                })
497                .detach();
498            }
499            _ => {}
500        }
501
502        // Now move the cursor to after the block
503        editor.update(cx, move |editor, cx| {
504            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
505                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
506            });
507        });
508    }
509
510    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
511        let parent_message_id = match message.parent_header.as_ref() {
512            Some(header) => &header.msg_id,
513            None => return,
514        };
515
516        match &message.content {
517            JupyterMessageContent::Status(status) => {
518                self.kernel.set_execution_state(&status.execution_state);
519
520                self.telemetry.report_repl_event(
521                    self.kernel_specification.kernelspec.language.clone(),
522                    KernelStatus::from(&self.kernel).to_string(),
523                    cx.entity_id().to_string(),
524                );
525
526                cx.notify();
527            }
528            JupyterMessageContent::KernelInfoReply(reply) => {
529                self.kernel.set_kernel_info(&reply);
530                cx.notify();
531            }
532            _ => {}
533        }
534
535        if let Some(block) = self.blocks.get_mut(parent_message_id) {
536            block.handle_message(&message, cx);
537            return;
538        }
539    }
540
541    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
542        match &mut self.kernel {
543            Kernel::RunningKernel(_kernel) => {
544                self.send(InterruptRequest {}.into(), cx).ok();
545            }
546            Kernel::StartingKernel(_task) => {
547                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
548            }
549            _ => {}
550        }
551    }
552
553    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
554        if let Kernel::Shutdown = kernel {
555            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
556        }
557
558        let kernel_status = KernelStatus::from(&kernel).to_string();
559        let kernel_language = self.kernel_specification.kernelspec.language.clone();
560
561        self.telemetry.report_repl_event(
562            kernel_language,
563            kernel_status,
564            cx.entity_id().to_string(),
565        );
566
567        self.kernel = kernel;
568    }
569
570    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
571        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
572
573        match kernel {
574            Kernel::RunningKernel(mut kernel) => {
575                let mut request_tx = kernel.request_tx.clone();
576
577                cx.spawn(|this, mut cx| async move {
578                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
579                    request_tx.try_send(message).ok();
580
581                    // Give the kernel a bit of time to clean up
582                    cx.background_executor().timer(Duration::from_secs(3)).await;
583
584                    kernel.process.kill().ok();
585
586                    this.update(&mut cx, |session, cx| {
587                        session.clear_outputs(cx);
588                        session.kernel(Kernel::Shutdown, cx);
589                        cx.notify();
590                    })
591                    .ok();
592                })
593                .detach();
594            }
595            Kernel::StartingKernel(_kernel) => {
596                self.kernel = Kernel::Shutdown;
597            }
598            _ => {
599                self.kernel = Kernel::Shutdown;
600            }
601        }
602        cx.notify();
603    }
604}
605
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// The kernel state was set to `Kernel::Shutdown`; carries the session's editor handle.
    Shutdown(WeakView<Editor>),
}
609
// Lets `Session` emit `SessionEvent`s through `cx.emit` (see `Session::kernel`).
impl EventEmitter<SessionEvent> for Session {}
611
612impl Render for Session {
613    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
614        let (status_text, interrupt_button) = match &self.kernel {
615            Kernel::RunningKernel(kernel) => (
616                kernel
617                    .kernel_info
618                    .as_ref()
619                    .map(|info| info.language_info.name.clone()),
620                Some(
621                    Button::new("interrupt", "Interrupt")
622                        .style(ButtonStyle::Subtle)
623                        .on_click(cx.listener(move |session, _, cx| {
624                            session.interrupt(cx);
625                        })),
626                ),
627            ),
628            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
629            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
630            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
631            Kernel::Shutdown => (Some("Shutdown".into()), None),
632        };
633
634        KernelListItem::new(self.kernel_specification.clone())
635            .status_color(match &self.kernel {
636                Kernel::RunningKernel(kernel) => match kernel.execution_state {
637                    ExecutionState::Idle => Color::Success,
638                    ExecutionState::Busy => Color::Modified,
639                },
640                Kernel::StartingKernel(_) => Color::Modified,
641                Kernel::ErroredLaunch(_) => Color::Error,
642                Kernel::ShuttingDown => Color::Modified,
643                Kernel::Shutdown => Color::Disabled,
644            })
645            .child(Label::new(self.kernel_specification.name.clone()))
646            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
647            .button(
648                Button::new("shutdown", "Shutdown")
649                    .style(ButtonStyle::Subtle)
650                    .disabled(self.kernel.is_shutting_down())
651                    .on_click(cx.listener(move |session, _, cx| {
652                        session.shutdown(cx);
653                    })),
654            )
655            .buttons(interrupt_button)
656    }
657}