session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::io::BufReader;
 18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
 19use gpui::{
 20    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 21    WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use settings::Settings as _;
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::{ActiveTheme, ThemeSettings};
 32use ui::{prelude::*, IconButtonShape, Tooltip};
 33
 34pub struct Session {
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    messaging_task: Task<()>,
 39    pub kernel_specification: KernelSpecification,
 40    telemetry: Arc<Telemetry>,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    code_range: Range<Anchor>,
 46    invalidation_anchor: Anchor,
 47    block_id: CustomBlockId,
 48    execution_view: View<ExecutionView>,
 49}
 50
 51type CloseBlockFn =
 52    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 53
 54impl EditorBlock {
 55    fn new(
 56        editor: WeakView<Editor>,
 57        code_range: Range<Anchor>,
 58        status: ExecutionStatus,
 59        on_close: CloseBlockFn,
 60        cx: &mut ViewContext<Session>,
 61    ) -> anyhow::Result<Self> {
 62        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 63
 64        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 65            let buffer = editor.buffer().clone();
 66            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 67            let end_point = code_range.end.to_point(&buffer_snapshot);
 68            let next_row_start = end_point + Point::new(1, 0);
 69            if next_row_start > buffer_snapshot.max_point() {
 70                buffer.update(cx, |buffer, cx| {
 71                    buffer.edit(
 72                        [(
 73                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 74                            "\n",
 75                        )],
 76                        None,
 77                        cx,
 78                    )
 79                });
 80            }
 81
 82            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 83            let block = BlockProperties {
 84                position: code_range.end,
 85                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 86                height: 1,
 87                style: BlockStyle::Sticky,
 88                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 89                disposition: BlockDisposition::Below,
 90            };
 91
 92            let block_id = editor.insert_blocks([block], None, cx)[0];
 93            (block_id, invalidation_anchor)
 94        })?;
 95
 96        anyhow::Ok(Self {
 97            code_range,
 98            invalidation_anchor,
 99            block_id,
100            execution_view,
101        })
102    }
103
104    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
105        self.execution_view.update(cx, |execution_view, cx| {
106            execution_view.push_message(&message.content, cx);
107        });
108    }
109
110    fn create_output_area_renderer(
111        execution_view: View<ExecutionView>,
112        on_close: CloseBlockFn,
113    ) -> RenderBlock {
114        let render = move |cx: &mut BlockContext| {
115            let execution_view = execution_view.clone();
116            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
117            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
118
119            let gutter = cx.gutter_dimensions;
120            let close_button_size = IconSize::XSmall;
121
122            let block_id = cx.block_id;
123            let on_close = on_close.clone();
124
125            let rem_size = cx.rem_size();
126            let line_height = cx.text_style().line_height_in_pixels(rem_size);
127
128            let (close_button_width, close_button_padding) =
129                close_button_size.square_components(cx);
130
131            div()
132                .min_h(line_height)
133                .flex()
134                .flex_row()
135                .items_start()
136                .w_full()
137                .bg(cx.theme().colors().background)
138                .border_y_1()
139                .border_color(cx.theme().colors().border)
140                .child(
141                    v_flex().min_h(cx.line_height()).justify_center().child(
142                        h_flex()
143                            .w(gutter.full_width())
144                            .justify_end()
145                            .pt(line_height / 2.)
146                            .child(
147                                h_flex()
148                                    .pr(gutter.width / 2. - close_button_width
149                                        + close_button_padding / 2.)
150                                    .child(
151                                        IconButton::new(
152                                            ("close_output_area", EntityId::from(cx.block_id)),
153                                            IconName::Close,
154                                        )
155                                        .shape(IconButtonShape::Square)
156                                        .icon_size(close_button_size)
157                                        .icon_color(Color::Muted)
158                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
159                                        .on_click(
160                                            move |_, cx| {
161                                                if let BlockId::Custom(block_id) = block_id {
162                                                    (on_close)(block_id, cx)
163                                                }
164                                            },
165                                        ),
166                                    ),
167                            ),
168                    ),
169                )
170                .child(
171                    div()
172                        .flex_1()
173                        .size_full()
174                        .my_2()
175                        .mr(gutter.width)
176                        .text_size(text_font_size)
177                        .font_family(text_font)
178                        .child(execution_view),
179                )
180                .into_any_element()
181        };
182
183        Box::new(render)
184    }
185}
186
187impl Session {
188    pub fn new(
189        editor: WeakView<Editor>,
190        fs: Arc<dyn Fs>,
191        telemetry: Arc<Telemetry>,
192        kernel_specification: KernelSpecification,
193        cx: &mut ViewContext<Self>,
194    ) -> Self {
195        let kernel_language = kernel_specification.kernelspec.language.clone();
196
197        telemetry.report_repl_event(
198            kernel_language.clone(),
199            KernelStatus::Starting.to_string(),
200            cx.entity_id().to_string(),
201        );
202
203        let entity_id = editor.entity_id();
204        let working_directory = editor
205            .upgrade()
206            .and_then(|editor| editor.read(cx).working_directory(cx))
207            .unwrap_or_else(temp_dir);
208        let kernel = RunningKernel::new(
209            kernel_specification.clone(),
210            entity_id,
211            working_directory,
212            fs.clone(),
213            cx,
214        );
215
216        let pending_kernel = cx
217            .spawn(|this, mut cx| async move {
218                let kernel = kernel.await;
219
220                match kernel {
221                    Ok((mut kernel, mut messages_rx)) => {
222                        this.update(&mut cx, |session, cx| {
223                            let stderr = kernel.process.stderr.take();
224
225                            cx.spawn(|_session, mut _cx| async move {
226                                if let None = stderr {
227                                    return;
228                                }
229                                let reader = BufReader::new(stderr.unwrap());
230                                let mut lines = reader.lines();
231                                while let Some(Ok(line)) = lines.next().await {
232                                    log::error!("kernel: {}", line);
233                                }
234                            })
235                            .detach();
236
237                            let stdout = kernel.process.stdout.take();
238
239                            cx.spawn(|_session, mut _cx| async move {
240                                if let None = stdout {
241                                    return;
242                                }
243                                let reader = BufReader::new(stdout.unwrap());
244                                let mut lines = reader.lines();
245                                while let Some(Ok(line)) = lines.next().await {
246                                    log::info!("kernel: {}", line);
247                                }
248                            })
249                            .detach();
250
251                            let status = kernel.process.status();
252                            session.kernel(Kernel::RunningKernel(kernel), cx);
253
254                            cx.spawn(|session, mut cx| async move {
255                                let error_message = match status.await {
256                                    Ok(status) => {
257                                        if status.success() {
258                                            log::info!("kernel process exited successfully");
259                                            return;
260                                        }
261
262                                        format!("kernel process exited with status: {:?}", status)
263                                    }
264                                    Err(err) => {
265                                        format!("kernel process exited with error: {:?}", err)
266                                    }
267                                };
268
269                                log::error!("{}", error_message);
270
271                                session
272                                    .update(&mut cx, |session, cx| {
273                                        session.kernel(
274                                            Kernel::ErroredLaunch(error_message.clone()),
275                                            cx,
276                                        );
277
278                                        session.blocks.values().for_each(|block| {
279                                            block.execution_view.update(
280                                                cx,
281                                                |execution_view, cx| {
282                                                    match execution_view.status {
283                                                        ExecutionStatus::Finished => {
284                                                            // Do nothing when the output was good
285                                                        }
286                                                        _ => {
287                                                            // All other cases, set the status to errored
288                                                            execution_view.status =
289                                                                ExecutionStatus::KernelErrored(
290                                                                    error_message.clone(),
291                                                                )
292                                                        }
293                                                    }
294                                                    cx.notify();
295                                                },
296                                            );
297                                        });
298
299                                        cx.notify();
300                                    })
301                                    .ok();
302                            })
303                            .detach();
304
305                            session.messaging_task = cx.spawn(|session, mut cx| async move {
306                                while let Some(message) = messages_rx.next().await {
307                                    session
308                                        .update(&mut cx, |session, cx| {
309                                            session.route(&message, cx);
310                                        })
311                                        .ok();
312                                }
313                            });
314
315                            // todo!(@rgbkrk): send kernelinforequest once our shell channel read/writes are split
316                            // cx.spawn(|this, mut cx| async move {
317                            //     cx.background_executor()
318                            //         .timer(Duration::from_millis(120))
319                            //         .await;
320                            //     this.update(&mut cx, |this, cx| {
321                            //         this.send(KernelInfoRequest {}.into(), cx).ok();
322                            //     })
323                            //     .ok();
324                            // })
325                            // .detach();
326                        })
327                        .ok();
328                    }
329                    Err(err) => {
330                        this.update(&mut cx, |session, cx| {
331                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
332                        })
333                        .ok();
334                    }
335                }
336            })
337            .shared();
338
339        let subscription = match editor.upgrade() {
340            Some(editor) => {
341                let buffer = editor.read(cx).buffer().clone();
342                cx.subscribe(&buffer, Self::on_buffer_event)
343            }
344            None => Subscription::new(|| {}),
345        };
346
347        return Self {
348            editor,
349            kernel: Kernel::StartingKernel(pending_kernel),
350            messaging_task: Task::ready(()),
351            blocks: HashMap::default(),
352            kernel_specification,
353            _buffer_subscription: subscription,
354            telemetry,
355        };
356    }
357
358    fn on_buffer_event(
359        &mut self,
360        buffer: Model<MultiBuffer>,
361        event: &multi_buffer::Event,
362        cx: &mut ViewContext<Self>,
363    ) {
364        if let multi_buffer::Event::Edited { .. } = event {
365            let snapshot = buffer.read(cx).snapshot(cx);
366
367            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
368
369            self.blocks.retain(|_id, block| {
370                if block.invalidation_anchor.is_valid(&snapshot) {
371                    true
372                } else {
373                    blocks_to_remove.insert(block.block_id);
374                    false
375                }
376            });
377
378            if !blocks_to_remove.is_empty() {
379                self.editor
380                    .update(cx, |editor, cx| {
381                        editor.remove_blocks(blocks_to_remove, None, cx);
382                    })
383                    .ok();
384                cx.notify();
385            }
386        }
387    }
388
389    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
390        match &mut self.kernel {
391            Kernel::RunningKernel(kernel) => {
392                kernel.request_tx.try_send(message).ok();
393            }
394            _ => {}
395        }
396
397        anyhow::Ok(())
398    }
399
400    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
401        let blocks_to_remove: HashSet<CustomBlockId> =
402            self.blocks.values().map(|block| block.block_id).collect();
403
404        self.editor
405            .update(cx, |editor, cx| {
406                editor.remove_blocks(blocks_to_remove, None, cx);
407            })
408            .ok();
409
410        self.blocks.clear();
411    }
412
413    pub fn execute(
414        &mut self,
415        code: String,
416        anchor_range: Range<Anchor>,
417        next_cell: Option<Anchor>,
418        move_down: bool,
419        cx: &mut ViewContext<Self>,
420    ) {
421        let Some(editor) = self.editor.upgrade() else {
422            return;
423        };
424
425        if code.is_empty() {
426            return;
427        }
428
429        let execute_request = ExecuteRequest {
430            code,
431            ..ExecuteRequest::default()
432        };
433
434        let message: JupyterMessage = execute_request.into();
435
436        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
437
438        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
439
440        self.blocks.retain(|_key, block| {
441            if anchor_range.overlaps(&block.code_range, &buffer) {
442                blocks_to_remove.insert(block.block_id);
443                false
444            } else {
445                true
446            }
447        });
448
449        self.editor
450            .update(cx, |editor, cx| {
451                editor.remove_blocks(blocks_to_remove, None, cx);
452            })
453            .ok();
454
455        let status = match &self.kernel {
456            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
457            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
458            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
459            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
460            Kernel::Shutdown => ExecutionStatus::Shutdown,
461        };
462
463        let parent_message_id = message.header.msg_id.clone();
464        let session_view = cx.view().downgrade();
465        let weak_editor = self.editor.clone();
466
467        let on_close: CloseBlockFn =
468            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
469                if let Some(session) = session_view.upgrade() {
470                    session.update(cx, |session, cx| {
471                        session.blocks.remove(&parent_message_id);
472                        cx.notify();
473                    });
474                }
475
476                if let Some(editor) = weak_editor.upgrade() {
477                    editor.update(cx, |editor, cx| {
478                        let mut block_ids = HashSet::default();
479                        block_ids.insert(block_id);
480                        editor.remove_blocks(block_ids, None, cx);
481                    });
482                }
483            });
484
485        let Ok(editor_block) =
486            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
487        else {
488            return;
489        };
490
491        let new_cursor_pos = if let Some(next_cursor) = next_cell {
492            next_cursor
493        } else {
494            editor_block.invalidation_anchor
495        };
496
497        self.blocks
498            .insert(message.header.msg_id.clone(), editor_block);
499
500        match &self.kernel {
501            Kernel::RunningKernel(_) => {
502                self.send(message, cx).ok();
503            }
504            Kernel::StartingKernel(task) => {
505                // Queue up the execution as a task to run after the kernel starts
506                let task = task.clone();
507                let message = message.clone();
508
509                cx.spawn(|this, mut cx| async move {
510                    task.await;
511                    this.update(&mut cx, |session, cx| {
512                        session.send(message, cx).ok();
513                    })
514                    .ok();
515                })
516                .detach();
517            }
518            _ => {}
519        }
520
521        if move_down {
522            editor.update(cx, move |editor, cx| {
523                editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
524                    selections.select_ranges([new_cursor_pos..new_cursor_pos]);
525                });
526            });
527        }
528    }
529
530    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
531        let parent_message_id = match message.parent_header.as_ref() {
532            Some(header) => &header.msg_id,
533            None => return,
534        };
535
536        match &message.content {
537            JupyterMessageContent::Status(status) => {
538                self.kernel.set_execution_state(&status.execution_state);
539
540                self.telemetry.report_repl_event(
541                    self.kernel_specification.kernelspec.language.clone(),
542                    KernelStatus::from(&self.kernel).to_string(),
543                    cx.entity_id().to_string(),
544                );
545
546                cx.notify();
547            }
548            JupyterMessageContent::KernelInfoReply(reply) => {
549                self.kernel.set_kernel_info(&reply);
550                cx.notify();
551            }
552            JupyterMessageContent::UpdateDisplayData(update) => {
553                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
554                    display_id
555                } else {
556                    return;
557                };
558
559                self.blocks.iter_mut().for_each(|(_, block)| {
560                    block.execution_view.update(cx, |execution_view, cx| {
561                        execution_view.update_display_data(&update.data, &display_id, cx);
562                    });
563                });
564                return;
565            }
566            _ => {}
567        }
568
569        if let Some(block) = self.blocks.get_mut(parent_message_id) {
570            block.handle_message(&message, cx);
571            return;
572        }
573    }
574
575    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
576        match &mut self.kernel {
577            Kernel::RunningKernel(_kernel) => {
578                self.send(InterruptRequest {}.into(), cx).ok();
579            }
580            Kernel::StartingKernel(_task) => {
581                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
582            }
583            _ => {}
584        }
585    }
586
587    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
588        if let Kernel::Shutdown = kernel {
589            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
590        }
591
592        let kernel_status = KernelStatus::from(&kernel).to_string();
593        let kernel_language = self.kernel_specification.kernelspec.language.clone();
594
595        self.telemetry.report_repl_event(
596            kernel_language,
597            kernel_status,
598            cx.entity_id().to_string(),
599        );
600
601        self.kernel = kernel;
602    }
603
604    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
605        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
606
607        match kernel {
608            Kernel::RunningKernel(mut kernel) => {
609                let mut request_tx = kernel.request_tx.clone();
610
611                cx.spawn(|this, mut cx| async move {
612                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
613                    request_tx.try_send(message).ok();
614
615                    // Give the kernel a bit of time to clean up
616                    cx.background_executor().timer(Duration::from_secs(3)).await;
617
618                    kernel.process.kill().ok();
619
620                    this.update(&mut cx, |session, cx| {
621                        session.clear_outputs(cx);
622                        session.kernel(Kernel::Shutdown, cx);
623                        cx.notify();
624                    })
625                    .ok();
626                })
627                .detach();
628            }
629            Kernel::StartingKernel(_kernel) => {
630                self.kernel = Kernel::Shutdown;
631            }
632            _ => {
633                self.kernel = Kernel::Shutdown;
634            }
635        }
636        cx.notify();
637    }
638}
639
640pub enum SessionEvent {
641    Shutdown(WeakView<Editor>),
642}
643
644impl EventEmitter<SessionEvent> for Session {}
645
646impl Render for Session {
647    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
648        let (status_text, interrupt_button) = match &self.kernel {
649            Kernel::RunningKernel(kernel) => (
650                kernel
651                    .kernel_info
652                    .as_ref()
653                    .map(|info| info.language_info.name.clone()),
654                Some(
655                    Button::new("interrupt", "Interrupt")
656                        .style(ButtonStyle::Subtle)
657                        .on_click(cx.listener(move |session, _, cx| {
658                            session.interrupt(cx);
659                        })),
660                ),
661            ),
662            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
663            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
664            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
665            Kernel::Shutdown => (Some("Shutdown".into()), None),
666        };
667
668        KernelListItem::new(self.kernel_specification.clone())
669            .status_color(match &self.kernel {
670                Kernel::RunningKernel(kernel) => match kernel.execution_state {
671                    ExecutionState::Idle => Color::Success,
672                    ExecutionState::Busy => Color::Modified,
673                },
674                Kernel::StartingKernel(_) => Color::Modified,
675                Kernel::ErroredLaunch(_) => Color::Error,
676                Kernel::ShuttingDown => Color::Modified,
677                Kernel::Shutdown => Color::Disabled,
678            })
679            .child(Label::new(self.kernel_specification.name.clone()))
680            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
681            .button(
682                Button::new("shutdown", "Shutdown")
683                    .style(ButtonStyle::Subtle)
684                    .disabled(self.kernel.is_shutting_down())
685                    .on_click(cx.listener(move |session, _, cx| {
686                        session.shutdown(cx);
687                    })),
688            )
689            .buttons(interrupt_button)
690    }
691}