session.rs

  1use crate::components::KernelListItem;
  2use crate::kernels::RemoteRunningKernel;
  3use crate::setup_editor_session_actions;
  4use crate::{
  5    kernels::{Kernel, KernelSpecification, NativeRunningKernel},
  6    outputs::{ExecutionStatus, ExecutionView},
  7    KernelStatus,
  8};
  9use collections::{HashMap, HashSet};
 10use editor::{
 11    display_map::{
 12        BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
 13        RenderBlock,
 14    },
 15    scroll::Autoscroll,
 16    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 17};
 18use futures::FutureExt as _;
 19use gpui::{
 20    div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
 21};
 22use language::Point;
 23use project::Fs;
 24use runtimelib::{
 25    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 26    ShutdownRequest,
 27};
 28use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 29use theme::ActiveTheme;
 30use ui::{prelude::*, IconButtonShape, Tooltip};
 31use util::ResultExt as _;
 32
 33pub struct Session {
 34    fs: Arc<dyn Fs>,
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    pub kernel_specification: KernelSpecification,
 39    _buffer_subscription: Subscription,
 40}
 41
 42struct EditorBlock {
 43    code_range: Range<Anchor>,
 44    invalidation_anchor: Anchor,
 45    block_id: CustomBlockId,
 46    execution_view: View<ExecutionView>,
 47}
 48
 49type CloseBlockFn =
 50    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 51
 52impl EditorBlock {
 53    fn new(
 54        editor: WeakView<Editor>,
 55        code_range: Range<Anchor>,
 56        status: ExecutionStatus,
 57        on_close: CloseBlockFn,
 58        cx: &mut ViewContext<Session>,
 59    ) -> anyhow::Result<Self> {
 60        let editor = editor
 61            .upgrade()
 62            .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
 63        let workspace = editor
 64            .read(cx)
 65            .workspace()
 66            .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
 67
 68        let execution_view =
 69            cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
 70
 71        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 72            let buffer = editor.buffer().clone();
 73            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 74            let end_point = code_range.end.to_point(&buffer_snapshot);
 75            let next_row_start = end_point + Point::new(1, 0);
 76            if next_row_start > buffer_snapshot.max_point() {
 77                buffer.update(cx, |buffer, cx| {
 78                    buffer.edit(
 79                        [(
 80                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 81                            "\n",
 82                        )],
 83                        None,
 84                        cx,
 85                    )
 86                });
 87            }
 88
 89            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 90            let block = BlockProperties {
 91                placement: BlockPlacement::Below(code_range.end),
 92                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 93                height: 1,
 94                style: BlockStyle::Sticky,
 95                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 96                priority: 0,
 97            };
 98
 99            let block_id = editor.insert_blocks([block], None, cx)[0];
100            (block_id, invalidation_anchor)
101        });
102
103        anyhow::Ok(Self {
104            code_range,
105            invalidation_anchor,
106            block_id,
107            execution_view,
108        })
109    }
110
111    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
112        self.execution_view.update(cx, |execution_view, cx| {
113            execution_view.push_message(&message.content, cx);
114        });
115    }
116
117    fn create_output_area_renderer(
118        execution_view: View<ExecutionView>,
119        on_close: CloseBlockFn,
120    ) -> RenderBlock {
121        Arc::new(move |cx: &mut BlockContext| {
122            let execution_view = execution_view.clone();
123            let text_style = crate::outputs::plain::text_style(cx);
124
125            let gutter = cx.gutter_dimensions;
126
127            let block_id = cx.block_id;
128            let on_close = on_close.clone();
129
130            let rem_size = cx.rem_size();
131
132            let text_line_height = text_style.line_height_in_pixels(rem_size);
133
134            let close_button = h_flex()
135                .flex_none()
136                .items_center()
137                .justify_center()
138                .absolute()
139                .top(text_line_height / 2.)
140                .right(
141                    // 2px is a magic number to nudge the button just a bit closer to
142                    // the line number start
143                    gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
144                )
145                .w(text_line_height)
146                .h(text_line_height)
147                .child(
148                    IconButton::new("close_output_area", IconName::Close)
149                        .icon_size(IconSize::Small)
150                        .icon_color(Color::Muted)
151                        .size(ButtonSize::Compact)
152                        .shape(IconButtonShape::Square)
153                        .tooltip(|cx| Tooltip::text("Close output area", cx))
154                        .on_click(move |_, cx| {
155                            if let BlockId::Custom(block_id) = block_id {
156                                (on_close)(block_id, cx)
157                            }
158                        }),
159                );
160
161            div()
162                .id(cx.block_id)
163                .block_mouse_down()
164                .flex()
165                .items_start()
166                .min_h(text_line_height)
167                .w_full()
168                .border_y_1()
169                .border_color(cx.theme().colors().border)
170                .bg(cx.theme().colors().background)
171                .child(
172                    div()
173                        .relative()
174                        .w(gutter.full_width())
175                        .h(text_line_height * 2)
176                        .child(close_button),
177                )
178                .child(
179                    div()
180                        .flex_1()
181                        .size_full()
182                        .py(text_line_height / 2.)
183                        .mr(gutter.width)
184                        .child(execution_view),
185                )
186                .into_any_element()
187        })
188    }
189}
190
191impl Session {
192    pub fn new(
193        editor: WeakView<Editor>,
194        fs: Arc<dyn Fs>,
195        kernel_specification: KernelSpecification,
196        cx: &mut ViewContext<Self>,
197    ) -> Self {
198        let subscription = match editor.upgrade() {
199            Some(editor) => {
200                let buffer = editor.read(cx).buffer().clone();
201                cx.subscribe(&buffer, Self::on_buffer_event)
202            }
203            None => Subscription::new(|| {}),
204        };
205
206        let editor_handle = editor.clone();
207
208        editor
209            .update(cx, |editor, _cx| {
210                setup_editor_session_actions(editor, editor_handle);
211            })
212            .ok();
213
214        let mut session = Self {
215            fs,
216            editor,
217            kernel: Kernel::StartingKernel(Task::ready(()).shared()),
218            blocks: HashMap::default(),
219            kernel_specification,
220            _buffer_subscription: subscription,
221        };
222
223        session.start_kernel(cx);
224        session
225    }
226
227    fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
228        let kernel_language = self.kernel_specification.language();
229        let entity_id = self.editor.entity_id();
230        let working_directory = self
231            .editor
232            .upgrade()
233            .and_then(|editor| editor.read(cx).working_directory(cx))
234            .unwrap_or_else(temp_dir);
235
236        telemetry::event!(
237            "Kernel Status Changed",
238            kernel_language,
239            kernel_status = KernelStatus::Starting.to_string(),
240            repl_session_id = cx.entity_id().to_string(),
241        );
242
243        let session_view = cx.view().clone();
244
245        let kernel = match self.kernel_specification.clone() {
246            KernelSpecification::Jupyter(kernel_specification)
247            | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
248                kernel_specification,
249                entity_id,
250                working_directory,
251                self.fs.clone(),
252                session_view,
253                cx,
254            ),
255            KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
256                remote_kernel_specification,
257                working_directory,
258                session_view,
259                cx,
260            ),
261        };
262
263        let pending_kernel = cx
264            .spawn(|this, mut cx| async move {
265                let kernel = kernel.await;
266
267                match kernel {
268                    Ok(kernel) => {
269                        this.update(&mut cx, |session, cx| {
270                            session.kernel(Kernel::RunningKernel(kernel), cx);
271                        })
272                        .ok();
273                    }
274                    Err(err) => {
275                        this.update(&mut cx, |session, cx| {
276                            session.kernel_errored(err.to_string(), cx);
277                        })
278                        .ok();
279                    }
280                }
281            })
282            .shared();
283
284        self.kernel(Kernel::StartingKernel(pending_kernel), cx);
285        cx.notify();
286    }
287
288    pub fn kernel_errored(&mut self, error_message: String, cx: &mut ViewContext<Self>) {
289        self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
290
291        self.blocks.values().for_each(|block| {
292            block.execution_view.update(cx, |execution_view, cx| {
293                match execution_view.status {
294                    ExecutionStatus::Finished => {
295                        // Do nothing when the output was good
296                    }
297                    _ => {
298                        // All other cases, set the status to errored
299                        execution_view.status =
300                            ExecutionStatus::KernelErrored(error_message.clone())
301                    }
302                }
303                cx.notify();
304            });
305        });
306    }
307
308    fn on_buffer_event(
309        &mut self,
310        buffer: Model<MultiBuffer>,
311        event: &multi_buffer::Event,
312        cx: &mut ViewContext<Self>,
313    ) {
314        if let multi_buffer::Event::Edited { .. } = event {
315            let snapshot = buffer.read(cx).snapshot(cx);
316
317            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
318
319            self.blocks.retain(|_id, block| {
320                if block.invalidation_anchor.is_valid(&snapshot) {
321                    true
322                } else {
323                    blocks_to_remove.insert(block.block_id);
324                    false
325                }
326            });
327
328            if !blocks_to_remove.is_empty() {
329                self.editor
330                    .update(cx, |editor, cx| {
331                        editor.remove_blocks(blocks_to_remove, None, cx);
332                    })
333                    .ok();
334                cx.notify();
335            }
336        }
337    }
338
339    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
340        if let Kernel::RunningKernel(kernel) = &mut self.kernel {
341            kernel.request_tx().try_send(message).ok();
342        }
343
344        anyhow::Ok(())
345    }
346
347    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
348        let blocks_to_remove: HashSet<CustomBlockId> =
349            self.blocks.values().map(|block| block.block_id).collect();
350
351        self.editor
352            .update(cx, |editor, cx| {
353                editor.remove_blocks(blocks_to_remove, None, cx);
354            })
355            .ok();
356
357        self.blocks.clear();
358    }
359
360    pub fn execute(
361        &mut self,
362        code: String,
363        anchor_range: Range<Anchor>,
364        next_cell: Option<Anchor>,
365        move_down: bool,
366        cx: &mut ViewContext<Self>,
367    ) {
368        let Some(editor) = self.editor.upgrade() else {
369            return;
370        };
371
372        if code.is_empty() {
373            return;
374        }
375
376        let execute_request = ExecuteRequest {
377            code,
378            ..ExecuteRequest::default()
379        };
380
381        let message: JupyterMessage = execute_request.into();
382
383        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
384
385        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
386
387        self.blocks.retain(|_key, block| {
388            if anchor_range.overlaps(&block.code_range, &buffer) {
389                blocks_to_remove.insert(block.block_id);
390                false
391            } else {
392                true
393            }
394        });
395
396        self.editor
397            .update(cx, |editor, cx| {
398                editor.remove_blocks(blocks_to_remove, None, cx);
399            })
400            .ok();
401
402        let status = match &self.kernel {
403            Kernel::Restarting => ExecutionStatus::Restarting,
404            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
405            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
406            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
407            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
408            Kernel::Shutdown => ExecutionStatus::Shutdown,
409        };
410
411        let parent_message_id = message.header.msg_id.clone();
412        let session_view = cx.view().downgrade();
413        let weak_editor = self.editor.clone();
414
415        let on_close: CloseBlockFn =
416            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
417                if let Some(session) = session_view.upgrade() {
418                    session.update(cx, |session, cx| {
419                        session.blocks.remove(&parent_message_id);
420                        cx.notify();
421                    });
422                }
423
424                if let Some(editor) = weak_editor.upgrade() {
425                    editor.update(cx, |editor, cx| {
426                        let mut block_ids = HashSet::default();
427                        block_ids.insert(block_id);
428                        editor.remove_blocks(block_ids, None, cx);
429                    });
430                }
431            });
432
433        let Ok(editor_block) =
434            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
435        else {
436            return;
437        };
438
439        let new_cursor_pos = if let Some(next_cursor) = next_cell {
440            next_cursor
441        } else {
442            editor_block.invalidation_anchor
443        };
444
445        self.blocks
446            .insert(message.header.msg_id.clone(), editor_block);
447
448        match &self.kernel {
449            Kernel::RunningKernel(_) => {
450                self.send(message, cx).ok();
451            }
452            Kernel::StartingKernel(task) => {
453                // Queue up the execution as a task to run after the kernel starts
454                let task = task.clone();
455                let message = message.clone();
456
457                cx.spawn(|this, mut cx| async move {
458                    task.await;
459                    this.update(&mut cx, |session, cx| {
460                        session.send(message, cx).ok();
461                    })
462                    .ok();
463                })
464                .detach();
465            }
466            _ => {}
467        }
468
469        if move_down {
470            editor.update(cx, move |editor, cx| {
471                editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
472                    selections.select_ranges([new_cursor_pos..new_cursor_pos]);
473                });
474            });
475        }
476    }
477
478    pub fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
479        let parent_message_id = match message.parent_header.as_ref() {
480            Some(header) => &header.msg_id,
481            None => return,
482        };
483
484        match &message.content {
485            JupyterMessageContent::Status(status) => {
486                self.kernel.set_execution_state(&status.execution_state);
487
488                telemetry::event!(
489                    "Kernel Status Changed",
490                    kernel_language = self.kernel_specification.language(),
491                    kernel_status = KernelStatus::from(&self.kernel).to_string(),
492                    repl_session_id = cx.entity_id().to_string(),
493                );
494
495                cx.notify();
496            }
497            JupyterMessageContent::KernelInfoReply(reply) => {
498                self.kernel.set_kernel_info(reply);
499                cx.notify();
500            }
501            JupyterMessageContent::UpdateDisplayData(update) => {
502                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
503                    display_id
504                } else {
505                    return;
506                };
507
508                self.blocks.iter_mut().for_each(|(_, block)| {
509                    block.execution_view.update(cx, |execution_view, cx| {
510                        execution_view.update_display_data(&update.data, &display_id, cx);
511                    });
512                });
513                return;
514            }
515            _ => {}
516        }
517
518        if let Some(block) = self.blocks.get_mut(parent_message_id) {
519            block.handle_message(message, cx);
520        }
521    }
522
523    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
524        match &mut self.kernel {
525            Kernel::RunningKernel(_kernel) => {
526                self.send(InterruptRequest {}.into(), cx).ok();
527            }
528            Kernel::StartingKernel(_task) => {
529                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
530            }
531            _ => {}
532        }
533    }
534
535    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
536        if let Kernel::Shutdown = kernel {
537            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
538        }
539
540        let kernel_status = KernelStatus::from(&kernel).to_string();
541        let kernel_language = self.kernel_specification.language();
542
543        telemetry::event!(
544            "Kernel Status Changed",
545            kernel_language,
546            kernel_status,
547            repl_session_id = cx.entity_id().to_string(),
548        );
549
550        self.kernel = kernel;
551    }
552
553    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
554        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
555
556        match kernel {
557            Kernel::RunningKernel(mut kernel) => {
558                let mut request_tx = kernel.request_tx().clone();
559
560                let forced = kernel.force_shutdown(cx);
561
562                cx.spawn(|this, mut cx| async move {
563                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
564                    request_tx.try_send(message).ok();
565
566                    forced.await.log_err();
567
568                    // Give the kernel a bit of time to clean up
569                    cx.background_executor().timer(Duration::from_secs(3)).await;
570
571                    this.update(&mut cx, |session, cx| {
572                        session.clear_outputs(cx);
573                        session.kernel(Kernel::Shutdown, cx);
574                        cx.notify();
575                    })
576                    .ok();
577                })
578                .detach();
579            }
580            _ => {
581                self.kernel(Kernel::Shutdown, cx);
582            }
583        }
584        cx.notify();
585    }
586
587    pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
588        let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
589
590        match kernel {
591            Kernel::Restarting => {
592                // Do nothing if already restarting
593            }
594            Kernel::RunningKernel(mut kernel) => {
595                let mut request_tx = kernel.request_tx().clone();
596
597                let forced = kernel.force_shutdown(cx);
598
599                cx.spawn(|this, mut cx| async move {
600                    // Send shutdown request with restart flag
601                    log::debug!("restarting kernel");
602                    let message: JupyterMessage = ShutdownRequest { restart: true }.into();
603                    request_tx.try_send(message).ok();
604
605                    // Wait for kernel to shutdown
606                    cx.background_executor().timer(Duration::from_secs(1)).await;
607
608                    // Force kill the kernel if it hasn't shut down
609                    forced.await.log_err();
610
611                    // Start a new kernel
612                    this.update(&mut cx, |session, cx| {
613                        // TODO: Differentiate between restart and restart+clear-outputs
614                        session.clear_outputs(cx);
615                        session.start_kernel(cx);
616                    })
617                    .ok();
618                })
619                .detach();
620            }
621            _ => {
622                self.clear_outputs(cx);
623                self.start_kernel(cx);
624            }
625        }
626        cx.notify();
627    }
628}
629
630pub enum SessionEvent {
631    Shutdown(WeakView<Editor>),
632}
633
634impl EventEmitter<SessionEvent> for Session {}
635
636impl Render for Session {
637    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
638        let (status_text, interrupt_button) = match &self.kernel {
639            Kernel::RunningKernel(kernel) => (
640                kernel
641                    .kernel_info()
642                    .as_ref()
643                    .map(|info| info.language_info.name.clone()),
644                Some(
645                    Button::new("interrupt", "Interrupt")
646                        .style(ButtonStyle::Subtle)
647                        .on_click(cx.listener(move |session, _, cx| {
648                            session.interrupt(cx);
649                        })),
650                ),
651            ),
652            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
653            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
654            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
655            Kernel::Shutdown => (Some("Shutdown".into()), None),
656            Kernel::Restarting => (Some("Restarting".into()), None),
657        };
658
659        KernelListItem::new(self.kernel_specification.clone())
660            .status_color(match &self.kernel {
661                Kernel::RunningKernel(kernel) => match kernel.execution_state() {
662                    ExecutionState::Idle => Color::Success,
663                    ExecutionState::Busy => Color::Modified,
664                },
665                Kernel::StartingKernel(_) => Color::Modified,
666                Kernel::ErroredLaunch(_) => Color::Error,
667                Kernel::ShuttingDown => Color::Modified,
668                Kernel::Shutdown => Color::Disabled,
669                Kernel::Restarting => Color::Modified,
670            })
671            .child(Label::new(self.kernel_specification.name()))
672            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
673            .button(
674                Button::new("shutdown", "Shutdown")
675                    .style(ButtonStyle::Subtle)
676                    .disabled(self.kernel.is_shutting_down())
677                    .on_click(cx.listener(move |session, _, cx| {
678                        session.shutdown(cx);
679                    })),
680            )
681            .buttons(interrupt_button)
682    }
683}