session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::io::BufReader;
 18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
 19use gpui::{
 20    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 21    WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use settings::Settings as _;
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::{ActiveTheme, ThemeSettings};
 32use ui::{prelude::*, IconButtonShape, Tooltip};
 33
 34pub struct Session {
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    messaging_task: Task<()>,
 39    pub kernel_specification: KernelSpecification,
 40    telemetry: Arc<Telemetry>,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    code_range: Range<Anchor>,
 46    invalidation_anchor: Anchor,
 47    block_id: CustomBlockId,
 48    execution_view: View<ExecutionView>,
 49}
 50
 51type CloseBlockFn =
 52    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 53
 54impl EditorBlock {
 55    fn new(
 56        editor: WeakView<Editor>,
 57        code_range: Range<Anchor>,
 58        status: ExecutionStatus,
 59        on_close: CloseBlockFn,
 60        cx: &mut ViewContext<Session>,
 61    ) -> anyhow::Result<Self> {
 62        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 63
 64        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 65            let buffer = editor.buffer().clone();
 66            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 67            let end_point = code_range.end.to_point(&buffer_snapshot);
 68            let next_row_start = end_point + Point::new(1, 0);
 69            if next_row_start > buffer_snapshot.max_point() {
 70                buffer.update(cx, |buffer, cx| {
 71                    buffer.edit(
 72                        [(
 73                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 74                            "\n",
 75                        )],
 76                        None,
 77                        cx,
 78                    )
 79                });
 80            }
 81
 82            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 83            let block = BlockProperties {
 84                position: code_range.end,
 85                height: (execution_view.num_lines(cx) + 1) as u32,
 86                style: BlockStyle::Sticky,
 87                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 88                disposition: BlockDisposition::Below,
 89            };
 90
 91            let block_id = editor.insert_blocks([block], None, cx)[0];
 92            (block_id, invalidation_anchor)
 93        })?;
 94
 95        anyhow::Ok(Self {
 96            code_range,
 97            invalidation_anchor,
 98            block_id,
 99            execution_view,
100        })
101    }
102
103    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
104        self.execution_view.update(cx, |execution_view, cx| {
105            execution_view.push_message(&message.content, cx);
106        });
107    }
108
109    fn create_output_area_renderer(
110        execution_view: View<ExecutionView>,
111        on_close: CloseBlockFn,
112    ) -> RenderBlock {
113        let render = move |cx: &mut BlockContext| {
114            let execution_view = execution_view.clone();
115            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
116            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
117
118            let gutter = cx.gutter_dimensions;
119            let close_button_size = IconSize::XSmall;
120
121            let block_id = cx.block_id;
122            let on_close = on_close.clone();
123
124            let rem_size = cx.rem_size();
125            let line_height = cx.text_style().line_height_in_pixels(rem_size);
126
127            let (close_button_width, close_button_padding) =
128                close_button_size.square_components(cx);
129
130            div()
131                .min_h(line_height)
132                .flex()
133                .flex_row()
134                .items_start()
135                .w_full()
136                .bg(cx.theme().colors().background)
137                .border_y_1()
138                .border_color(cx.theme().colors().border)
139                .child(
140                    v_flex().min_h(cx.line_height()).justify_center().child(
141                        h_flex()
142                            .w(gutter.full_width())
143                            .justify_end()
144                            .pt(line_height / 2.)
145                            .child(
146                                h_flex()
147                                    .pr(gutter.width / 2. - close_button_width
148                                        + close_button_padding / 2.)
149                                    .child(
150                                        IconButton::new(
151                                            ("close_output_area", EntityId::from(cx.block_id)),
152                                            IconName::Close,
153                                        )
154                                        .shape(IconButtonShape::Square)
155                                        .icon_size(close_button_size)
156                                        .icon_color(Color::Muted)
157                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
158                                        .on_click(
159                                            move |_, cx| {
160                                                if let BlockId::Custom(block_id) = block_id {
161                                                    (on_close)(block_id, cx)
162                                                }
163                                            },
164                                        ),
165                                    ),
166                            ),
167                    ),
168                )
169                .child(
170                    div()
171                        .flex_1()
172                        .size_full()
173                        .my_2()
174                        .mr(gutter.width)
175                        .text_size(text_font_size)
176                        .font_family(text_font)
177                        .child(execution_view),
178                )
179                .into_any_element()
180        };
181
182        Box::new(render)
183    }
184}
185
186impl Session {
187    pub fn new(
188        editor: WeakView<Editor>,
189        fs: Arc<dyn Fs>,
190        telemetry: Arc<Telemetry>,
191        kernel_specification: KernelSpecification,
192        cx: &mut ViewContext<Self>,
193    ) -> Self {
194        let kernel_language = kernel_specification.kernelspec.language.clone();
195
196        telemetry.report_repl_event(
197            kernel_language.clone(),
198            KernelStatus::Starting.to_string(),
199            cx.entity_id().to_string(),
200        );
201
202        let entity_id = editor.entity_id();
203        let working_directory = editor
204            .upgrade()
205            .and_then(|editor| editor.read(cx).working_directory(cx))
206            .unwrap_or_else(temp_dir);
207        let kernel = RunningKernel::new(
208            kernel_specification.clone(),
209            entity_id,
210            working_directory,
211            fs.clone(),
212            cx,
213        );
214
215        let pending_kernel = cx
216            .spawn(|this, mut cx| async move {
217                let kernel = kernel.await;
218
219                match kernel {
220                    Ok((mut kernel, mut messages_rx)) => {
221                        this.update(&mut cx, |session, cx| {
222                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
223
224                            let stderr = kernel.process.stderr.take();
225
226                            cx.spawn(|_session, mut _cx| async move {
227                                if let None = stderr {
228                                    return;
229                                }
230                                let reader = BufReader::new(stderr.unwrap());
231                                let mut lines = reader.lines();
232                                while let Some(Ok(line)) = lines.next().await {
233                                    log::error!("kernel: {}", line);
234                                }
235                            })
236                            .detach();
237
238                            let stdout = kernel.process.stderr.take();
239
240                            cx.spawn(|_session, mut _cx| async move {
241                                if let None = stdout {
242                                    return;
243                                }
244                                let reader = BufReader::new(stdout.unwrap());
245                                let mut lines = reader.lines();
246                                while let Some(Ok(line)) = lines.next().await {
247                                    log::info!("kernel: {}", line);
248                                }
249                            })
250                            .detach();
251
252                            let status = kernel.process.status();
253                            session.kernel(Kernel::RunningKernel(kernel), cx);
254
255                            cx.spawn(|session, mut cx| async move {
256                                let error_message = match status.await {
257                                    Ok(status) => {
258                                        if status.success() {
259                                            log::info!("kernel process exited successfully");
260                                            return;
261                                        }
262
263                                        format!("kernel process exited with status: {:?}", status)
264                                    }
265                                    Err(err) => {
266                                        format!("kernel process exited with error: {:?}", err)
267                                    }
268                                };
269
270                                log::error!("{}", error_message);
271
272                                session
273                                    .update(&mut cx, |session, cx| {
274                                        session.kernel(
275                                            Kernel::ErroredLaunch(error_message.clone()),
276                                            cx,
277                                        );
278
279                                        session.blocks.values().for_each(|block| {
280                                            block.execution_view.update(
281                                                cx,
282                                                |execution_view, cx| {
283                                                    match execution_view.status {
284                                                        ExecutionStatus::Finished => {
285                                                            // Do nothing when the output was good
286                                                        }
287                                                        _ => {
288                                                            // All other cases, set the status to errored
289                                                            execution_view.status =
290                                                                ExecutionStatus::KernelErrored(
291                                                                    error_message.clone(),
292                                                                )
293                                                        }
294                                                    }
295                                                    cx.notify();
296                                                },
297                                            );
298                                        });
299
300                                        cx.notify();
301                                    })
302                                    .ok();
303                            })
304                            .detach();
305
306                            session.messaging_task = cx.spawn(|session, mut cx| async move {
307                                while let Some(message) = messages_rx.next().await {
308                                    session
309                                        .update(&mut cx, |session, cx| {
310                                            session.route(&message, cx);
311                                        })
312                                        .ok();
313                                }
314                            });
315
316                            // todo!(kyle): send kernelinforequest once our shell channel read/writes are split
317                            // cx.spawn(|this, mut cx| async move {
318                            //     cx.background_executor()
319                            //         .timer(Duration::from_millis(120))
320                            //         .await;
321                            //     this.update(&mut cx, |this, cx| {
322                            //         this.send(KernelInfoRequest {}.into(), cx).ok();
323                            //     })
324                            //     .ok();
325                            // })
326                            // .detach();
327                        })
328                        .ok();
329                    }
330                    Err(err) => {
331                        this.update(&mut cx, |session, cx| {
332                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
333                        })
334                        .ok();
335                    }
336                }
337            })
338            .shared();
339
340        let subscription = match editor.upgrade() {
341            Some(editor) => {
342                let buffer = editor.read(cx).buffer().clone();
343                cx.subscribe(&buffer, Self::on_buffer_event)
344            }
345            None => Subscription::new(|| {}),
346        };
347
348        return Self {
349            editor,
350            kernel: Kernel::StartingKernel(pending_kernel),
351            messaging_task: Task::ready(()),
352            blocks: HashMap::default(),
353            kernel_specification,
354            _buffer_subscription: subscription,
355            telemetry,
356        };
357    }
358
359    fn on_buffer_event(
360        &mut self,
361        buffer: Model<MultiBuffer>,
362        event: &multi_buffer::Event,
363        cx: &mut ViewContext<Self>,
364    ) {
365        if let multi_buffer::Event::Edited { .. } = event {
366            let snapshot = buffer.read(cx).snapshot(cx);
367
368            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
369
370            self.blocks.retain(|_id, block| {
371                if block.invalidation_anchor.is_valid(&snapshot) {
372                    true
373                } else {
374                    blocks_to_remove.insert(block.block_id);
375                    false
376                }
377            });
378
379            if !blocks_to_remove.is_empty() {
380                self.editor
381                    .update(cx, |editor, cx| {
382                        editor.remove_blocks(blocks_to_remove, None, cx);
383                    })
384                    .ok();
385                cx.notify();
386            }
387        }
388    }
389
390    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
391        match &mut self.kernel {
392            Kernel::RunningKernel(kernel) => {
393                kernel.request_tx.try_send(message).ok();
394            }
395            _ => {}
396        }
397
398        anyhow::Ok(())
399    }
400
401    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
402        let blocks_to_remove: HashSet<CustomBlockId> =
403            self.blocks.values().map(|block| block.block_id).collect();
404
405        self.editor
406            .update(cx, |editor, cx| {
407                editor.remove_blocks(blocks_to_remove, None, cx);
408            })
409            .ok();
410
411        self.blocks.clear();
412    }
413
414    pub fn execute(
415        &mut self,
416        code: String,
417        anchor_range: Range<Anchor>,
418        next_cell: Option<Anchor>,
419        cx: &mut ViewContext<Self>,
420    ) {
421        let Some(editor) = self.editor.upgrade() else {
422            return;
423        };
424
425        if code.is_empty() {
426            return;
427        }
428
429        let execute_request = ExecuteRequest {
430            code,
431            ..ExecuteRequest::default()
432        };
433
434        let message: JupyterMessage = execute_request.into();
435
436        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
437
438        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
439
440        self.blocks.retain(|_key, block| {
441            if anchor_range.overlaps(&block.code_range, &buffer) {
442                blocks_to_remove.insert(block.block_id);
443                false
444            } else {
445                true
446            }
447        });
448
449        self.editor
450            .update(cx, |editor, cx| {
451                editor.remove_blocks(blocks_to_remove, None, cx);
452            })
453            .ok();
454
455        let status = match &self.kernel {
456            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
457            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
458            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
459            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
460            Kernel::Shutdown => ExecutionStatus::Shutdown,
461        };
462
463        let parent_message_id = message.header.msg_id.clone();
464        let session_view = cx.view().downgrade();
465        let weak_editor = self.editor.clone();
466
467        let on_close: CloseBlockFn =
468            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
469                if let Some(session) = session_view.upgrade() {
470                    session.update(cx, |session, cx| {
471                        session.blocks.remove(&parent_message_id);
472                        cx.notify();
473                    });
474                }
475
476                if let Some(editor) = weak_editor.upgrade() {
477                    editor.update(cx, |editor, cx| {
478                        let mut block_ids = HashSet::default();
479                        block_ids.insert(block_id);
480                        editor.remove_blocks(block_ids, None, cx);
481                    });
482                }
483            });
484
485        let Ok(editor_block) =
486            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
487        else {
488            return;
489        };
490
491        let new_cursor_pos = if let Some(next_cursor) = next_cell {
492            next_cursor
493        } else {
494            editor_block.invalidation_anchor
495        };
496
497        self.blocks
498            .insert(message.header.msg_id.clone(), editor_block);
499
500        match &self.kernel {
501            Kernel::RunningKernel(_) => {
502                self.send(message, cx).ok();
503            }
504            Kernel::StartingKernel(task) => {
505                // Queue up the execution as a task to run after the kernel starts
506                let task = task.clone();
507                let message = message.clone();
508
509                cx.spawn(|this, mut cx| async move {
510                    task.await;
511                    this.update(&mut cx, |session, cx| {
512                        session.send(message, cx).ok();
513                    })
514                    .ok();
515                })
516                .detach();
517            }
518            _ => {}
519        }
520
521        // Now move the cursor to after the block
522        editor.update(cx, move |editor, cx| {
523            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
524                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
525            });
526        });
527    }
528
529    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
530        let parent_message_id = match message.parent_header.as_ref() {
531            Some(header) => &header.msg_id,
532            None => return,
533        };
534
535        match &message.content {
536            JupyterMessageContent::Status(status) => {
537                self.kernel.set_execution_state(&status.execution_state);
538
539                self.telemetry.report_repl_event(
540                    self.kernel_specification.kernelspec.language.clone(),
541                    KernelStatus::from(&self.kernel).to_string(),
542                    cx.entity_id().to_string(),
543                );
544
545                cx.notify();
546            }
547            JupyterMessageContent::KernelInfoReply(reply) => {
548                self.kernel.set_kernel_info(&reply);
549                cx.notify();
550            }
551            _ => {}
552        }
553
554        if let Some(block) = self.blocks.get_mut(parent_message_id) {
555            block.handle_message(&message, cx);
556            return;
557        }
558    }
559
560    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
561        match &mut self.kernel {
562            Kernel::RunningKernel(_kernel) => {
563                self.send(InterruptRequest {}.into(), cx).ok();
564            }
565            Kernel::StartingKernel(_task) => {
566                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
567            }
568            _ => {}
569        }
570    }
571
572    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
573        if let Kernel::Shutdown = kernel {
574            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
575        }
576
577        let kernel_status = KernelStatus::from(&kernel).to_string();
578        let kernel_language = self.kernel_specification.kernelspec.language.clone();
579
580        self.telemetry.report_repl_event(
581            kernel_language,
582            kernel_status,
583            cx.entity_id().to_string(),
584        );
585
586        self.kernel = kernel;
587    }
588
589    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
590        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
591
592        match kernel {
593            Kernel::RunningKernel(mut kernel) => {
594                let mut request_tx = kernel.request_tx.clone();
595
596                cx.spawn(|this, mut cx| async move {
597                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
598                    request_tx.try_send(message).ok();
599
600                    // Give the kernel a bit of time to clean up
601                    cx.background_executor().timer(Duration::from_secs(3)).await;
602
603                    kernel.process.kill().ok();
604
605                    this.update(&mut cx, |session, cx| {
606                        session.clear_outputs(cx);
607                        session.kernel(Kernel::Shutdown, cx);
608                        cx.notify();
609                    })
610                    .ok();
611                })
612                .detach();
613            }
614            Kernel::StartingKernel(_kernel) => {
615                self.kernel = Kernel::Shutdown;
616            }
617            _ => {
618                self.kernel = Kernel::Shutdown;
619            }
620        }
621        cx.notify();
622    }
623}
624
625pub enum SessionEvent {
626    Shutdown(WeakView<Editor>),
627}
628
629impl EventEmitter<SessionEvent> for Session {}
630
631impl Render for Session {
632    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
633        let (status_text, interrupt_button) = match &self.kernel {
634            Kernel::RunningKernel(kernel) => (
635                kernel
636                    .kernel_info
637                    .as_ref()
638                    .map(|info| info.language_info.name.clone()),
639                Some(
640                    Button::new("interrupt", "Interrupt")
641                        .style(ButtonStyle::Subtle)
642                        .on_click(cx.listener(move |session, _, cx| {
643                            session.interrupt(cx);
644                        })),
645                ),
646            ),
647            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
648            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
649            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
650            Kernel::Shutdown => (Some("Shutdown".into()), None),
651        };
652
653        KernelListItem::new(self.kernel_specification.clone())
654            .status_color(match &self.kernel {
655                Kernel::RunningKernel(kernel) => match kernel.execution_state {
656                    ExecutionState::Idle => Color::Success,
657                    ExecutionState::Busy => Color::Modified,
658                },
659                Kernel::StartingKernel(_) => Color::Modified,
660                Kernel::ErroredLaunch(_) => Color::Error,
661                Kernel::ShuttingDown => Color::Modified,
662                Kernel::Shutdown => Color::Disabled,
663            })
664            .child(Label::new(self.kernel_specification.name.clone()))
665            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
666            .button(
667                Button::new("shutdown", "Shutdown")
668                    .style(ButtonStyle::Subtle)
669                    .disabled(self.kernel.is_shutting_down())
670                    .on_click(cx.listener(move |session, _, cx| {
671                        session.shutdown(cx);
672                    })),
673            )
674            .buttons(interrupt_button)
675    }
676}