session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::io::BufReader;
 18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
 19use gpui::{
 20    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 21    WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use settings::Settings as _;
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::{ActiveTheme, ThemeSettings};
 32use ui::{prelude::*, IconButtonShape, Tooltip};
 33
 34pub struct Session {
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    messaging_task: Task<()>,
 39    pub kernel_specification: KernelSpecification,
 40    telemetry: Arc<Telemetry>,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    editor: WeakView<Editor>,
 46    code_range: Range<Anchor>,
 47    invalidation_anchor: Anchor,
 48    block_id: CustomBlockId,
 49    execution_view: View<ExecutionView>,
 50    on_close: CloseBlockFn,
 51}
 52
 53type CloseBlockFn =
 54    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 55
 56impl EditorBlock {
 57    fn new(
 58        editor: WeakView<Editor>,
 59        code_range: Range<Anchor>,
 60        status: ExecutionStatus,
 61        on_close: CloseBlockFn,
 62        cx: &mut ViewContext<Session>,
 63    ) -> anyhow::Result<Self> {
 64        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 65
 66        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 67            let buffer = editor.buffer().clone();
 68            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 69            let end_point = code_range.end.to_point(&buffer_snapshot);
 70            let next_row_start = end_point + Point::new(1, 0);
 71            if next_row_start > buffer_snapshot.max_point() {
 72                buffer.update(cx, |buffer, cx| {
 73                    buffer.edit(
 74                        [(
 75                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 76                            "\n",
 77                        )],
 78                        None,
 79                        cx,
 80                    )
 81                });
 82            }
 83
 84            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 85            let block = BlockProperties {
 86                position: code_range.end,
 87                height: execution_view.num_lines(cx).saturating_add(1),
 88                style: BlockStyle::Sticky,
 89                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 90                disposition: BlockDisposition::Below,
 91            };
 92
 93            let block_id = editor.insert_blocks([block], None, cx)[0];
 94            (block_id, invalidation_anchor)
 95        })?;
 96
 97        anyhow::Ok(Self {
 98            editor,
 99            code_range,
100            invalidation_anchor,
101            block_id,
102            execution_view,
103            on_close,
104        })
105    }
106
107    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
108        self.execution_view.update(cx, |execution_view, cx| {
109            execution_view.push_message(&message.content, cx);
110        });
111
112        self.editor
113            .update(cx, |editor, cx| {
114                let mut replacements = HashMap::default();
115
116                replacements.insert(
117                    self.block_id,
118                    (
119                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
120                        Self::create_output_area_renderer(
121                            self.execution_view.clone(),
122                            self.on_close.clone(),
123                        ),
124                    ),
125                );
126                editor.replace_blocks(replacements, None, cx);
127            })
128            .ok();
129    }
130
131    fn create_output_area_renderer(
132        execution_view: View<ExecutionView>,
133        on_close: CloseBlockFn,
134    ) -> RenderBlock {
135        let render = move |cx: &mut BlockContext| {
136            let execution_view = execution_view.clone();
137            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
138            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
139
140            let gutter = cx.gutter_dimensions;
141            let close_button_size = IconSize::XSmall;
142
143            let block_id = cx.block_id;
144            let on_close = on_close.clone();
145
146            let rem_size = cx.rem_size();
147            let line_height = cx.text_style().line_height_in_pixels(rem_size);
148
149            let (close_button_width, close_button_padding) =
150                close_button_size.square_components(cx);
151
152            div()
153                .min_h(line_height)
154                .flex()
155                .flex_row()
156                .items_start()
157                .w_full()
158                .bg(cx.theme().colors().background)
159                .border_y_1()
160                .border_color(cx.theme().colors().border)
161                .child(
162                    v_flex().min_h(cx.line_height()).justify_center().child(
163                        h_flex()
164                            .w(gutter.full_width())
165                            .justify_end()
166                            .pt(line_height / 2.)
167                            .child(
168                                h_flex()
169                                    .pr(gutter.width / 2. - close_button_width
170                                        + close_button_padding / 2.)
171                                    .child(
172                                        IconButton::new(
173                                            ("close_output_area", EntityId::from(cx.block_id)),
174                                            IconName::Close,
175                                        )
176                                        .shape(IconButtonShape::Square)
177                                        .icon_size(close_button_size)
178                                        .icon_color(Color::Muted)
179                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
180                                        .on_click(
181                                            move |_, cx| {
182                                                if let BlockId::Custom(block_id) = block_id {
183                                                    (on_close)(block_id, cx)
184                                                }
185                                            },
186                                        ),
187                                    ),
188                            ),
189                    ),
190                )
191                .child(
192                    div()
193                        .flex_1()
194                        .size_full()
195                        .my_2()
196                        .mr(gutter.width)
197                        .text_size(text_font_size)
198                        .font_family(text_font)
199                        .child(execution_view),
200                )
201                .into_any_element()
202        };
203
204        Box::new(render)
205    }
206}
207
208impl Session {
209    pub fn new(
210        editor: WeakView<Editor>,
211        fs: Arc<dyn Fs>,
212        telemetry: Arc<Telemetry>,
213        kernel_specification: KernelSpecification,
214        cx: &mut ViewContext<Self>,
215    ) -> Self {
216        let kernel_language = kernel_specification.kernelspec.language.clone();
217
218        telemetry.report_repl_event(
219            kernel_language.clone(),
220            KernelStatus::Starting.to_string(),
221            cx.entity_id().to_string(),
222        );
223
224        let entity_id = editor.entity_id();
225        let working_directory = editor
226            .upgrade()
227            .and_then(|editor| editor.read(cx).working_directory(cx))
228            .unwrap_or_else(temp_dir);
229        let kernel = RunningKernel::new(
230            kernel_specification.clone(),
231            entity_id,
232            working_directory,
233            fs.clone(),
234            cx,
235        );
236
237        let pending_kernel = cx
238            .spawn(|this, mut cx| async move {
239                let kernel = kernel.await;
240
241                match kernel {
242                    Ok((mut kernel, mut messages_rx)) => {
243                        this.update(&mut cx, |session, cx| {
244                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
245
246                            let stderr = kernel.process.stderr.take();
247
248                            cx.spawn(|_session, mut _cx| async move {
249                                if let None = stderr {
250                                    return;
251                                }
252                                let reader = BufReader::new(stderr.unwrap());
253                                let mut lines = reader.lines();
254                                while let Some(Ok(line)) = lines.next().await {
255                                    log::error!("kernel: {}", line);
256                                }
257                            })
258                            .detach();
259
260                            let stdout = kernel.process.stderr.take();
261
262                            cx.spawn(|_session, mut _cx| async move {
263                                if let None = stdout {
264                                    return;
265                                }
266                                let reader = BufReader::new(stdout.unwrap());
267                                let mut lines = reader.lines();
268                                while let Some(Ok(line)) = lines.next().await {
269                                    log::info!("kernel: {}", line);
270                                }
271                            })
272                            .detach();
273
274                            let status = kernel.process.status();
275                            session.kernel(Kernel::RunningKernel(kernel), cx);
276
277                            cx.spawn(|session, mut cx| async move {
278                                let error_message = match status.await {
279                                    Ok(status) => {
280                                        if status.success() {
281                                            log::info!("kernel process exited successfully");
282                                            return;
283                                        }
284
285                                        format!("kernel process exited with status: {:?}", status)
286                                    }
287                                    Err(err) => {
288                                        format!("kernel process exited with error: {:?}", err)
289                                    }
290                                };
291
292                                log::error!("{}", error_message);
293
294                                session
295                                    .update(&mut cx, |session, cx| {
296                                        session.kernel(
297                                            Kernel::ErroredLaunch(error_message.clone()),
298                                            cx,
299                                        );
300
301                                        session.blocks.values().for_each(|block| {
302                                            block.execution_view.update(
303                                                cx,
304                                                |execution_view, cx| {
305                                                    match execution_view.status {
306                                                        ExecutionStatus::Finished => {
307                                                            // Do nothing when the output was good
308                                                        }
309                                                        _ => {
310                                                            // All other cases, set the status to errored
311                                                            execution_view.status =
312                                                                ExecutionStatus::KernelErrored(
313                                                                    error_message.clone(),
314                                                                )
315                                                        }
316                                                    }
317                                                    cx.notify();
318                                                },
319                                            );
320                                        });
321
322                                        cx.notify();
323                                    })
324                                    .ok();
325                            })
326                            .detach();
327
328                            session.messaging_task = cx.spawn(|session, mut cx| async move {
329                                while let Some(message) = messages_rx.next().await {
330                                    session
331                                        .update(&mut cx, |session, cx| {
332                                            session.route(&message, cx);
333                                        })
334                                        .ok();
335                                }
336                            });
337                        })
338                        .ok();
339                    }
340                    Err(err) => {
341                        this.update(&mut cx, |session, cx| {
342                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
343                        })
344                        .ok();
345                    }
346                }
347            })
348            .shared();
349
350        let subscription = match editor.upgrade() {
351            Some(editor) => {
352                let buffer = editor.read(cx).buffer().clone();
353                cx.subscribe(&buffer, Self::on_buffer_event)
354            }
355            None => Subscription::new(|| {}),
356        };
357
358        return Self {
359            editor,
360            kernel: Kernel::StartingKernel(pending_kernel),
361            messaging_task: Task::ready(()),
362            blocks: HashMap::default(),
363            kernel_specification,
364            _buffer_subscription: subscription,
365            telemetry,
366        };
367    }
368
369    fn on_buffer_event(
370        &mut self,
371        buffer: Model<MultiBuffer>,
372        event: &multi_buffer::Event,
373        cx: &mut ViewContext<Self>,
374    ) {
375        if let multi_buffer::Event::Edited { .. } = event {
376            let snapshot = buffer.read(cx).snapshot(cx);
377
378            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
379
380            self.blocks.retain(|_id, block| {
381                if block.invalidation_anchor.is_valid(&snapshot) {
382                    true
383                } else {
384                    blocks_to_remove.insert(block.block_id);
385                    false
386                }
387            });
388
389            if !blocks_to_remove.is_empty() {
390                self.editor
391                    .update(cx, |editor, cx| {
392                        editor.remove_blocks(blocks_to_remove, None, cx);
393                    })
394                    .ok();
395                cx.notify();
396            }
397        }
398    }
399
400    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
401        match &mut self.kernel {
402            Kernel::RunningKernel(kernel) => {
403                kernel.request_tx.try_send(message).ok();
404            }
405            _ => {}
406        }
407
408        anyhow::Ok(())
409    }
410
411    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
412        let blocks_to_remove: HashSet<CustomBlockId> =
413            self.blocks.values().map(|block| block.block_id).collect();
414
415        self.editor
416            .update(cx, |editor, cx| {
417                editor.remove_blocks(blocks_to_remove, None, cx);
418            })
419            .ok();
420
421        self.blocks.clear();
422    }
423
424    pub fn execute(
425        &mut self,
426        code: String,
427        anchor_range: Range<Anchor>,
428        next_cell: Option<Anchor>,
429        cx: &mut ViewContext<Self>,
430    ) {
431        let Some(editor) = self.editor.upgrade() else {
432            return;
433        };
434
435        if code.is_empty() {
436            return;
437        }
438
439        let execute_request = ExecuteRequest {
440            code,
441            ..ExecuteRequest::default()
442        };
443
444        let message: JupyterMessage = execute_request.into();
445
446        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
447
448        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
449
450        self.blocks.retain(|_key, block| {
451            if anchor_range.overlaps(&block.code_range, &buffer) {
452                blocks_to_remove.insert(block.block_id);
453                false
454            } else {
455                true
456            }
457        });
458
459        self.editor
460            .update(cx, |editor, cx| {
461                editor.remove_blocks(blocks_to_remove, None, cx);
462            })
463            .ok();
464
465        let status = match &self.kernel {
466            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
467            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
468            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
469            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
470            Kernel::Shutdown => ExecutionStatus::Shutdown,
471        };
472
473        let parent_message_id = message.header.msg_id.clone();
474        let session_view = cx.view().downgrade();
475        let weak_editor = self.editor.clone();
476
477        let on_close: CloseBlockFn =
478            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
479                if let Some(session) = session_view.upgrade() {
480                    session.update(cx, |session, cx| {
481                        session.blocks.remove(&parent_message_id);
482                        cx.notify();
483                    });
484                }
485
486                if let Some(editor) = weak_editor.upgrade() {
487                    editor.update(cx, |editor, cx| {
488                        let mut block_ids = HashSet::default();
489                        block_ids.insert(block_id);
490                        editor.remove_blocks(block_ids, None, cx);
491                    });
492                }
493            });
494
495        let Ok(editor_block) =
496            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
497        else {
498            return;
499        };
500
501        let new_cursor_pos = if let Some(next_cursor) = next_cell {
502            next_cursor
503        } else {
504            editor_block.invalidation_anchor
505        };
506
507        self.blocks
508            .insert(message.header.msg_id.clone(), editor_block);
509
510        match &self.kernel {
511            Kernel::RunningKernel(_) => {
512                self.send(message, cx).ok();
513            }
514            Kernel::StartingKernel(task) => {
515                // Queue up the execution as a task to run after the kernel starts
516                let task = task.clone();
517                let message = message.clone();
518
519                cx.spawn(|this, mut cx| async move {
520                    task.await;
521                    this.update(&mut cx, |session, cx| {
522                        session.send(message, cx).ok();
523                    })
524                    .ok();
525                })
526                .detach();
527            }
528            _ => {}
529        }
530
531        // Now move the cursor to after the block
532        editor.update(cx, move |editor, cx| {
533            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
534                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
535            });
536        });
537    }
538
539    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
540        let parent_message_id = match message.parent_header.as_ref() {
541            Some(header) => &header.msg_id,
542            None => return,
543        };
544
545        match &message.content {
546            JupyterMessageContent::Status(status) => {
547                self.kernel.set_execution_state(&status.execution_state);
548
549                self.telemetry.report_repl_event(
550                    self.kernel_specification.kernelspec.language.clone(),
551                    KernelStatus::from(&self.kernel).to_string(),
552                    cx.entity_id().to_string(),
553                );
554
555                cx.notify();
556            }
557            JupyterMessageContent::KernelInfoReply(reply) => {
558                self.kernel.set_kernel_info(&reply);
559                cx.notify();
560            }
561            _ => {}
562        }
563
564        if let Some(block) = self.blocks.get_mut(parent_message_id) {
565            block.handle_message(&message, cx);
566            return;
567        }
568    }
569
570    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
571        match &mut self.kernel {
572            Kernel::RunningKernel(_kernel) => {
573                self.send(InterruptRequest {}.into(), cx).ok();
574            }
575            Kernel::StartingKernel(_task) => {
576                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
577            }
578            _ => {}
579        }
580    }
581
582    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
583        if let Kernel::Shutdown = kernel {
584            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
585        }
586
587        let kernel_status = KernelStatus::from(&kernel).to_string();
588        let kernel_language = self.kernel_specification.kernelspec.language.clone();
589
590        self.telemetry.report_repl_event(
591            kernel_language,
592            kernel_status,
593            cx.entity_id().to_string(),
594        );
595
596        self.kernel = kernel;
597    }
598
599    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
600        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
601
602        match kernel {
603            Kernel::RunningKernel(mut kernel) => {
604                let mut request_tx = kernel.request_tx.clone();
605
606                cx.spawn(|this, mut cx| async move {
607                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
608                    request_tx.try_send(message).ok();
609
610                    // Give the kernel a bit of time to clean up
611                    cx.background_executor().timer(Duration::from_secs(3)).await;
612
613                    kernel.process.kill().ok();
614
615                    this.update(&mut cx, |session, cx| {
616                        session.clear_outputs(cx);
617                        session.kernel(Kernel::Shutdown, cx);
618                        cx.notify();
619                    })
620                    .ok();
621                })
622                .detach();
623            }
624            Kernel::StartingKernel(_kernel) => {
625                self.kernel = Kernel::Shutdown;
626            }
627            _ => {
628                self.kernel = Kernel::Shutdown;
629            }
630        }
631        cx.notify();
632    }
633}
634
635pub enum SessionEvent {
636    Shutdown(WeakView<Editor>),
637}
638
639impl EventEmitter<SessionEvent> for Session {}
640
641impl Render for Session {
642    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
643        let (status_text, interrupt_button) = match &self.kernel {
644            Kernel::RunningKernel(kernel) => (
645                kernel
646                    .kernel_info
647                    .as_ref()
648                    .map(|info| info.language_info.name.clone()),
649                Some(
650                    Button::new("interrupt", "Interrupt")
651                        .style(ButtonStyle::Subtle)
652                        .on_click(cx.listener(move |session, _, cx| {
653                            session.interrupt(cx);
654                        })),
655                ),
656            ),
657            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
658            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
659            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
660            Kernel::Shutdown => (Some("Shutdown".into()), None),
661        };
662
663        KernelListItem::new(self.kernel_specification.clone())
664            .status_color(match &self.kernel {
665                Kernel::RunningKernel(kernel) => match kernel.execution_state {
666                    ExecutionState::Idle => Color::Success,
667                    ExecutionState::Busy => Color::Modified,
668                },
669                Kernel::StartingKernel(_) => Color::Modified,
670                Kernel::ErroredLaunch(_) => Color::Error,
671                Kernel::ShuttingDown => Color::Modified,
672                Kernel::Shutdown => Color::Disabled,
673            })
674            .child(Label::new(self.kernel_specification.name.clone()))
675            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
676            .button(
677                Button::new("shutdown", "Shutdown")
678                    .style(ButtonStyle::Subtle)
679                    .disabled(self.kernel.is_shutting_down())
680                    .on_click(cx.listener(move |session, _, cx| {
681                        session.shutdown(cx);
682                    })),
683            )
684            .buttons(interrupt_button)
685    }
686}