session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::io::BufReader;
 18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
 19use gpui::{
 20    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 21    WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use settings::Settings as _;
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::{ActiveTheme, ThemeSettings};
 32use ui::{prelude::*, IconButtonShape, Tooltip};
 33
 34pub struct Session {
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    messaging_task: Task<()>,
 39    pub kernel_specification: KernelSpecification,
 40    telemetry: Arc<Telemetry>,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    code_range: Range<Anchor>,
 46    invalidation_anchor: Anchor,
 47    block_id: CustomBlockId,
 48    execution_view: View<ExecutionView>,
 49}
 50
 51type CloseBlockFn =
 52    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 53
 54impl EditorBlock {
 55    fn new(
 56        editor: WeakView<Editor>,
 57        code_range: Range<Anchor>,
 58        status: ExecutionStatus,
 59        on_close: CloseBlockFn,
 60        cx: &mut ViewContext<Session>,
 61    ) -> anyhow::Result<Self> {
 62        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 63
 64        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 65            let buffer = editor.buffer().clone();
 66            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 67            let end_point = code_range.end.to_point(&buffer_snapshot);
 68            let next_row_start = end_point + Point::new(1, 0);
 69            if next_row_start > buffer_snapshot.max_point() {
 70                buffer.update(cx, |buffer, cx| {
 71                    buffer.edit(
 72                        [(
 73                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 74                            "\n",
 75                        )],
 76                        None,
 77                        cx,
 78                    )
 79                });
 80            }
 81
 82            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 83            let block = BlockProperties {
 84                position: code_range.end,
 85                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 86                height: 1,
 87                style: BlockStyle::Sticky,
 88                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 89                disposition: BlockDisposition::Below,
 90                priority: 0,
 91            };
 92
 93            let block_id = editor.insert_blocks([block], None, cx)[0];
 94            (block_id, invalidation_anchor)
 95        })?;
 96
 97        anyhow::Ok(Self {
 98            code_range,
 99            invalidation_anchor,
100            block_id,
101            execution_view,
102        })
103    }
104
105    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
106        self.execution_view.update(cx, |execution_view, cx| {
107            execution_view.push_message(&message.content, cx);
108        });
109    }
110
111    fn create_output_area_renderer(
112        execution_view: View<ExecutionView>,
113        on_close: CloseBlockFn,
114    ) -> RenderBlock {
115        let render = move |cx: &mut BlockContext| {
116            let execution_view = execution_view.clone();
117            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
118            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
119
120            let gutter = cx.gutter_dimensions;
121            let close_button_size = IconSize::XSmall;
122
123            let block_id = cx.block_id;
124            let on_close = on_close.clone();
125
126            let rem_size = cx.rem_size();
127            let line_height = cx.text_style().line_height_in_pixels(rem_size);
128
129            let (close_button_width, close_button_padding) =
130                close_button_size.square_components(cx);
131
132            div()
133                .min_h(line_height)
134                .flex()
135                .flex_row()
136                .items_start()
137                .w_full()
138                .bg(cx.theme().colors().background)
139                .border_y_1()
140                .border_color(cx.theme().colors().border)
141                .child(
142                    v_flex().min_h(cx.line_height()).justify_center().child(
143                        h_flex()
144                            .w(gutter.full_width())
145                            .justify_end()
146                            .pt(line_height / 2.)
147                            .child(
148                                h_flex()
149                                    .pr(gutter.width / 2. - close_button_width
150                                        + close_button_padding / 2.)
151                                    .child(
152                                        IconButton::new(
153                                            ("close_output_area", EntityId::from(cx.block_id)),
154                                            IconName::Close,
155                                        )
156                                        .shape(IconButtonShape::Square)
157                                        .icon_size(close_button_size)
158                                        .icon_color(Color::Muted)
159                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
160                                        .on_click(
161                                            move |_, cx| {
162                                                if let BlockId::Custom(block_id) = block_id {
163                                                    (on_close)(block_id, cx)
164                                                }
165                                            },
166                                        ),
167                                    ),
168                            ),
169                    ),
170                )
171                .child(
172                    div()
173                        .flex_1()
174                        .size_full()
175                        .my_2()
176                        .mr(gutter.width)
177                        .text_size(text_font_size)
178                        .font_family(text_font)
179                        .child(execution_view),
180                )
181                .into_any_element()
182        };
183
184        Box::new(render)
185    }
186}
187
188impl Session {
189    pub fn new(
190        editor: WeakView<Editor>,
191        fs: Arc<dyn Fs>,
192        telemetry: Arc<Telemetry>,
193        kernel_specification: KernelSpecification,
194        cx: &mut ViewContext<Self>,
195    ) -> Self {
196        let kernel_language = kernel_specification.kernelspec.language.clone();
197
198        telemetry.report_repl_event(
199            kernel_language.clone(),
200            KernelStatus::Starting.to_string(),
201            cx.entity_id().to_string(),
202        );
203
204        let entity_id = editor.entity_id();
205        let working_directory = editor
206            .upgrade()
207            .and_then(|editor| editor.read(cx).working_directory(cx))
208            .unwrap_or_else(temp_dir);
209        let kernel = RunningKernel::new(
210            kernel_specification.clone(),
211            entity_id,
212            working_directory,
213            fs.clone(),
214            cx,
215        );
216
217        let pending_kernel = cx
218            .spawn(|this, mut cx| async move {
219                let kernel = kernel.await;
220
221                match kernel {
222                    Ok((mut kernel, mut messages_rx)) => {
223                        this.update(&mut cx, |session, cx| {
224                            let stderr = kernel.process.stderr.take();
225
226                            cx.spawn(|_session, mut _cx| async move {
227                                if let None = stderr {
228                                    return;
229                                }
230                                let reader = BufReader::new(stderr.unwrap());
231                                let mut lines = reader.lines();
232                                while let Some(Ok(line)) = lines.next().await {
233                                    log::error!("kernel: {}", line);
234                                }
235                            })
236                            .detach();
237
238                            let stdout = kernel.process.stdout.take();
239
240                            cx.spawn(|_session, mut _cx| async move {
241                                if let None = stdout {
242                                    return;
243                                }
244                                let reader = BufReader::new(stdout.unwrap());
245                                let mut lines = reader.lines();
246                                while let Some(Ok(line)) = lines.next().await {
247                                    log::info!("kernel: {}", line);
248                                }
249                            })
250                            .detach();
251
252                            let status = kernel.process.status();
253                            session.kernel(Kernel::RunningKernel(kernel), cx);
254
255                            cx.spawn(|session, mut cx| async move {
256                                let error_message = match status.await {
257                                    Ok(status) => {
258                                        if status.success() {
259                                            log::info!("kernel process exited successfully");
260                                            return;
261                                        }
262
263                                        format!("kernel process exited with status: {:?}", status)
264                                    }
265                                    Err(err) => {
266                                        format!("kernel process exited with error: {:?}", err)
267                                    }
268                                };
269
270                                log::error!("{}", error_message);
271
272                                session
273                                    .update(&mut cx, |session, cx| {
274                                        session.kernel(
275                                            Kernel::ErroredLaunch(error_message.clone()),
276                                            cx,
277                                        );
278
279                                        session.blocks.values().for_each(|block| {
280                                            block.execution_view.update(
281                                                cx,
282                                                |execution_view, cx| {
283                                                    match execution_view.status {
284                                                        ExecutionStatus::Finished => {
285                                                            // Do nothing when the output was good
286                                                        }
287                                                        _ => {
288                                                            // All other cases, set the status to errored
289                                                            execution_view.status =
290                                                                ExecutionStatus::KernelErrored(
291                                                                    error_message.clone(),
292                                                                )
293                                                        }
294                                                    }
295                                                    cx.notify();
296                                                },
297                                            );
298                                        });
299
300                                        cx.notify();
301                                    })
302                                    .ok();
303                            })
304                            .detach();
305
306                            session.messaging_task = cx.spawn(|session, mut cx| async move {
307                                while let Some(message) = messages_rx.next().await {
308                                    session
309                                        .update(&mut cx, |session, cx| {
310                                            session.route(&message, cx);
311                                        })
312                                        .ok();
313                                }
314                            });
315
316                            // todo!(@rgbkrk): send kernelinforequest once our shell channel read/writes are split
317                            // cx.spawn(|this, mut cx| async move {
318                            //     cx.background_executor()
319                            //         .timer(Duration::from_millis(120))
320                            //         .await;
321                            //     this.update(&mut cx, |this, cx| {
322                            //         this.send(KernelInfoRequest {}.into(), cx).ok();
323                            //     })
324                            //     .ok();
325                            // })
326                            // .detach();
327                        })
328                        .ok();
329                    }
330                    Err(err) => {
331                        this.update(&mut cx, |session, cx| {
332                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
333                        })
334                        .ok();
335                    }
336                }
337            })
338            .shared();
339
340        let subscription = match editor.upgrade() {
341            Some(editor) => {
342                let buffer = editor.read(cx).buffer().clone();
343                cx.subscribe(&buffer, Self::on_buffer_event)
344            }
345            None => Subscription::new(|| {}),
346        };
347
348        return Self {
349            editor,
350            kernel: Kernel::StartingKernel(pending_kernel),
351            messaging_task: Task::ready(()),
352            blocks: HashMap::default(),
353            kernel_specification,
354            _buffer_subscription: subscription,
355            telemetry,
356        };
357    }
358
359    fn on_buffer_event(
360        &mut self,
361        buffer: Model<MultiBuffer>,
362        event: &multi_buffer::Event,
363        cx: &mut ViewContext<Self>,
364    ) {
365        if let multi_buffer::Event::Edited { .. } = event {
366            let snapshot = buffer.read(cx).snapshot(cx);
367
368            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
369
370            self.blocks.retain(|_id, block| {
371                if block.invalidation_anchor.is_valid(&snapshot) {
372                    true
373                } else {
374                    blocks_to_remove.insert(block.block_id);
375                    false
376                }
377            });
378
379            if !blocks_to_remove.is_empty() {
380                self.editor
381                    .update(cx, |editor, cx| {
382                        editor.remove_blocks(blocks_to_remove, None, cx);
383                    })
384                    .ok();
385                cx.notify();
386            }
387        }
388    }
389
390    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
391        match &mut self.kernel {
392            Kernel::RunningKernel(kernel) => {
393                kernel.request_tx.try_send(message).ok();
394            }
395            _ => {}
396        }
397
398        anyhow::Ok(())
399    }
400
401    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
402        let blocks_to_remove: HashSet<CustomBlockId> =
403            self.blocks.values().map(|block| block.block_id).collect();
404
405        self.editor
406            .update(cx, |editor, cx| {
407                editor.remove_blocks(blocks_to_remove, None, cx);
408            })
409            .ok();
410
411        self.blocks.clear();
412    }
413
414    pub fn execute(
415        &mut self,
416        code: String,
417        anchor_range: Range<Anchor>,
418        next_cell: Option<Anchor>,
419        move_down: bool,
420        cx: &mut ViewContext<Self>,
421    ) {
422        let Some(editor) = self.editor.upgrade() else {
423            return;
424        };
425
426        if code.is_empty() {
427            return;
428        }
429
430        let execute_request = ExecuteRequest {
431            code,
432            ..ExecuteRequest::default()
433        };
434
435        let message: JupyterMessage = execute_request.into();
436
437        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
438
439        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
440
441        self.blocks.retain(|_key, block| {
442            if anchor_range.overlaps(&block.code_range, &buffer) {
443                blocks_to_remove.insert(block.block_id);
444                false
445            } else {
446                true
447            }
448        });
449
450        self.editor
451            .update(cx, |editor, cx| {
452                editor.remove_blocks(blocks_to_remove, None, cx);
453            })
454            .ok();
455
456        let status = match &self.kernel {
457            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
458            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
459            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
460            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
461            Kernel::Shutdown => ExecutionStatus::Shutdown,
462        };
463
464        let parent_message_id = message.header.msg_id.clone();
465        let session_view = cx.view().downgrade();
466        let weak_editor = self.editor.clone();
467
468        let on_close: CloseBlockFn =
469            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
470                if let Some(session) = session_view.upgrade() {
471                    session.update(cx, |session, cx| {
472                        session.blocks.remove(&parent_message_id);
473                        cx.notify();
474                    });
475                }
476
477                if let Some(editor) = weak_editor.upgrade() {
478                    editor.update(cx, |editor, cx| {
479                        let mut block_ids = HashSet::default();
480                        block_ids.insert(block_id);
481                        editor.remove_blocks(block_ids, None, cx);
482                    });
483                }
484            });
485
486        let Ok(editor_block) =
487            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
488        else {
489            return;
490        };
491
492        let new_cursor_pos = if let Some(next_cursor) = next_cell {
493            next_cursor
494        } else {
495            editor_block.invalidation_anchor
496        };
497
498        self.blocks
499            .insert(message.header.msg_id.clone(), editor_block);
500
501        match &self.kernel {
502            Kernel::RunningKernel(_) => {
503                self.send(message, cx).ok();
504            }
505            Kernel::StartingKernel(task) => {
506                // Queue up the execution as a task to run after the kernel starts
507                let task = task.clone();
508                let message = message.clone();
509
510                cx.spawn(|this, mut cx| async move {
511                    task.await;
512                    this.update(&mut cx, |session, cx| {
513                        session.send(message, cx).ok();
514                    })
515                    .ok();
516                })
517                .detach();
518            }
519            _ => {}
520        }
521
522        if move_down {
523            editor.update(cx, move |editor, cx| {
524                editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
525                    selections.select_ranges([new_cursor_pos..new_cursor_pos]);
526                });
527            });
528        }
529    }
530
531    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
532        let parent_message_id = match message.parent_header.as_ref() {
533            Some(header) => &header.msg_id,
534            None => return,
535        };
536
537        match &message.content {
538            JupyterMessageContent::Status(status) => {
539                self.kernel.set_execution_state(&status.execution_state);
540
541                self.telemetry.report_repl_event(
542                    self.kernel_specification.kernelspec.language.clone(),
543                    KernelStatus::from(&self.kernel).to_string(),
544                    cx.entity_id().to_string(),
545                );
546
547                cx.notify();
548            }
549            JupyterMessageContent::KernelInfoReply(reply) => {
550                self.kernel.set_kernel_info(&reply);
551                cx.notify();
552            }
553            JupyterMessageContent::UpdateDisplayData(update) => {
554                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
555                    display_id
556                } else {
557                    return;
558                };
559
560                self.blocks.iter_mut().for_each(|(_, block)| {
561                    block.execution_view.update(cx, |execution_view, cx| {
562                        execution_view.update_display_data(&update.data, &display_id, cx);
563                    });
564                });
565                return;
566            }
567            _ => {}
568        }
569
570        if let Some(block) = self.blocks.get_mut(parent_message_id) {
571            block.handle_message(&message, cx);
572            return;
573        }
574    }
575
576    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
577        match &mut self.kernel {
578            Kernel::RunningKernel(_kernel) => {
579                self.send(InterruptRequest {}.into(), cx).ok();
580            }
581            Kernel::StartingKernel(_task) => {
582                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
583            }
584            _ => {}
585        }
586    }
587
588    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
589        if let Kernel::Shutdown = kernel {
590            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
591        }
592
593        let kernel_status = KernelStatus::from(&kernel).to_string();
594        let kernel_language = self.kernel_specification.kernelspec.language.clone();
595
596        self.telemetry.report_repl_event(
597            kernel_language,
598            kernel_status,
599            cx.entity_id().to_string(),
600        );
601
602        self.kernel = kernel;
603    }
604
605    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
606        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
607
608        match kernel {
609            Kernel::RunningKernel(mut kernel) => {
610                let mut request_tx = kernel.request_tx.clone();
611
612                cx.spawn(|this, mut cx| async move {
613                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
614                    request_tx.try_send(message).ok();
615
616                    // Give the kernel a bit of time to clean up
617                    cx.background_executor().timer(Duration::from_secs(3)).await;
618
619                    kernel.process.kill().ok();
620
621                    this.update(&mut cx, |session, cx| {
622                        session.clear_outputs(cx);
623                        session.kernel(Kernel::Shutdown, cx);
624                        cx.notify();
625                    })
626                    .ok();
627                })
628                .detach();
629            }
630            Kernel::StartingKernel(_kernel) => {
631                self.kernel = Kernel::Shutdown;
632            }
633            _ => {
634                self.kernel = Kernel::Shutdown;
635            }
636        }
637        cx.notify();
638    }
639}
640
641pub enum SessionEvent {
642    Shutdown(WeakView<Editor>),
643}
644
645impl EventEmitter<SessionEvent> for Session {}
646
647impl Render for Session {
648    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
649        let (status_text, interrupt_button) = match &self.kernel {
650            Kernel::RunningKernel(kernel) => (
651                kernel
652                    .kernel_info
653                    .as_ref()
654                    .map(|info| info.language_info.name.clone()),
655                Some(
656                    Button::new("interrupt", "Interrupt")
657                        .style(ButtonStyle::Subtle)
658                        .on_click(cx.listener(move |session, _, cx| {
659                            session.interrupt(cx);
660                        })),
661                ),
662            ),
663            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
664            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
665            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
666            Kernel::Shutdown => (Some("Shutdown".into()), None),
667        };
668
669        KernelListItem::new(self.kernel_specification.clone())
670            .status_color(match &self.kernel {
671                Kernel::RunningKernel(kernel) => match kernel.execution_state {
672                    ExecutionState::Idle => Color::Success,
673                    ExecutionState::Busy => Color::Modified,
674                },
675                Kernel::StartingKernel(_) => Color::Modified,
676                Kernel::ErroredLaunch(_) => Color::Error,
677                Kernel::ShuttingDown => Color::Modified,
678                Kernel::Shutdown => Color::Disabled,
679            })
680            .child(Label::new(self.kernel_specification.name.clone()))
681            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
682            .button(
683                Button::new("shutdown", "Shutdown")
684                    .style(ButtonStyle::Subtle)
685                    .disabled(self.kernel.is_shutting_down())
686                    .on_click(cx.listener(move |session, _, cx| {
687                        session.shutdown(cx);
688                    })),
689            )
690            .buttons(interrupt_button)
691    }
692}