session.rs

  1use crate::components::KernelListItem;
  2use crate::kernels::RemoteRunningKernel;
  3use crate::setup_editor_session_actions;
  4use crate::{
  5    kernels::{Kernel, KernelSpecification, NativeRunningKernel},
  6    outputs::{ExecutionStatus, ExecutionView},
  7    KernelStatus,
  8};
  9use client::telemetry::Telemetry;
 10use collections::{HashMap, HashSet};
 11use editor::{
 12    display_map::{
 13        BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
 14        RenderBlock,
 15    },
 16    scroll::Autoscroll,
 17    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 18};
 19use futures::FutureExt as _;
 20use gpui::{
 21    div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 30use theme::ActiveTheme;
 31use ui::{prelude::*, IconButtonShape, Tooltip};
 32use util::ResultExt as _;
 33
 34pub struct Session {
 35    fs: Arc<dyn Fs>,
 36    editor: WeakView<Editor>,
 37    pub kernel: Kernel,
 38    blocks: HashMap<String, EditorBlock>,
 39    pub kernel_specification: KernelSpecification,
 40    telemetry: Arc<Telemetry>,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    code_range: Range<Anchor>,
 46    invalidation_anchor: Anchor,
 47    block_id: CustomBlockId,
 48    execution_view: View<ExecutionView>,
 49}
 50
 51type CloseBlockFn =
 52    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 53
 54impl EditorBlock {
 55    fn new(
 56        editor: WeakView<Editor>,
 57        code_range: Range<Anchor>,
 58        status: ExecutionStatus,
 59        on_close: CloseBlockFn,
 60        cx: &mut ViewContext<Session>,
 61    ) -> anyhow::Result<Self> {
 62        let editor = editor
 63            .upgrade()
 64            .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
 65        let workspace = editor
 66            .read(cx)
 67            .workspace()
 68            .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
 69
 70        let execution_view =
 71            cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
 72
 73        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 74            let buffer = editor.buffer().clone();
 75            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 76            let end_point = code_range.end.to_point(&buffer_snapshot);
 77            let next_row_start = end_point + Point::new(1, 0);
 78            if next_row_start > buffer_snapshot.max_point() {
 79                buffer.update(cx, |buffer, cx| {
 80                    buffer.edit(
 81                        [(
 82                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 83                            "\n",
 84                        )],
 85                        None,
 86                        cx,
 87                    )
 88                });
 89            }
 90
 91            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 92            let block = BlockProperties {
 93                placement: BlockPlacement::Below(code_range.end),
 94                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 95                height: 1,
 96                style: BlockStyle::Sticky,
 97                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 98                priority: 0,
 99            };
100
101            let block_id = editor.insert_blocks([block], None, cx)[0];
102            (block_id, invalidation_anchor)
103        });
104
105        anyhow::Ok(Self {
106            code_range,
107            invalidation_anchor,
108            block_id,
109            execution_view,
110        })
111    }
112
113    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
114        self.execution_view.update(cx, |execution_view, cx| {
115            execution_view.push_message(&message.content, cx);
116        });
117    }
118
119    fn create_output_area_renderer(
120        execution_view: View<ExecutionView>,
121        on_close: CloseBlockFn,
122    ) -> RenderBlock {
123        Arc::new(move |cx: &mut BlockContext| {
124            let execution_view = execution_view.clone();
125            let text_style = crate::outputs::plain::text_style(cx);
126
127            let gutter = cx.gutter_dimensions;
128
129            let block_id = cx.block_id;
130            let on_close = on_close.clone();
131
132            let rem_size = cx.rem_size();
133
134            let text_line_height = text_style.line_height_in_pixels(rem_size);
135
136            let close_button = h_flex()
137                .flex_none()
138                .items_center()
139                .justify_center()
140                .absolute()
141                .top(text_line_height / 2.)
142                .right(
143                    // 2px is a magic number to nudge the button just a bit closer to
144                    // the line number start
145                    gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
146                )
147                .w(text_line_height)
148                .h(text_line_height)
149                .child(
150                    IconButton::new("close_output_area", IconName::Close)
151                        .icon_size(IconSize::Small)
152                        .icon_color(Color::Muted)
153                        .size(ButtonSize::Compact)
154                        .shape(IconButtonShape::Square)
155                        .tooltip(|cx| Tooltip::text("Close output area", cx))
156                        .on_click(move |_, cx| {
157                            if let BlockId::Custom(block_id) = block_id {
158                                (on_close)(block_id, cx)
159                            }
160                        }),
161                );
162
163            div()
164                .id(cx.block_id)
165                .block_mouse_down()
166                .flex()
167                .items_start()
168                .min_h(text_line_height)
169                .w_full()
170                .border_y_1()
171                .border_color(cx.theme().colors().border)
172                .bg(cx.theme().colors().background)
173                .child(
174                    div()
175                        .relative()
176                        .w(gutter.full_width())
177                        .h(text_line_height * 2)
178                        .child(close_button),
179                )
180                .child(
181                    div()
182                        .flex_1()
183                        .size_full()
184                        .py(text_line_height / 2.)
185                        .mr(gutter.width)
186                        .child(execution_view),
187                )
188                .into_any_element()
189        })
190    }
191}
192
193impl Session {
194    pub fn new(
195        editor: WeakView<Editor>,
196        fs: Arc<dyn Fs>,
197        telemetry: Arc<Telemetry>,
198        kernel_specification: KernelSpecification,
199        cx: &mut ViewContext<Self>,
200    ) -> Self {
201        let subscription = match editor.upgrade() {
202            Some(editor) => {
203                let buffer = editor.read(cx).buffer().clone();
204                cx.subscribe(&buffer, Self::on_buffer_event)
205            }
206            None => Subscription::new(|| {}),
207        };
208
209        let editor_handle = editor.clone();
210
211        editor
212            .update(cx, |editor, _cx| {
213                setup_editor_session_actions(editor, editor_handle);
214            })
215            .ok();
216
217        let mut session = Self {
218            fs,
219            editor,
220            kernel: Kernel::StartingKernel(Task::ready(()).shared()),
221            blocks: HashMap::default(),
222            kernel_specification,
223            _buffer_subscription: subscription,
224            telemetry,
225        };
226
227        session.start_kernel(cx);
228        session
229    }
230
231    fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
232        let kernel_language = self.kernel_specification.language();
233        let entity_id = self.editor.entity_id();
234        let working_directory = self
235            .editor
236            .upgrade()
237            .and_then(|editor| editor.read(cx).working_directory(cx))
238            .unwrap_or_else(temp_dir);
239
240        self.telemetry.report_repl_event(
241            kernel_language.into(),
242            KernelStatus::Starting.to_string(),
243            cx.entity_id().to_string(),
244        );
245
246        let session_view = cx.view().clone();
247
248        let kernel = match self.kernel_specification.clone() {
249            KernelSpecification::Jupyter(kernel_specification)
250            | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
251                kernel_specification,
252                entity_id,
253                working_directory,
254                self.fs.clone(),
255                session_view,
256                cx,
257            ),
258            KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
259                remote_kernel_specification,
260                working_directory,
261                session_view,
262                cx,
263            ),
264        };
265
266        let pending_kernel = cx
267            .spawn(|this, mut cx| async move {
268                let kernel = kernel.await;
269
270                match kernel {
271                    Ok(kernel) => {
272                        this.update(&mut cx, |session, cx| {
273                            session.kernel(Kernel::RunningKernel(kernel), cx);
274                        })
275                        .ok();
276                    }
277                    Err(err) => {
278                        this.update(&mut cx, |session, cx| {
279                            session.kernel_errored(err.to_string(), cx);
280                        })
281                        .ok();
282                    }
283                }
284            })
285            .shared();
286
287        self.kernel(Kernel::StartingKernel(pending_kernel), cx);
288        cx.notify();
289    }
290
291    pub fn kernel_errored(&mut self, error_message: String, cx: &mut ViewContext<Self>) {
292        self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
293
294        self.blocks.values().for_each(|block| {
295            block.execution_view.update(cx, |execution_view, cx| {
296                match execution_view.status {
297                    ExecutionStatus::Finished => {
298                        // Do nothing when the output was good
299                    }
300                    _ => {
301                        // All other cases, set the status to errored
302                        execution_view.status =
303                            ExecutionStatus::KernelErrored(error_message.clone())
304                    }
305                }
306                cx.notify();
307            });
308        });
309    }
310
311    fn on_buffer_event(
312        &mut self,
313        buffer: Model<MultiBuffer>,
314        event: &multi_buffer::Event,
315        cx: &mut ViewContext<Self>,
316    ) {
317        if let multi_buffer::Event::Edited { .. } = event {
318            let snapshot = buffer.read(cx).snapshot(cx);
319
320            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
321
322            self.blocks.retain(|_id, block| {
323                if block.invalidation_anchor.is_valid(&snapshot) {
324                    true
325                } else {
326                    blocks_to_remove.insert(block.block_id);
327                    false
328                }
329            });
330
331            if !blocks_to_remove.is_empty() {
332                self.editor
333                    .update(cx, |editor, cx| {
334                        editor.remove_blocks(blocks_to_remove, None, cx);
335                    })
336                    .ok();
337                cx.notify();
338            }
339        }
340    }
341
342    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
343        if let Kernel::RunningKernel(kernel) = &mut self.kernel {
344            kernel.request_tx().try_send(message).ok();
345        }
346
347        anyhow::Ok(())
348    }
349
350    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
351        let blocks_to_remove: HashSet<CustomBlockId> =
352            self.blocks.values().map(|block| block.block_id).collect();
353
354        self.editor
355            .update(cx, |editor, cx| {
356                editor.remove_blocks(blocks_to_remove, None, cx);
357            })
358            .ok();
359
360        self.blocks.clear();
361    }
362
363    pub fn execute(
364        &mut self,
365        code: String,
366        anchor_range: Range<Anchor>,
367        next_cell: Option<Anchor>,
368        move_down: bool,
369        cx: &mut ViewContext<Self>,
370    ) {
371        let Some(editor) = self.editor.upgrade() else {
372            return;
373        };
374
375        if code.is_empty() {
376            return;
377        }
378
379        let execute_request = ExecuteRequest {
380            code,
381            ..ExecuteRequest::default()
382        };
383
384        let message: JupyterMessage = execute_request.into();
385
386        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
387
388        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
389
390        self.blocks.retain(|_key, block| {
391            if anchor_range.overlaps(&block.code_range, &buffer) {
392                blocks_to_remove.insert(block.block_id);
393                false
394            } else {
395                true
396            }
397        });
398
399        self.editor
400            .update(cx, |editor, cx| {
401                editor.remove_blocks(blocks_to_remove, None, cx);
402            })
403            .ok();
404
405        let status = match &self.kernel {
406            Kernel::Restarting => ExecutionStatus::Restarting,
407            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
408            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
409            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
410            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
411            Kernel::Shutdown => ExecutionStatus::Shutdown,
412        };
413
414        let parent_message_id = message.header.msg_id.clone();
415        let session_view = cx.view().downgrade();
416        let weak_editor = self.editor.clone();
417
418        let on_close: CloseBlockFn =
419            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
420                if let Some(session) = session_view.upgrade() {
421                    session.update(cx, |session, cx| {
422                        session.blocks.remove(&parent_message_id);
423                        cx.notify();
424                    });
425                }
426
427                if let Some(editor) = weak_editor.upgrade() {
428                    editor.update(cx, |editor, cx| {
429                        let mut block_ids = HashSet::default();
430                        block_ids.insert(block_id);
431                        editor.remove_blocks(block_ids, None, cx);
432                    });
433                }
434            });
435
436        let Ok(editor_block) =
437            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
438        else {
439            return;
440        };
441
442        let new_cursor_pos = if let Some(next_cursor) = next_cell {
443            next_cursor
444        } else {
445            editor_block.invalidation_anchor
446        };
447
448        self.blocks
449            .insert(message.header.msg_id.clone(), editor_block);
450
451        match &self.kernel {
452            Kernel::RunningKernel(_) => {
453                self.send(message, cx).ok();
454            }
455            Kernel::StartingKernel(task) => {
456                // Queue up the execution as a task to run after the kernel starts
457                let task = task.clone();
458                let message = message.clone();
459
460                cx.spawn(|this, mut cx| async move {
461                    task.await;
462                    this.update(&mut cx, |session, cx| {
463                        session.send(message, cx).ok();
464                    })
465                    .ok();
466                })
467                .detach();
468            }
469            _ => {}
470        }
471
472        if move_down {
473            editor.update(cx, move |editor, cx| {
474                editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
475                    selections.select_ranges([new_cursor_pos..new_cursor_pos]);
476                });
477            });
478        }
479    }
480
481    pub fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
482        let parent_message_id = match message.parent_header.as_ref() {
483            Some(header) => &header.msg_id,
484            None => return,
485        };
486
487        match &message.content {
488            JupyterMessageContent::Status(status) => {
489                self.kernel.set_execution_state(&status.execution_state);
490
491                self.telemetry.report_repl_event(
492                    self.kernel_specification.language().into(),
493                    KernelStatus::from(&self.kernel).to_string(),
494                    cx.entity_id().to_string(),
495                );
496
497                cx.notify();
498            }
499            JupyterMessageContent::KernelInfoReply(reply) => {
500                self.kernel.set_kernel_info(reply);
501                cx.notify();
502            }
503            JupyterMessageContent::UpdateDisplayData(update) => {
504                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
505                    display_id
506                } else {
507                    return;
508                };
509
510                self.blocks.iter_mut().for_each(|(_, block)| {
511                    block.execution_view.update(cx, |execution_view, cx| {
512                        execution_view.update_display_data(&update.data, &display_id, cx);
513                    });
514                });
515                return;
516            }
517            _ => {}
518        }
519
520        if let Some(block) = self.blocks.get_mut(parent_message_id) {
521            block.handle_message(message, cx);
522        }
523    }
524
525    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
526        match &mut self.kernel {
527            Kernel::RunningKernel(_kernel) => {
528                self.send(InterruptRequest {}.into(), cx).ok();
529            }
530            Kernel::StartingKernel(_task) => {
531                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
532            }
533            _ => {}
534        }
535    }
536
537    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
538        if let Kernel::Shutdown = kernel {
539            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
540        }
541
542        let kernel_status = KernelStatus::from(&kernel).to_string();
543        let kernel_language = self.kernel_specification.language().into();
544
545        self.telemetry.report_repl_event(
546            kernel_language,
547            kernel_status,
548            cx.entity_id().to_string(),
549        );
550
551        self.kernel = kernel;
552    }
553
554    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
555        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
556
557        match kernel {
558            Kernel::RunningKernel(mut kernel) => {
559                let mut request_tx = kernel.request_tx().clone();
560
561                let forced = kernel.force_shutdown(cx);
562
563                cx.spawn(|this, mut cx| async move {
564                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
565                    request_tx.try_send(message).ok();
566
567                    forced.await.log_err();
568
569                    // Give the kernel a bit of time to clean up
570                    cx.background_executor().timer(Duration::from_secs(3)).await;
571
572                    this.update(&mut cx, |session, cx| {
573                        session.clear_outputs(cx);
574                        session.kernel(Kernel::Shutdown, cx);
575                        cx.notify();
576                    })
577                    .ok();
578                })
579                .detach();
580            }
581            _ => {
582                self.kernel(Kernel::Shutdown, cx);
583            }
584        }
585        cx.notify();
586    }
587
588    pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
589        let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
590
591        match kernel {
592            Kernel::Restarting => {
593                // Do nothing if already restarting
594            }
595            Kernel::RunningKernel(mut kernel) => {
596                let mut request_tx = kernel.request_tx().clone();
597
598                let forced = kernel.force_shutdown(cx);
599
600                cx.spawn(|this, mut cx| async move {
601                    // Send shutdown request with restart flag
602                    log::debug!("restarting kernel");
603                    let message: JupyterMessage = ShutdownRequest { restart: true }.into();
604                    request_tx.try_send(message).ok();
605
606                    // Wait for kernel to shutdown
607                    cx.background_executor().timer(Duration::from_secs(1)).await;
608
609                    // Force kill the kernel if it hasn't shut down
610                    forced.await.log_err();
611
612                    // Start a new kernel
613                    this.update(&mut cx, |session, cx| {
614                        // todo!(): Differentiate between restart and restart+clear-outputs
615                        session.clear_outputs(cx);
616                        session.start_kernel(cx);
617                    })
618                    .ok();
619                })
620                .detach();
621            }
622            _ => {
623                self.clear_outputs(cx);
624                self.start_kernel(cx);
625            }
626        }
627        cx.notify();
628    }
629}
630
631pub enum SessionEvent {
632    Shutdown(WeakView<Editor>),
633}
634
635impl EventEmitter<SessionEvent> for Session {}
636
637impl Render for Session {
638    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
639        let (status_text, interrupt_button) = match &self.kernel {
640            Kernel::RunningKernel(kernel) => (
641                kernel
642                    .kernel_info()
643                    .as_ref()
644                    .map(|info| info.language_info.name.clone()),
645                Some(
646                    Button::new("interrupt", "Interrupt")
647                        .style(ButtonStyle::Subtle)
648                        .on_click(cx.listener(move |session, _, cx| {
649                            session.interrupt(cx);
650                        })),
651                ),
652            ),
653            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
654            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
655            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
656            Kernel::Shutdown => (Some("Shutdown".into()), None),
657            Kernel::Restarting => (Some("Restarting".into()), None),
658        };
659
660        KernelListItem::new(self.kernel_specification.clone())
661            .status_color(match &self.kernel {
662                Kernel::RunningKernel(kernel) => match kernel.execution_state() {
663                    ExecutionState::Idle => Color::Success,
664                    ExecutionState::Busy => Color::Modified,
665                },
666                Kernel::StartingKernel(_) => Color::Modified,
667                Kernel::ErroredLaunch(_) => Color::Error,
668                Kernel::ShuttingDown => Color::Modified,
669                Kernel::Shutdown => Color::Disabled,
670                Kernel::Restarting => Color::Modified,
671            })
672            .child(Label::new(self.kernel_specification.name()))
673            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
674            .button(
675                Button::new("shutdown", "Shutdown")
676                    .style(ButtonStyle::Subtle)
677                    .disabled(self.kernel.is_shutting_down())
678                    .on_click(cx.listener(move |session, _, cx| {
679                        session.shutdown(cx);
680                    })),
681            )
682            .buttons(interrupt_button)
683    }
684}