session.rs

  1use crate::components::KernelListItem;
  2use crate::kernels::RemoteRunningKernel;
  3use crate::setup_editor_session_actions;
  4use crate::{
  5    KernelStatus,
  6    kernels::{Kernel, KernelSpecification, NativeRunningKernel},
  7    outputs::{ExecutionStatus, ExecutionView},
  8};
  9use anyhow::Context as _;
 10use collections::{HashMap, HashSet};
 11use editor::SelectionEffects;
 12use editor::{
 13    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 14    display_map::{
 15        BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
 16        RenderBlock,
 17    },
 18    scroll::Autoscroll,
 19};
 20use futures::FutureExt as _;
 21use gpui::{
 22    Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
 23};
 24use language::Point;
 25use project::Fs;
 26use runtimelib::{
 27    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 28    ShutdownRequest,
 29};
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::ActiveTheme;
 32use ui::{IconButtonShape, Tooltip, prelude::*};
 33use util::ResultExt as _;
 34
/// A REPL session that connects one editor to one kernel and tracks the
/// output blocks it has inserted into that editor.
pub struct Session {
    /// Filesystem handle; handed to the native kernel when it is launched.
    fs: Arc<dyn Fs>,
    /// Weak handle to the editor whose code is executed and which hosts the output blocks.
    editor: WeakEntity<Editor>,
    /// Current kernel lifecycle state (starting, running, errored, shutting down, ...).
    pub kernel: Kernel,
    /// Output blocks keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Specification used to launch (and relaunch) the kernel.
    pub kernel_specification: KernelSpecification,
    /// Holds the buffer-event subscription created in `Session::new`.
    _buffer_subscription: Subscription,
}
 43
/// One output area inserted below an executed range of code.
struct EditorBlock {
    /// The range of code whose execution produced this output.
    code_range: Range<Anchor>,
    /// Anchor at the start of the row following the code. When it is no longer
    /// valid in the buffer snapshot, the session removes the block.
    invalidation_anchor: Anchor,
    /// Id of the custom block that was inserted into the editor.
    block_id: CustomBlockId,
    /// View that receives the kernel's messages for this execution and renders them.
    execution_view: Entity<ExecutionView>,
}
 50
/// Callback invoked with the block's id when the user clicks the close button
/// of an output area.
type CloseBlockFn =
    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
 53
 54impl EditorBlock {
 55    fn new(
 56        editor: WeakEntity<Editor>,
 57        code_range: Range<Anchor>,
 58        status: ExecutionStatus,
 59        on_close: CloseBlockFn,
 60        cx: &mut Context<Session>,
 61    ) -> anyhow::Result<Self> {
 62        let editor = editor.upgrade().context("editor is not open")?;
 63        let workspace = editor.read(cx).workspace().context("workspace dropped")?;
 64
 65        let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
 66
 67        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 68            let buffer = editor.buffer().clone();
 69            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 70            let end_point = code_range.end.to_point(&buffer_snapshot);
 71            let next_row_start = end_point + Point::new(1, 0);
 72            if next_row_start > buffer_snapshot.max_point() {
 73                buffer.update(cx, |buffer, cx| {
 74                    buffer.edit(
 75                        [(
 76                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 77                            "\n",
 78                        )],
 79                        None,
 80                        cx,
 81                    )
 82                });
 83            }
 84
 85            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 86            let block = BlockProperties {
 87                placement: BlockPlacement::Below(code_range.end),
 88                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 89                height: Some(1),
 90                style: BlockStyle::Sticky,
 91                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 92                priority: 0,
 93            };
 94
 95            let block_id = editor.insert_blocks([block], None, cx)[0];
 96            (block_id, invalidation_anchor)
 97        });
 98
 99        anyhow::Ok(Self {
100            code_range,
101            invalidation_anchor,
102            block_id,
103            execution_view,
104        })
105    }
106
107    fn handle_message(
108        &mut self,
109        message: &JupyterMessage,
110        window: &mut Window,
111        cx: &mut Context<Session>,
112    ) {
113        self.execution_view.update(cx, |execution_view, cx| {
114            execution_view.push_message(&message.content, window, cx);
115        });
116    }
117
118    fn create_output_area_renderer(
119        execution_view: Entity<ExecutionView>,
120        on_close: CloseBlockFn,
121    ) -> RenderBlock {
122        Arc::new(move |cx: &mut BlockContext| {
123            let execution_view = execution_view.clone();
124            let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
125
126            let editor_margins = cx.margins;
127            let gutter = editor_margins.gutter;
128
129            let block_id = cx.block_id;
130            let on_close = on_close.clone();
131
132            let rem_size = cx.window.rem_size();
133
134            let text_line_height = text_style.line_height_in_pixels(rem_size);
135
136            let close_button = h_flex()
137                .flex_none()
138                .items_center()
139                .justify_center()
140                .absolute()
141                .top(text_line_height / 2.)
142                .right(
143                    // 2px is a magic number to nudge the button just a bit closer to
144                    // the line number start
145                    gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
146                )
147                .w(text_line_height)
148                .h(text_line_height)
149                .child(
150                    IconButton::new("close_output_area", IconName::Close)
151                        .icon_size(IconSize::Small)
152                        .icon_color(Color::Muted)
153                        .size(ButtonSize::Compact)
154                        .shape(IconButtonShape::Square)
155                        .tooltip(Tooltip::text("Close output area"))
156                        .on_click(move |_, window, cx| {
157                            if let BlockId::Custom(block_id) = block_id {
158                                (on_close)(block_id, window, cx)
159                            }
160                        }),
161                );
162
163            div()
164                .id(cx.block_id)
165                .block_mouse_except_scroll()
166                .flex()
167                .items_start()
168                .min_h(text_line_height)
169                .w_full()
170                .border_y_1()
171                .border_color(cx.theme().colors().border)
172                .bg(cx.theme().colors().background)
173                .child(
174                    div()
175                        .relative()
176                        .w(gutter.full_width())
177                        .h(text_line_height * 2)
178                        .child(close_button),
179                )
180                .child(
181                    div()
182                        .flex_1()
183                        .size_full()
184                        .py(text_line_height / 2.)
185                        .mr(editor_margins.right)
186                        .pr_2()
187                        .child(execution_view),
188                )
189                .into_any_element()
190        })
191    }
192}
193
194impl Session {
195    pub fn new(
196        editor: WeakEntity<Editor>,
197        fs: Arc<dyn Fs>,
198        kernel_specification: KernelSpecification,
199        window: &mut Window,
200        cx: &mut Context<Self>,
201    ) -> Self {
202        let subscription = match editor.upgrade() {
203            Some(editor) => {
204                let buffer = editor.read(cx).buffer().clone();
205                cx.subscribe(&buffer, Self::on_buffer_event)
206            }
207            None => Subscription::new(|| {}),
208        };
209
210        let editor_handle = editor.clone();
211
212        editor
213            .update(cx, |editor, _cx| {
214                setup_editor_session_actions(editor, editor_handle);
215            })
216            .ok();
217
218        let mut session = Self {
219            fs,
220            editor,
221            kernel: Kernel::StartingKernel(Task::ready(()).shared()),
222            blocks: HashMap::default(),
223            kernel_specification,
224            _buffer_subscription: subscription,
225        };
226
227        session.start_kernel(window, cx);
228        session
229    }
230
231    fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
232        let kernel_language = self.kernel_specification.language();
233        let entity_id = self.editor.entity_id();
234        let working_directory = self
235            .editor
236            .upgrade()
237            .and_then(|editor| editor.read(cx).working_directory(cx))
238            .unwrap_or_else(temp_dir);
239
240        telemetry::event!(
241            "Kernel Status Changed",
242            kernel_language,
243            kernel_status = KernelStatus::Starting.to_string(),
244            repl_session_id = cx.entity_id().to_string(),
245        );
246
247        let session_view = cx.entity();
248
249        let kernel = match self.kernel_specification.clone() {
250            KernelSpecification::Jupyter(kernel_specification)
251            | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
252                kernel_specification,
253                entity_id,
254                working_directory,
255                self.fs.clone(),
256                session_view,
257                window,
258                cx,
259            ),
260            KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
261                remote_kernel_specification,
262                working_directory,
263                session_view,
264                window,
265                cx,
266            ),
267        };
268
269        let pending_kernel = cx
270            .spawn(async move |this, cx| {
271                let kernel = kernel.await;
272
273                match kernel {
274                    Ok(kernel) => {
275                        this.update(cx, |session, cx| {
276                            session.kernel(Kernel::RunningKernel(kernel), cx);
277                        })
278                        .ok();
279                    }
280                    Err(err) => {
281                        this.update(cx, |session, cx| {
282                            session.kernel_errored(err.to_string(), cx);
283                        })
284                        .ok();
285                    }
286                }
287            })
288            .shared();
289
290        self.kernel(Kernel::StartingKernel(pending_kernel), cx);
291        cx.notify();
292    }
293
294    pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
295        self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
296
297        self.blocks.values().for_each(|block| {
298            block.execution_view.update(cx, |execution_view, cx| {
299                match execution_view.status {
300                    ExecutionStatus::Finished => {
301                        // Do nothing when the output was good
302                    }
303                    _ => {
304                        // All other cases, set the status to errored
305                        execution_view.status =
306                            ExecutionStatus::KernelErrored(error_message.clone())
307                    }
308                }
309                cx.notify();
310            });
311        });
312    }
313
314    fn on_buffer_event(
315        &mut self,
316        buffer: Entity<MultiBuffer>,
317        event: &multi_buffer::Event,
318        cx: &mut Context<Self>,
319    ) {
320        if let multi_buffer::Event::Edited { .. } = event {
321            let snapshot = buffer.read(cx).snapshot(cx);
322
323            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
324
325            self.blocks.retain(|_id, block| {
326                if block.invalidation_anchor.is_valid(&snapshot) {
327                    true
328                } else {
329                    blocks_to_remove.insert(block.block_id);
330                    false
331                }
332            });
333
334            if !blocks_to_remove.is_empty() {
335                self.editor
336                    .update(cx, |editor, cx| {
337                        editor.remove_blocks(blocks_to_remove, None, cx);
338                    })
339                    .ok();
340                cx.notify();
341            }
342        }
343    }
344
345    fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
346        if let Kernel::RunningKernel(kernel) = &mut self.kernel {
347            kernel.request_tx().try_send(message).ok();
348        }
349
350        anyhow::Ok(())
351    }
352
353    pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
354        let blocks_to_remove: HashSet<CustomBlockId> =
355            self.blocks.values().map(|block| block.block_id).collect();
356
357        self.editor
358            .update(cx, |editor, cx| {
359                editor.remove_blocks(blocks_to_remove, None, cx);
360            })
361            .ok();
362
363        self.blocks.clear();
364    }
365
366    pub fn execute(
367        &mut self,
368        code: String,
369        anchor_range: Range<Anchor>,
370        next_cell: Option<Anchor>,
371        move_down: bool,
372        window: &mut Window,
373        cx: &mut Context<Self>,
374    ) {
375        let Some(editor) = self.editor.upgrade() else {
376            return;
377        };
378
379        if code.is_empty() {
380            return;
381        }
382
383        let execute_request = ExecuteRequest {
384            code,
385            ..ExecuteRequest::default()
386        };
387
388        let message: JupyterMessage = execute_request.into();
389
390        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
391
392        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
393
394        self.blocks.retain(|_key, block| {
395            if anchor_range.overlaps(&block.code_range, &buffer) {
396                blocks_to_remove.insert(block.block_id);
397                false
398            } else {
399                true
400            }
401        });
402
403        self.editor
404            .update(cx, |editor, cx| {
405                editor.remove_blocks(blocks_to_remove, None, cx);
406            })
407            .ok();
408
409        let status = match &self.kernel {
410            Kernel::Restarting => ExecutionStatus::Restarting,
411            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
412            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
413            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
414            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
415            Kernel::Shutdown => ExecutionStatus::Shutdown,
416        };
417
418        let parent_message_id = message.header.msg_id.clone();
419        let session_view = cx.entity().downgrade();
420        let weak_editor = self.editor.clone();
421
422        let on_close: CloseBlockFn = Arc::new(
423            move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
424                if let Some(session) = session_view.upgrade() {
425                    session.update(cx, |session, cx| {
426                        session.blocks.remove(&parent_message_id);
427                        cx.notify();
428                    });
429                }
430
431                if let Some(editor) = weak_editor.upgrade() {
432                    editor.update(cx, |editor, cx| {
433                        let mut block_ids = HashSet::default();
434                        block_ids.insert(block_id);
435                        editor.remove_blocks(block_ids, None, cx);
436                    });
437                }
438            },
439        );
440
441        let Ok(editor_block) =
442            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
443        else {
444            return;
445        };
446
447        let new_cursor_pos = if let Some(next_cursor) = next_cell {
448            next_cursor
449        } else {
450            editor_block.invalidation_anchor
451        };
452
453        self.blocks
454            .insert(message.header.msg_id.clone(), editor_block);
455
456        match &self.kernel {
457            Kernel::RunningKernel(_) => {
458                self.send(message, cx).ok();
459            }
460            Kernel::StartingKernel(task) => {
461                // Queue up the execution as a task to run after the kernel starts
462                let task = task.clone();
463                let message = message.clone();
464
465                cx.spawn(async move |this, cx| {
466                    task.await;
467                    this.update(cx, |session, cx| {
468                        session.send(message, cx).ok();
469                    })
470                    .ok();
471                })
472                .detach();
473            }
474            _ => {}
475        }
476
477        if move_down {
478            editor.update(cx, move |editor, cx| {
479                editor.change_selections(
480                    SelectionEffects::scroll(Autoscroll::top_relative(8)),
481                    window,
482                    cx,
483                    |selections| {
484                        selections.select_ranges([new_cursor_pos..new_cursor_pos]);
485                    },
486                );
487            });
488        }
489    }
490
491    pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
492        let parent_message_id = match message.parent_header.as_ref() {
493            Some(header) => &header.msg_id,
494            None => return,
495        };
496
497        match &message.content {
498            JupyterMessageContent::Status(status) => {
499                self.kernel.set_execution_state(&status.execution_state);
500
501                telemetry::event!(
502                    "Kernel Status Changed",
503                    kernel_language = self.kernel_specification.language(),
504                    kernel_status = KernelStatus::from(&self.kernel).to_string(),
505                    repl_session_id = cx.entity_id().to_string(),
506                );
507
508                cx.notify();
509            }
510            JupyterMessageContent::KernelInfoReply(reply) => {
511                self.kernel.set_kernel_info(reply);
512                cx.notify();
513            }
514            JupyterMessageContent::UpdateDisplayData(update) => {
515                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
516                    display_id
517                } else {
518                    return;
519                };
520
521                self.blocks.iter_mut().for_each(|(_, block)| {
522                    block.execution_view.update(cx, |execution_view, cx| {
523                        execution_view.update_display_data(&update.data, &display_id, window, cx);
524                    });
525                });
526                return;
527            }
528            _ => {}
529        }
530
531        if let Some(block) = self.blocks.get_mut(parent_message_id) {
532            block.handle_message(message, window, cx);
533        }
534    }
535
536    pub fn interrupt(&mut self, cx: &mut Context<Self>) {
537        match &mut self.kernel {
538            Kernel::RunningKernel(_kernel) => {
539                self.send(InterruptRequest {}.into(), cx).ok();
540            }
541            Kernel::StartingKernel(_task) => {
542                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
543            }
544            _ => {}
545        }
546    }
547
548    pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
549        if let Kernel::Shutdown = kernel {
550            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
551        }
552
553        let kernel_status = KernelStatus::from(&kernel).to_string();
554        let kernel_language = self.kernel_specification.language();
555
556        telemetry::event!(
557            "Kernel Status Changed",
558            kernel_language,
559            kernel_status,
560            repl_session_id = cx.entity_id().to_string(),
561        );
562
563        self.kernel = kernel;
564    }
565
566    pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
567        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
568
569        match kernel {
570            Kernel::RunningKernel(mut kernel) => {
571                let mut request_tx = kernel.request_tx().clone();
572
573                let forced = kernel.force_shutdown(window, cx);
574
575                cx.spawn(async move |this, cx| {
576                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
577                    request_tx.try_send(message).ok();
578
579                    forced.await.log_err();
580
581                    // Give the kernel a bit of time to clean up
582                    cx.background_executor().timer(Duration::from_secs(3)).await;
583
584                    this.update(cx, |session, cx| {
585                        session.clear_outputs(cx);
586                        session.kernel(Kernel::Shutdown, cx);
587                        cx.notify();
588                    })
589                    .ok();
590                })
591                .detach();
592            }
593            _ => {
594                self.kernel(Kernel::Shutdown, cx);
595            }
596        }
597        cx.notify();
598    }
599
600    pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
601        let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
602
603        match kernel {
604            Kernel::Restarting => {
605                // Do nothing if already restarting
606            }
607            Kernel::RunningKernel(mut kernel) => {
608                let mut request_tx = kernel.request_tx().clone();
609
610                let forced = kernel.force_shutdown(window, cx);
611
612                cx.spawn_in(window, async move |this, cx| {
613                    // Send shutdown request with restart flag
614                    log::debug!("restarting kernel");
615                    let message: JupyterMessage = ShutdownRequest { restart: true }.into();
616                    request_tx.try_send(message).ok();
617
618                    // Wait for kernel to shutdown
619                    cx.background_executor().timer(Duration::from_secs(1)).await;
620
621                    // Force kill the kernel if it hasn't shut down
622                    forced.await.log_err();
623
624                    // Start a new kernel
625                    this.update_in(cx, |session, window, cx| {
626                        // TODO: Differentiate between restart and restart+clear-outputs
627                        session.clear_outputs(cx);
628                        session.start_kernel(window, cx);
629                    })
630                    .ok();
631                })
632                .detach();
633            }
634            _ => {
635                self.clear_outputs(cx);
636                self.start_kernel(window, cx);
637            }
638        }
639        cx.notify();
640    }
641}
642
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// Emitted by `Session::kernel` when the kernel state is set to
    /// `Kernel::Shutdown`; carries the weak handle of the session's editor.
    Shutdown(WeakEntity<Editor>),
}
646
// Lets `Session` emit `SessionEvent`s (see the `cx.emit` call in `Session::kernel`).
impl EventEmitter<SessionEvent> for Session {}
648
649impl Render for Session {
650    fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
651        let (status_text, interrupt_button) = match &self.kernel {
652            Kernel::RunningKernel(kernel) => (
653                kernel
654                    .kernel_info()
655                    .as_ref()
656                    .map(|info| info.language_info.name.clone()),
657                Some(
658                    Button::new("interrupt", "Interrupt")
659                        .style(ButtonStyle::Subtle)
660                        .on_click(cx.listener(move |session, _, _, cx| {
661                            session.interrupt(cx);
662                        })),
663                ),
664            ),
665            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
666            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
667            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
668            Kernel::Shutdown => (Some("Shutdown".into()), None),
669            Kernel::Restarting => (Some("Restarting".into()), None),
670        };
671
672        KernelListItem::new(self.kernel_specification.clone())
673            .status_color(match &self.kernel {
674                Kernel::RunningKernel(kernel) => match kernel.execution_state() {
675                    ExecutionState::Idle => Color::Success,
676                    ExecutionState::Busy => Color::Modified,
677                },
678                Kernel::StartingKernel(_) => Color::Modified,
679                Kernel::ErroredLaunch(_) => Color::Error,
680                Kernel::ShuttingDown => Color::Modified,
681                Kernel::Shutdown => Color::Disabled,
682                Kernel::Restarting => Color::Modified,
683            })
684            .child(Label::new(self.kernel_specification.name()))
685            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
686            .button(
687                Button::new("shutdown", "Shutdown")
688                    .style(ButtonStyle::Subtle)
689                    .disabled(self.kernel.is_shutting_down())
690                    .on_click(cx.listener(move |session, _, window, cx| {
691                        session.shutdown(window, cx);
692                    })),
693            )
694            .buttons(interrupt_button)
695    }
696}