session.rs

  1use crate::components::KernelListItem;
  2use crate::kernels::RemoteRunningKernel;
  3use crate::setup_editor_session_actions;
  4use crate::{
  5    KernelStatus,
  6    kernels::{Kernel, KernelSpecification, NativeRunningKernel},
  7    outputs::{ExecutionStatus, ExecutionView},
  8};
  9use anyhow::Context as _;
 10use collections::{HashMap, HashSet};
 11use editor::SelectionEffects;
 12use editor::{
 13    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 14    display_map::{
 15        BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
 16        RenderBlock,
 17    },
 18    scroll::Autoscroll,
 19};
 20use futures::FutureExt as _;
 21use gpui::{
 22    Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
 23};
 24use language::Point;
 25use project::Fs;
 26use runtimelib::{
 27    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 28    ShutdownRequest,
 29};
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::ActiveTheme;
 32use ui::{IconButtonShape, Tooltip, prelude::*};
 33use util::ResultExt as _;
 34
 35pub struct Session {
 36    fs: Arc<dyn Fs>,
 37    editor: WeakEntity<Editor>,
 38    pub kernel: Kernel,
 39    blocks: HashMap<String, EditorBlock>,
 40    pub kernel_specification: KernelSpecification,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    code_range: Range<Anchor>,
 46    invalidation_anchor: Anchor,
 47    block_id: CustomBlockId,
 48    execution_view: Entity<ExecutionView>,
 49}
 50
 51type CloseBlockFn =
 52    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
 53
 54impl EditorBlock {
 55    fn new(
 56        editor: WeakEntity<Editor>,
 57        code_range: Range<Anchor>,
 58        status: ExecutionStatus,
 59        on_close: CloseBlockFn,
 60        cx: &mut Context<Session>,
 61    ) -> anyhow::Result<Self> {
 62        let editor = editor.upgrade().context("editor is not open")?;
 63        let workspace = editor.read(cx).workspace().context("workspace dropped")?;
 64
 65        let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
 66
 67        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 68            let buffer = editor.buffer().clone();
 69            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 70            let end_point = code_range.end.to_point(&buffer_snapshot);
 71            let next_row_start = end_point + Point::new(1, 0);
 72            if next_row_start > buffer_snapshot.max_point() {
 73                buffer.update(cx, |buffer, cx| {
 74                    buffer.edit(
 75                        [(
 76                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 77                            "\n",
 78                        )],
 79                        None,
 80                        cx,
 81                    )
 82                });
 83            }
 84
 85            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 86            let block = BlockProperties {
 87                placement: BlockPlacement::Below(code_range.end),
 88                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 89                height: Some(1),
 90                style: BlockStyle::Sticky,
 91                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 92                priority: 0,
 93                render_in_minimap: false,
 94            };
 95
 96            let block_id = editor.insert_blocks([block], None, cx)[0];
 97            (block_id, invalidation_anchor)
 98        });
 99
100        anyhow::Ok(Self {
101            code_range,
102            invalidation_anchor,
103            block_id,
104            execution_view,
105        })
106    }
107
108    fn handle_message(
109        &mut self,
110        message: &JupyterMessage,
111        window: &mut Window,
112        cx: &mut Context<Session>,
113    ) {
114        self.execution_view.update(cx, |execution_view, cx| {
115            execution_view.push_message(&message.content, window, cx);
116        });
117    }
118
119    fn create_output_area_renderer(
120        execution_view: Entity<ExecutionView>,
121        on_close: CloseBlockFn,
122    ) -> RenderBlock {
123        Arc::new(move |cx: &mut BlockContext| {
124            let execution_view = execution_view.clone();
125            let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
126
127            let editor_margins = cx.margins;
128            let gutter = editor_margins.gutter;
129
130            let block_id = cx.block_id;
131            let on_close = on_close.clone();
132
133            let rem_size = cx.window.rem_size();
134
135            let text_line_height = text_style.line_height_in_pixels(rem_size);
136
137            let close_button = h_flex()
138                .flex_none()
139                .items_center()
140                .justify_center()
141                .absolute()
142                .top(text_line_height / 2.)
143                .right(
144                    // 2px is a magic number to nudge the button just a bit closer to
145                    // the line number start
146                    gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
147                )
148                .w(text_line_height)
149                .h(text_line_height)
150                .child(
151                    IconButton::new("close_output_area", IconName::Close)
152                        .icon_size(IconSize::Small)
153                        .icon_color(Color::Muted)
154                        .size(ButtonSize::Compact)
155                        .shape(IconButtonShape::Square)
156                        .tooltip(Tooltip::text("Close output area"))
157                        .on_click(move |_, window, cx| {
158                            if let BlockId::Custom(block_id) = block_id {
159                                (on_close)(block_id, window, cx)
160                            }
161                        }),
162                );
163
164            div()
165                .id(cx.block_id)
166                .block_mouse_except_scroll()
167                .flex()
168                .items_start()
169                .min_h(text_line_height)
170                .w_full()
171                .border_y_1()
172                .border_color(cx.theme().colors().border)
173                .bg(cx.theme().colors().background)
174                .child(
175                    div()
176                        .relative()
177                        .w(gutter.full_width())
178                        .h(text_line_height * 2)
179                        .child(close_button),
180                )
181                .child(
182                    div()
183                        .flex_1()
184                        .size_full()
185                        .py(text_line_height / 2.)
186                        .mr(editor_margins.right)
187                        .pr_2()
188                        .child(execution_view),
189                )
190                .into_any_element()
191        })
192    }
193}
194
195impl Session {
196    pub fn new(
197        editor: WeakEntity<Editor>,
198        fs: Arc<dyn Fs>,
199        kernel_specification: KernelSpecification,
200        window: &mut Window,
201        cx: &mut Context<Self>,
202    ) -> Self {
203        let subscription = match editor.upgrade() {
204            Some(editor) => {
205                let buffer = editor.read(cx).buffer().clone();
206                cx.subscribe(&buffer, Self::on_buffer_event)
207            }
208            None => Subscription::new(|| {}),
209        };
210
211        let editor_handle = editor.clone();
212
213        editor
214            .update(cx, |editor, _cx| {
215                setup_editor_session_actions(editor, editor_handle);
216            })
217            .ok();
218
219        let mut session = Self {
220            fs,
221            editor,
222            kernel: Kernel::StartingKernel(Task::ready(()).shared()),
223            blocks: HashMap::default(),
224            kernel_specification,
225            _buffer_subscription: subscription,
226        };
227
228        session.start_kernel(window, cx);
229        session
230    }
231
232    fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
233        let kernel_language = self.kernel_specification.language();
234        let entity_id = self.editor.entity_id();
235        let working_directory = self
236            .editor
237            .upgrade()
238            .and_then(|editor| editor.read(cx).working_directory(cx))
239            .unwrap_or_else(temp_dir);
240
241        telemetry::event!(
242            "Kernel Status Changed",
243            kernel_language,
244            kernel_status = KernelStatus::Starting.to_string(),
245            repl_session_id = cx.entity_id().to_string(),
246        );
247
248        let session_view = cx.entity().clone();
249
250        let kernel = match self.kernel_specification.clone() {
251            KernelSpecification::Jupyter(kernel_specification)
252            | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
253                kernel_specification,
254                entity_id,
255                working_directory,
256                self.fs.clone(),
257                session_view,
258                window,
259                cx,
260            ),
261            KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
262                remote_kernel_specification,
263                working_directory,
264                session_view,
265                window,
266                cx,
267            ),
268        };
269
270        let pending_kernel = cx
271            .spawn(async move |this, cx| {
272                let kernel = kernel.await;
273
274                match kernel {
275                    Ok(kernel) => {
276                        this.update(cx, |session, cx| {
277                            session.kernel(Kernel::RunningKernel(kernel), cx);
278                        })
279                        .ok();
280                    }
281                    Err(err) => {
282                        this.update(cx, |session, cx| {
283                            session.kernel_errored(err.to_string(), cx);
284                        })
285                        .ok();
286                    }
287                }
288            })
289            .shared();
290
291        self.kernel(Kernel::StartingKernel(pending_kernel), cx);
292        cx.notify();
293    }
294
295    pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
296        self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
297
298        self.blocks.values().for_each(|block| {
299            block.execution_view.update(cx, |execution_view, cx| {
300                match execution_view.status {
301                    ExecutionStatus::Finished => {
302                        // Do nothing when the output was good
303                    }
304                    _ => {
305                        // All other cases, set the status to errored
306                        execution_view.status =
307                            ExecutionStatus::KernelErrored(error_message.clone())
308                    }
309                }
310                cx.notify();
311            });
312        });
313    }
314
315    fn on_buffer_event(
316        &mut self,
317        buffer: Entity<MultiBuffer>,
318        event: &multi_buffer::Event,
319        cx: &mut Context<Self>,
320    ) {
321        if let multi_buffer::Event::Edited { .. } = event {
322            let snapshot = buffer.read(cx).snapshot(cx);
323
324            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
325
326            self.blocks.retain(|_id, block| {
327                if block.invalidation_anchor.is_valid(&snapshot) {
328                    true
329                } else {
330                    blocks_to_remove.insert(block.block_id);
331                    false
332                }
333            });
334
335            if !blocks_to_remove.is_empty() {
336                self.editor
337                    .update(cx, |editor, cx| {
338                        editor.remove_blocks(blocks_to_remove, None, cx);
339                    })
340                    .ok();
341                cx.notify();
342            }
343        }
344    }
345
346    fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
347        if let Kernel::RunningKernel(kernel) = &mut self.kernel {
348            kernel.request_tx().try_send(message).ok();
349        }
350
351        anyhow::Ok(())
352    }
353
354    pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
355        let blocks_to_remove: HashSet<CustomBlockId> =
356            self.blocks.values().map(|block| block.block_id).collect();
357
358        self.editor
359            .update(cx, |editor, cx| {
360                editor.remove_blocks(blocks_to_remove, None, cx);
361            })
362            .ok();
363
364        self.blocks.clear();
365    }
366
367    pub fn execute(
368        &mut self,
369        code: String,
370        anchor_range: Range<Anchor>,
371        next_cell: Option<Anchor>,
372        move_down: bool,
373        window: &mut Window,
374        cx: &mut Context<Self>,
375    ) {
376        let Some(editor) = self.editor.upgrade() else {
377            return;
378        };
379
380        if code.is_empty() {
381            return;
382        }
383
384        let execute_request = ExecuteRequest {
385            code,
386            ..ExecuteRequest::default()
387        };
388
389        let message: JupyterMessage = execute_request.into();
390
391        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
392
393        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
394
395        self.blocks.retain(|_key, block| {
396            if anchor_range.overlaps(&block.code_range, &buffer) {
397                blocks_to_remove.insert(block.block_id);
398                false
399            } else {
400                true
401            }
402        });
403
404        self.editor
405            .update(cx, |editor, cx| {
406                editor.remove_blocks(blocks_to_remove, None, cx);
407            })
408            .ok();
409
410        let status = match &self.kernel {
411            Kernel::Restarting => ExecutionStatus::Restarting,
412            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
413            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
414            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
415            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
416            Kernel::Shutdown => ExecutionStatus::Shutdown,
417        };
418
419        let parent_message_id = message.header.msg_id.clone();
420        let session_view = cx.entity().downgrade();
421        let weak_editor = self.editor.clone();
422
423        let on_close: CloseBlockFn = Arc::new(
424            move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
425                if let Some(session) = session_view.upgrade() {
426                    session.update(cx, |session, cx| {
427                        session.blocks.remove(&parent_message_id);
428                        cx.notify();
429                    });
430                }
431
432                if let Some(editor) = weak_editor.upgrade() {
433                    editor.update(cx, |editor, cx| {
434                        let mut block_ids = HashSet::default();
435                        block_ids.insert(block_id);
436                        editor.remove_blocks(block_ids, None, cx);
437                    });
438                }
439            },
440        );
441
442        let Ok(editor_block) =
443            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
444        else {
445            return;
446        };
447
448        let new_cursor_pos = if let Some(next_cursor) = next_cell {
449            next_cursor
450        } else {
451            editor_block.invalidation_anchor
452        };
453
454        self.blocks
455            .insert(message.header.msg_id.clone(), editor_block);
456
457        match &self.kernel {
458            Kernel::RunningKernel(_) => {
459                self.send(message, cx).ok();
460            }
461            Kernel::StartingKernel(task) => {
462                // Queue up the execution as a task to run after the kernel starts
463                let task = task.clone();
464                let message = message.clone();
465
466                cx.spawn(async move |this, cx| {
467                    task.await;
468                    this.update(cx, |session, cx| {
469                        session.send(message, cx).ok();
470                    })
471                    .ok();
472                })
473                .detach();
474            }
475            _ => {}
476        }
477
478        if move_down {
479            editor.update(cx, move |editor, cx| {
480                editor.change_selections(
481                    SelectionEffects::scroll(Autoscroll::top_relative(8)),
482                    window,
483                    cx,
484                    |selections| {
485                        selections.select_ranges([new_cursor_pos..new_cursor_pos]);
486                    },
487                );
488            });
489        }
490    }
491
492    pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
493        let parent_message_id = match message.parent_header.as_ref() {
494            Some(header) => &header.msg_id,
495            None => return,
496        };
497
498        match &message.content {
499            JupyterMessageContent::Status(status) => {
500                self.kernel.set_execution_state(&status.execution_state);
501
502                telemetry::event!(
503                    "Kernel Status Changed",
504                    kernel_language = self.kernel_specification.language(),
505                    kernel_status = KernelStatus::from(&self.kernel).to_string(),
506                    repl_session_id = cx.entity_id().to_string(),
507                );
508
509                cx.notify();
510            }
511            JupyterMessageContent::KernelInfoReply(reply) => {
512                self.kernel.set_kernel_info(reply);
513                cx.notify();
514            }
515            JupyterMessageContent::UpdateDisplayData(update) => {
516                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
517                    display_id
518                } else {
519                    return;
520                };
521
522                self.blocks.iter_mut().for_each(|(_, block)| {
523                    block.execution_view.update(cx, |execution_view, cx| {
524                        execution_view.update_display_data(&update.data, &display_id, window, cx);
525                    });
526                });
527                return;
528            }
529            _ => {}
530        }
531
532        if let Some(block) = self.blocks.get_mut(parent_message_id) {
533            block.handle_message(message, window, cx);
534        }
535    }
536
537    pub fn interrupt(&mut self, cx: &mut Context<Self>) {
538        match &mut self.kernel {
539            Kernel::RunningKernel(_kernel) => {
540                self.send(InterruptRequest {}.into(), cx).ok();
541            }
542            Kernel::StartingKernel(_task) => {
543                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
544            }
545            _ => {}
546        }
547    }
548
549    pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
550        if let Kernel::Shutdown = kernel {
551            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
552        }
553
554        let kernel_status = KernelStatus::from(&kernel).to_string();
555        let kernel_language = self.kernel_specification.language();
556
557        telemetry::event!(
558            "Kernel Status Changed",
559            kernel_language,
560            kernel_status,
561            repl_session_id = cx.entity_id().to_string(),
562        );
563
564        self.kernel = kernel;
565    }
566
567    pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
568        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
569
570        match kernel {
571            Kernel::RunningKernel(mut kernel) => {
572                let mut request_tx = kernel.request_tx().clone();
573
574                let forced = kernel.force_shutdown(window, cx);
575
576                cx.spawn(async move |this, cx| {
577                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
578                    request_tx.try_send(message).ok();
579
580                    forced.await.log_err();
581
582                    // Give the kernel a bit of time to clean up
583                    cx.background_executor().timer(Duration::from_secs(3)).await;
584
585                    this.update(cx, |session, cx| {
586                        session.clear_outputs(cx);
587                        session.kernel(Kernel::Shutdown, cx);
588                        cx.notify();
589                    })
590                    .ok();
591                })
592                .detach();
593            }
594            _ => {
595                self.kernel(Kernel::Shutdown, cx);
596            }
597        }
598        cx.notify();
599    }
600
601    pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
602        let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
603
604        match kernel {
605            Kernel::Restarting => {
606                // Do nothing if already restarting
607            }
608            Kernel::RunningKernel(mut kernel) => {
609                let mut request_tx = kernel.request_tx().clone();
610
611                let forced = kernel.force_shutdown(window, cx);
612
613                cx.spawn_in(window, async move |this, cx| {
614                    // Send shutdown request with restart flag
615                    log::debug!("restarting kernel");
616                    let message: JupyterMessage = ShutdownRequest { restart: true }.into();
617                    request_tx.try_send(message).ok();
618
619                    // Wait for kernel to shutdown
620                    cx.background_executor().timer(Duration::from_secs(1)).await;
621
622                    // Force kill the kernel if it hasn't shut down
623                    forced.await.log_err();
624
625                    // Start a new kernel
626                    this.update_in(cx, |session, window, cx| {
627                        // TODO: Differentiate between restart and restart+clear-outputs
628                        session.clear_outputs(cx);
629                        session.start_kernel(window, cx);
630                    })
631                    .ok();
632                })
633                .detach();
634            }
635            _ => {
636                self.clear_outputs(cx);
637                self.start_kernel(window, cx);
638            }
639        }
640        cx.notify();
641    }
642}
643
644pub enum SessionEvent {
645    Shutdown(WeakEntity<Editor>),
646}
647
648impl EventEmitter<SessionEvent> for Session {}
649
650impl Render for Session {
651    fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
652        let (status_text, interrupt_button) = match &self.kernel {
653            Kernel::RunningKernel(kernel) => (
654                kernel
655                    .kernel_info()
656                    .as_ref()
657                    .map(|info| info.language_info.name.clone()),
658                Some(
659                    Button::new("interrupt", "Interrupt")
660                        .style(ButtonStyle::Subtle)
661                        .on_click(cx.listener(move |session, _, _, cx| {
662                            session.interrupt(cx);
663                        })),
664                ),
665            ),
666            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
667            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
668            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
669            Kernel::Shutdown => (Some("Shutdown".into()), None),
670            Kernel::Restarting => (Some("Restarting".into()), None),
671        };
672
673        KernelListItem::new(self.kernel_specification.clone())
674            .status_color(match &self.kernel {
675                Kernel::RunningKernel(kernel) => match kernel.execution_state() {
676                    ExecutionState::Idle => Color::Success,
677                    ExecutionState::Busy => Color::Modified,
678                },
679                Kernel::StartingKernel(_) => Color::Modified,
680                Kernel::ErroredLaunch(_) => Color::Error,
681                Kernel::ShuttingDown => Color::Modified,
682                Kernel::Shutdown => Color::Disabled,
683                Kernel::Restarting => Color::Modified,
684            })
685            .child(Label::new(self.kernel_specification.name()))
686            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
687            .button(
688                Button::new("shutdown", "Shutdown")
689                    .style(ButtonStyle::Subtle)
690                    .disabled(self.kernel.is_shutting_down())
691                    .on_click(cx.listener(move |session, _, window, cx| {
692                        session.shutdown(window, cx);
693                    })),
694            )
695            .buttons(interrupt_button)
696    }
697}