session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::io::BufReader;
 18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
 19use gpui::{
 20    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 21    WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use settings::Settings as _;
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::{ActiveTheme, ThemeSettings};
 32use ui::{prelude::*, IconButtonShape, Tooltip};
 33
 34pub struct Session {
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    messaging_task: Task<()>,
 39    pub kernel_specification: KernelSpecification,
 40    telemetry: Arc<Telemetry>,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    code_range: Range<Anchor>,
 46    invalidation_anchor: Anchor,
 47    block_id: CustomBlockId,
 48    execution_view: View<ExecutionView>,
 49}
 50
 51type CloseBlockFn =
 52    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 53
 54impl EditorBlock {
 55    fn new(
 56        editor: WeakView<Editor>,
 57        code_range: Range<Anchor>,
 58        status: ExecutionStatus,
 59        on_close: CloseBlockFn,
 60        cx: &mut ViewContext<Session>,
 61    ) -> anyhow::Result<Self> {
 62        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 63
 64        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 65            let buffer = editor.buffer().clone();
 66            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 67            let end_point = code_range.end.to_point(&buffer_snapshot);
 68            let next_row_start = end_point + Point::new(1, 0);
 69            if next_row_start > buffer_snapshot.max_point() {
 70                buffer.update(cx, |buffer, cx| {
 71                    buffer.edit(
 72                        [(
 73                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 74                            "\n",
 75                        )],
 76                        None,
 77                        cx,
 78                    )
 79                });
 80            }
 81
 82            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 83            let block = BlockProperties {
 84                position: code_range.end,
 85                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 86                height: 1,
 87                style: BlockStyle::Sticky,
 88                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 89                disposition: BlockDisposition::Below,
 90            };
 91
 92            let block_id = editor.insert_blocks([block], None, cx)[0];
 93            (block_id, invalidation_anchor)
 94        })?;
 95
 96        anyhow::Ok(Self {
 97            code_range,
 98            invalidation_anchor,
 99            block_id,
100            execution_view,
101        })
102    }
103
104    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
105        self.execution_view.update(cx, |execution_view, cx| {
106            execution_view.push_message(&message.content, cx);
107        });
108    }
109
110    fn create_output_area_renderer(
111        execution_view: View<ExecutionView>,
112        on_close: CloseBlockFn,
113    ) -> RenderBlock {
114        let render = move |cx: &mut BlockContext| {
115            let execution_view = execution_view.clone();
116            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
117            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
118
119            let gutter = cx.gutter_dimensions;
120            let close_button_size = IconSize::XSmall;
121
122            let block_id = cx.block_id;
123            let on_close = on_close.clone();
124
125            let rem_size = cx.rem_size();
126            let line_height = cx.text_style().line_height_in_pixels(rem_size);
127
128            let (close_button_width, close_button_padding) =
129                close_button_size.square_components(cx);
130
131            div()
132                .min_h(line_height)
133                .flex()
134                .flex_row()
135                .items_start()
136                .w_full()
137                .bg(cx.theme().colors().background)
138                .border_y_1()
139                .border_color(cx.theme().colors().border)
140                .child(
141                    v_flex().min_h(cx.line_height()).justify_center().child(
142                        h_flex()
143                            .w(gutter.full_width())
144                            .justify_end()
145                            .pt(line_height / 2.)
146                            .child(
147                                h_flex()
148                                    .pr(gutter.width / 2. - close_button_width
149                                        + close_button_padding / 2.)
150                                    .child(
151                                        IconButton::new(
152                                            ("close_output_area", EntityId::from(cx.block_id)),
153                                            IconName::Close,
154                                        )
155                                        .shape(IconButtonShape::Square)
156                                        .icon_size(close_button_size)
157                                        .icon_color(Color::Muted)
158                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
159                                        .on_click(
160                                            move |_, cx| {
161                                                if let BlockId::Custom(block_id) = block_id {
162                                                    (on_close)(block_id, cx)
163                                                }
164                                            },
165                                        ),
166                                    ),
167                            ),
168                    ),
169                )
170                .child(
171                    div()
172                        .flex_1()
173                        .size_full()
174                        .my_2()
175                        .mr(gutter.width)
176                        .text_size(text_font_size)
177                        .font_family(text_font)
178                        .child(execution_view),
179                )
180                .into_any_element()
181        };
182
183        Box::new(render)
184    }
185}
186
187impl Session {
188    pub fn new(
189        editor: WeakView<Editor>,
190        fs: Arc<dyn Fs>,
191        telemetry: Arc<Telemetry>,
192        kernel_specification: KernelSpecification,
193        cx: &mut ViewContext<Self>,
194    ) -> Self {
195        let kernel_language = kernel_specification.kernelspec.language.clone();
196
197        telemetry.report_repl_event(
198            kernel_language.clone(),
199            KernelStatus::Starting.to_string(),
200            cx.entity_id().to_string(),
201        );
202
203        let entity_id = editor.entity_id();
204        let working_directory = editor
205            .upgrade()
206            .and_then(|editor| editor.read(cx).working_directory(cx))
207            .unwrap_or_else(temp_dir);
208        let kernel = RunningKernel::new(
209            kernel_specification.clone(),
210            entity_id,
211            working_directory,
212            fs.clone(),
213            cx,
214        );
215
216        let pending_kernel = cx
217            .spawn(|this, mut cx| async move {
218                let kernel = kernel.await;
219
220                match kernel {
221                    Ok((mut kernel, mut messages_rx)) => {
222                        this.update(&mut cx, |session, cx| {
223                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
224
225                            let stderr = kernel.process.stderr.take();
226
227                            cx.spawn(|_session, mut _cx| async move {
228                                if let None = stderr {
229                                    return;
230                                }
231                                let reader = BufReader::new(stderr.unwrap());
232                                let mut lines = reader.lines();
233                                while let Some(Ok(line)) = lines.next().await {
234                                    log::error!("kernel: {}", line);
235                                }
236                            })
237                            .detach();
238
239                            let stdout = kernel.process.stderr.take();
240
241                            cx.spawn(|_session, mut _cx| async move {
242                                if let None = stdout {
243                                    return;
244                                }
245                                let reader = BufReader::new(stdout.unwrap());
246                                let mut lines = reader.lines();
247                                while let Some(Ok(line)) = lines.next().await {
248                                    log::info!("kernel: {}", line);
249                                }
250                            })
251                            .detach();
252
253                            let status = kernel.process.status();
254                            session.kernel(Kernel::RunningKernel(kernel), cx);
255
256                            cx.spawn(|session, mut cx| async move {
257                                let error_message = match status.await {
258                                    Ok(status) => {
259                                        if status.success() {
260                                            log::info!("kernel process exited successfully");
261                                            return;
262                                        }
263
264                                        format!("kernel process exited with status: {:?}", status)
265                                    }
266                                    Err(err) => {
267                                        format!("kernel process exited with error: {:?}", err)
268                                    }
269                                };
270
271                                log::error!("{}", error_message);
272
273                                session
274                                    .update(&mut cx, |session, cx| {
275                                        session.kernel(
276                                            Kernel::ErroredLaunch(error_message.clone()),
277                                            cx,
278                                        );
279
280                                        session.blocks.values().for_each(|block| {
281                                            block.execution_view.update(
282                                                cx,
283                                                |execution_view, cx| {
284                                                    match execution_view.status {
285                                                        ExecutionStatus::Finished => {
286                                                            // Do nothing when the output was good
287                                                        }
288                                                        _ => {
289                                                            // All other cases, set the status to errored
290                                                            execution_view.status =
291                                                                ExecutionStatus::KernelErrored(
292                                                                    error_message.clone(),
293                                                                )
294                                                        }
295                                                    }
296                                                    cx.notify();
297                                                },
298                                            );
299                                        });
300
301                                        cx.notify();
302                                    })
303                                    .ok();
304                            })
305                            .detach();
306
307                            session.messaging_task = cx.spawn(|session, mut cx| async move {
308                                while let Some(message) = messages_rx.next().await {
309                                    session
310                                        .update(&mut cx, |session, cx| {
311                                            session.route(&message, cx);
312                                        })
313                                        .ok();
314                                }
315                            });
316
317                            // todo!(kyle): send kernelinforequest once our shell channel read/writes are split
318                            // cx.spawn(|this, mut cx| async move {
319                            //     cx.background_executor()
320                            //         .timer(Duration::from_millis(120))
321                            //         .await;
322                            //     this.update(&mut cx, |this, cx| {
323                            //         this.send(KernelInfoRequest {}.into(), cx).ok();
324                            //     })
325                            //     .ok();
326                            // })
327                            // .detach();
328                        })
329                        .ok();
330                    }
331                    Err(err) => {
332                        this.update(&mut cx, |session, cx| {
333                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
334                        })
335                        .ok();
336                    }
337                }
338            })
339            .shared();
340
341        let subscription = match editor.upgrade() {
342            Some(editor) => {
343                let buffer = editor.read(cx).buffer().clone();
344                cx.subscribe(&buffer, Self::on_buffer_event)
345            }
346            None => Subscription::new(|| {}),
347        };
348
349        return Self {
350            editor,
351            kernel: Kernel::StartingKernel(pending_kernel),
352            messaging_task: Task::ready(()),
353            blocks: HashMap::default(),
354            kernel_specification,
355            _buffer_subscription: subscription,
356            telemetry,
357        };
358    }
359
360    fn on_buffer_event(
361        &mut self,
362        buffer: Model<MultiBuffer>,
363        event: &multi_buffer::Event,
364        cx: &mut ViewContext<Self>,
365    ) {
366        if let multi_buffer::Event::Edited { .. } = event {
367            let snapshot = buffer.read(cx).snapshot(cx);
368
369            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
370
371            self.blocks.retain(|_id, block| {
372                if block.invalidation_anchor.is_valid(&snapshot) {
373                    true
374                } else {
375                    blocks_to_remove.insert(block.block_id);
376                    false
377                }
378            });
379
380            if !blocks_to_remove.is_empty() {
381                self.editor
382                    .update(cx, |editor, cx| {
383                        editor.remove_blocks(blocks_to_remove, None, cx);
384                    })
385                    .ok();
386                cx.notify();
387            }
388        }
389    }
390
391    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
392        match &mut self.kernel {
393            Kernel::RunningKernel(kernel) => {
394                kernel.request_tx.try_send(message).ok();
395            }
396            _ => {}
397        }
398
399        anyhow::Ok(())
400    }
401
402    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
403        let blocks_to_remove: HashSet<CustomBlockId> =
404            self.blocks.values().map(|block| block.block_id).collect();
405
406        self.editor
407            .update(cx, |editor, cx| {
408                editor.remove_blocks(blocks_to_remove, None, cx);
409            })
410            .ok();
411
412        self.blocks.clear();
413    }
414
415    pub fn execute(
416        &mut self,
417        code: String,
418        anchor_range: Range<Anchor>,
419        next_cell: Option<Anchor>,
420        cx: &mut ViewContext<Self>,
421    ) {
422        let Some(editor) = self.editor.upgrade() else {
423            return;
424        };
425
426        if code.is_empty() {
427            return;
428        }
429
430        let execute_request = ExecuteRequest {
431            code,
432            ..ExecuteRequest::default()
433        };
434
435        let message: JupyterMessage = execute_request.into();
436
437        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
438
439        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
440
441        self.blocks.retain(|_key, block| {
442            if anchor_range.overlaps(&block.code_range, &buffer) {
443                blocks_to_remove.insert(block.block_id);
444                false
445            } else {
446                true
447            }
448        });
449
450        self.editor
451            .update(cx, |editor, cx| {
452                editor.remove_blocks(blocks_to_remove, None, cx);
453            })
454            .ok();
455
456        let status = match &self.kernel {
457            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
458            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
459            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
460            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
461            Kernel::Shutdown => ExecutionStatus::Shutdown,
462        };
463
464        let parent_message_id = message.header.msg_id.clone();
465        let session_view = cx.view().downgrade();
466        let weak_editor = self.editor.clone();
467
468        let on_close: CloseBlockFn =
469            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
470                if let Some(session) = session_view.upgrade() {
471                    session.update(cx, |session, cx| {
472                        session.blocks.remove(&parent_message_id);
473                        cx.notify();
474                    });
475                }
476
477                if let Some(editor) = weak_editor.upgrade() {
478                    editor.update(cx, |editor, cx| {
479                        let mut block_ids = HashSet::default();
480                        block_ids.insert(block_id);
481                        editor.remove_blocks(block_ids, None, cx);
482                    });
483                }
484            });
485
486        let Ok(editor_block) =
487            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
488        else {
489            return;
490        };
491
492        let new_cursor_pos = if let Some(next_cursor) = next_cell {
493            next_cursor
494        } else {
495            editor_block.invalidation_anchor
496        };
497
498        self.blocks
499            .insert(message.header.msg_id.clone(), editor_block);
500
501        match &self.kernel {
502            Kernel::RunningKernel(_) => {
503                self.send(message, cx).ok();
504            }
505            Kernel::StartingKernel(task) => {
506                // Queue up the execution as a task to run after the kernel starts
507                let task = task.clone();
508                let message = message.clone();
509
510                cx.spawn(|this, mut cx| async move {
511                    task.await;
512                    this.update(&mut cx, |session, cx| {
513                        session.send(message, cx).ok();
514                    })
515                    .ok();
516                })
517                .detach();
518            }
519            _ => {}
520        }
521
522        // Now move the cursor to after the block
523        editor.update(cx, move |editor, cx| {
524            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
525                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
526            });
527        });
528    }
529
530    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
531        let parent_message_id = match message.parent_header.as_ref() {
532            Some(header) => &header.msg_id,
533            None => return,
534        };
535
536        match &message.content {
537            JupyterMessageContent::Status(status) => {
538                self.kernel.set_execution_state(&status.execution_state);
539
540                self.telemetry.report_repl_event(
541                    self.kernel_specification.kernelspec.language.clone(),
542                    KernelStatus::from(&self.kernel).to_string(),
543                    cx.entity_id().to_string(),
544                );
545
546                cx.notify();
547            }
548            JupyterMessageContent::KernelInfoReply(reply) => {
549                self.kernel.set_kernel_info(&reply);
550                cx.notify();
551            }
552            JupyterMessageContent::UpdateDisplayData(update) => {
553                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
554                    display_id
555                } else {
556                    return;
557                };
558
559                self.blocks.iter_mut().for_each(|(_, block)| {
560                    block.execution_view.update(cx, |execution_view, cx| {
561                        execution_view.update_display_data(&update.data, &display_id, cx);
562                    });
563                });
564                return;
565            }
566            _ => {}
567        }
568
569        if let Some(block) = self.blocks.get_mut(parent_message_id) {
570            block.handle_message(&message, cx);
571            return;
572        }
573    }
574
575    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
576        match &mut self.kernel {
577            Kernel::RunningKernel(_kernel) => {
578                self.send(InterruptRequest {}.into(), cx).ok();
579            }
580            Kernel::StartingKernel(_task) => {
581                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
582            }
583            _ => {}
584        }
585    }
586
587    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
588        if let Kernel::Shutdown = kernel {
589            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
590        }
591
592        let kernel_status = KernelStatus::from(&kernel).to_string();
593        let kernel_language = self.kernel_specification.kernelspec.language.clone();
594
595        self.telemetry.report_repl_event(
596            kernel_language,
597            kernel_status,
598            cx.entity_id().to_string(),
599        );
600
601        self.kernel = kernel;
602    }
603
604    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
605        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
606
607        match kernel {
608            Kernel::RunningKernel(mut kernel) => {
609                let mut request_tx = kernel.request_tx.clone();
610
611                cx.spawn(|this, mut cx| async move {
612                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
613                    request_tx.try_send(message).ok();
614
615                    // Give the kernel a bit of time to clean up
616                    cx.background_executor().timer(Duration::from_secs(3)).await;
617
618                    kernel.process.kill().ok();
619
620                    this.update(&mut cx, |session, cx| {
621                        session.clear_outputs(cx);
622                        session.kernel(Kernel::Shutdown, cx);
623                        cx.notify();
624                    })
625                    .ok();
626                })
627                .detach();
628            }
629            Kernel::StartingKernel(_kernel) => {
630                self.kernel = Kernel::Shutdown;
631            }
632            _ => {
633                self.kernel = Kernel::Shutdown;
634            }
635        }
636        cx.notify();
637    }
638}
639
640pub enum SessionEvent {
641    Shutdown(WeakView<Editor>),
642}
643
644impl EventEmitter<SessionEvent> for Session {}
645
646impl Render for Session {
647    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
648        let (status_text, interrupt_button) = match &self.kernel {
649            Kernel::RunningKernel(kernel) => (
650                kernel
651                    .kernel_info
652                    .as_ref()
653                    .map(|info| info.language_info.name.clone()),
654                Some(
655                    Button::new("interrupt", "Interrupt")
656                        .style(ButtonStyle::Subtle)
657                        .on_click(cx.listener(move |session, _, cx| {
658                            session.interrupt(cx);
659                        })),
660                ),
661            ),
662            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
663            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
664            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
665            Kernel::Shutdown => (Some("Shutdown".into()), None),
666        };
667
668        KernelListItem::new(self.kernel_specification.clone())
669            .status_color(match &self.kernel {
670                Kernel::RunningKernel(kernel) => match kernel.execution_state {
671                    ExecutionState::Idle => Color::Success,
672                    ExecutionState::Busy => Color::Modified,
673                },
674                Kernel::StartingKernel(_) => Color::Modified,
675                Kernel::ErroredLaunch(_) => Color::Error,
676                Kernel::ShuttingDown => Color::Modified,
677                Kernel::Shutdown => Color::Disabled,
678            })
679            .child(Label::new(self.kernel_specification.name.clone()))
680            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
681            .button(
682                Button::new("shutdown", "Shutdown")
683                    .style(ButtonStyle::Subtle)
684                    .disabled(self.kernel.is_shutting_down())
685                    .on_click(cx.listener(move |session, _, cx| {
686                        session.shutdown(cx);
687                    })),
688            )
689            .buttons(interrupt_button)
690    }
691}