session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::io::BufReader;
 18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
 19use gpui::{
 20    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 21    WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 30use theme::ActiveTheme;
 31use ui::{prelude::*, IconButtonShape, Tooltip};
 32
 33pub struct Session {
 34    fs: Arc<dyn Fs>,
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    messaging_task: Option<Task<()>>,
 39    process_status_task: Option<Task<()>>,
 40    pub kernel_specification: KernelSpecification,
 41    telemetry: Arc<Telemetry>,
 42    _buffer_subscription: Subscription,
 43}
 44
 45struct EditorBlock {
 46    code_range: Range<Anchor>,
 47    invalidation_anchor: Anchor,
 48    block_id: CustomBlockId,
 49    execution_view: View<ExecutionView>,
 50}
 51
 52type CloseBlockFn =
 53    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 54
 55impl EditorBlock {
 56    fn new(
 57        editor: WeakView<Editor>,
 58        code_range: Range<Anchor>,
 59        status: ExecutionStatus,
 60        on_close: CloseBlockFn,
 61        cx: &mut ViewContext<Session>,
 62    ) -> anyhow::Result<Self> {
 63        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 64
 65        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 66            let buffer = editor.buffer().clone();
 67            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 68            let end_point = code_range.end.to_point(&buffer_snapshot);
 69            let next_row_start = end_point + Point::new(1, 0);
 70            if next_row_start > buffer_snapshot.max_point() {
 71                buffer.update(cx, |buffer, cx| {
 72                    buffer.edit(
 73                        [(
 74                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 75                            "\n",
 76                        )],
 77                        None,
 78                        cx,
 79                    )
 80                });
 81            }
 82
 83            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 84            let block = BlockProperties {
 85                position: code_range.end,
 86                // Take up at least one height for status, allow the editor to determine the real height based on the content from render
 87                height: 1,
 88                style: BlockStyle::Sticky,
 89                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 90                disposition: BlockDisposition::Below,
 91                priority: 0,
 92            };
 93
 94            let block_id = editor.insert_blocks([block], None, cx)[0];
 95            (block_id, invalidation_anchor)
 96        })?;
 97
 98        anyhow::Ok(Self {
 99            code_range,
100            invalidation_anchor,
101            block_id,
102            execution_view,
103        })
104    }
105
106    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
107        self.execution_view.update(cx, |execution_view, cx| {
108            execution_view.push_message(&message.content, cx);
109        });
110    }
111
112    fn create_output_area_renderer(
113        execution_view: View<ExecutionView>,
114        on_close: CloseBlockFn,
115    ) -> RenderBlock {
116        let render = move |cx: &mut BlockContext| {
117            let execution_view = execution_view.clone();
118            let text_style = crate::outputs::plain::text_style(cx);
119
120            let gutter = cx.gutter_dimensions;
121
122            let block_id = cx.block_id;
123            let on_close = on_close.clone();
124
125            let rem_size = cx.rem_size();
126
127            let text_line_height = text_style.line_height_in_pixels(rem_size);
128
129            let close_button = h_flex()
130                .flex_none()
131                .items_center()
132                .justify_center()
133                .absolute()
134                .top(text_line_height / 2.)
135                .right(
136                    // 2px is a magic number to nudge the button just a bit closer to
137                    // the line number start
138                    gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
139                )
140                .w(text_line_height)
141                .h(text_line_height)
142                .child(
143                    IconButton::new(
144                        ("close_output_area", EntityId::from(cx.block_id)),
145                        IconName::Close,
146                    )
147                    .icon_size(IconSize::Small)
148                    .icon_color(Color::Muted)
149                    .size(ButtonSize::Compact)
150                    .shape(IconButtonShape::Square)
151                    .tooltip(|cx| Tooltip::text("Close output area", cx))
152                    .on_click(move |_, cx| {
153                        if let BlockId::Custom(block_id) = block_id {
154                            (on_close)(block_id, cx)
155                        }
156                    }),
157                );
158
159            div()
160                .flex()
161                .items_start()
162                .min_h(text_line_height)
163                .w_full()
164                .border_y_1()
165                .border_color(cx.theme().colors().border)
166                .bg(cx.theme().colors().background)
167                .child(
168                    div()
169                        .relative()
170                        .w(gutter.full_width())
171                        .h(text_line_height * 2)
172                        .child(close_button),
173                )
174                .child(
175                    div()
176                        .flex_1()
177                        .size_full()
178                        .py(text_line_height / 2.)
179                        .mr(gutter.width)
180                        .child(execution_view),
181                )
182                .into_any_element()
183        };
184
185        Box::new(render)
186    }
187}
188
189impl Session {
190    pub fn new(
191        editor: WeakView<Editor>,
192        fs: Arc<dyn Fs>,
193        telemetry: Arc<Telemetry>,
194        kernel_specification: KernelSpecification,
195        cx: &mut ViewContext<Self>,
196    ) -> Self {
197        let subscription = match editor.upgrade() {
198            Some(editor) => {
199                let buffer = editor.read(cx).buffer().clone();
200                cx.subscribe(&buffer, Self::on_buffer_event)
201            }
202            None => Subscription::new(|| {}),
203        };
204
205        let mut session = Self {
206            fs,
207            editor,
208            kernel: Kernel::StartingKernel(Task::ready(()).shared()),
209            messaging_task: None,
210            process_status_task: None,
211            blocks: HashMap::default(),
212            kernel_specification,
213            _buffer_subscription: subscription,
214            telemetry,
215        };
216
217        session.start_kernel(cx);
218        session
219    }
220
221    fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
222        let kernel_language = self.kernel_specification.kernelspec.language.clone();
223        let entity_id = self.editor.entity_id();
224        let working_directory = self
225            .editor
226            .upgrade()
227            .and_then(|editor| editor.read(cx).working_directory(cx))
228            .unwrap_or_else(temp_dir);
229
230        self.telemetry.report_repl_event(
231            kernel_language.clone(),
232            KernelStatus::Starting.to_string(),
233            cx.entity_id().to_string(),
234        );
235
236        let kernel = RunningKernel::new(
237            self.kernel_specification.clone(),
238            entity_id,
239            working_directory,
240            self.fs.clone(),
241            cx,
242        );
243
244        let pending_kernel = cx
245            .spawn(|this, mut cx| async move {
246                let kernel = kernel.await;
247
248                match kernel {
249                    Ok((mut kernel, mut messages_rx)) => {
250                        this.update(&mut cx, |session, cx| {
251                            let stderr = kernel.process.stderr.take();
252
253                            cx.spawn(|_session, mut _cx| async move {
254                                if let None = stderr {
255                                    return;
256                                }
257                                let reader = BufReader::new(stderr.unwrap());
258                                let mut lines = reader.lines();
259                                while let Some(Ok(line)) = lines.next().await {
260                                    // todo!(): Log stdout and stderr to something the session can show
261                                    log::error!("kernel: {}", line);
262                                }
263                            })
264                            .detach();
265
266                            let stdout = kernel.process.stdout.take();
267
268                            cx.spawn(|_session, mut _cx| async move {
269                                if let None = stdout {
270                                    return;
271                                }
272                                let reader = BufReader::new(stdout.unwrap());
273                                let mut lines = reader.lines();
274                                while let Some(Ok(line)) = lines.next().await {
275                                    log::info!("kernel: {}", line);
276                                }
277                            })
278                            .detach();
279
280                            let status = kernel.process.status();
281                            session.kernel(Kernel::RunningKernel(kernel), cx);
282
283                            let process_status_task = cx.spawn(|session, mut cx| async move {
284                                let error_message = match status.await {
285                                    Ok(status) => {
286                                        if status.success() {
287                                            log::info!("kernel process exited successfully");
288                                            return;
289                                        }
290
291                                        format!("kernel process exited with status: {:?}", status)
292                                    }
293                                    Err(err) => {
294                                        format!("kernel process exited with error: {:?}", err)
295                                    }
296                                };
297
298                                log::error!("{}", error_message);
299
300                                session
301                                    .update(&mut cx, |session, cx| {
302                                        session.kernel(
303                                            Kernel::ErroredLaunch(error_message.clone()),
304                                            cx,
305                                        );
306
307                                        session.blocks.values().for_each(|block| {
308                                            block.execution_view.update(
309                                                cx,
310                                                |execution_view, cx| {
311                                                    match execution_view.status {
312                                                        ExecutionStatus::Finished => {
313                                                            // Do nothing when the output was good
314                                                        }
315                                                        _ => {
316                                                            // All other cases, set the status to errored
317                                                            execution_view.status =
318                                                                ExecutionStatus::KernelErrored(
319                                                                    error_message.clone(),
320                                                                )
321                                                        }
322                                                    }
323                                                    cx.notify();
324                                                },
325                                            );
326                                        });
327
328                                        cx.notify();
329                                    })
330                                    .ok();
331                            });
332
333                            session.process_status_task = Some(process_status_task);
334
335                            session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
336                                while let Some(message) = messages_rx.next().await {
337                                    session
338                                        .update(&mut cx, |session, cx| {
339                                            session.route(&message, cx);
340                                        })
341                                        .ok();
342                                }
343                            }));
344
345                            // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
346                            // cx.spawn(|this, mut cx| async move {
347                            //     cx.background_executor()
348                            //         .timer(Duration::from_millis(120))
349                            //         .await;
350                            //     this.update(&mut cx, |this, cx| {
351                            //         this.send(KernelInfoRequest {}.into(), cx).ok();
352                            //     })
353                            //     .ok();
354                            // })
355                            // .detach();
356                        })
357                        .ok();
358                    }
359                    Err(err) => {
360                        this.update(&mut cx, |session, cx| {
361                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
362                        })
363                        .ok();
364                    }
365                }
366            })
367            .shared();
368
369        self.kernel(Kernel::StartingKernel(pending_kernel), cx);
370        cx.notify();
371    }
372
373    fn on_buffer_event(
374        &mut self,
375        buffer: Model<MultiBuffer>,
376        event: &multi_buffer::Event,
377        cx: &mut ViewContext<Self>,
378    ) {
379        if let multi_buffer::Event::Edited { .. } = event {
380            let snapshot = buffer.read(cx).snapshot(cx);
381
382            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
383
384            self.blocks.retain(|_id, block| {
385                if block.invalidation_anchor.is_valid(&snapshot) {
386                    true
387                } else {
388                    blocks_to_remove.insert(block.block_id);
389                    false
390                }
391            });
392
393            if !blocks_to_remove.is_empty() {
394                self.editor
395                    .update(cx, |editor, cx| {
396                        editor.remove_blocks(blocks_to_remove, None, cx);
397                    })
398                    .ok();
399                cx.notify();
400            }
401        }
402    }
403
404    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
405        match &mut self.kernel {
406            Kernel::RunningKernel(kernel) => {
407                kernel.request_tx.try_send(message).ok();
408            }
409            _ => {}
410        }
411
412        anyhow::Ok(())
413    }
414
415    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
416        let blocks_to_remove: HashSet<CustomBlockId> =
417            self.blocks.values().map(|block| block.block_id).collect();
418
419        self.editor
420            .update(cx, |editor, cx| {
421                editor.remove_blocks(blocks_to_remove, None, cx);
422            })
423            .ok();
424
425        self.blocks.clear();
426    }
427
428    pub fn execute(
429        &mut self,
430        code: String,
431        anchor_range: Range<Anchor>,
432        next_cell: Option<Anchor>,
433        move_down: bool,
434        cx: &mut ViewContext<Self>,
435    ) {
436        let Some(editor) = self.editor.upgrade() else {
437            return;
438        };
439
440        if code.is_empty() {
441            return;
442        }
443
444        let execute_request = ExecuteRequest {
445            code,
446            ..ExecuteRequest::default()
447        };
448
449        let message: JupyterMessage = execute_request.into();
450
451        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
452
453        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
454
455        self.blocks.retain(|_key, block| {
456            if anchor_range.overlaps(&block.code_range, &buffer) {
457                blocks_to_remove.insert(block.block_id);
458                false
459            } else {
460                true
461            }
462        });
463
464        self.editor
465            .update(cx, |editor, cx| {
466                editor.remove_blocks(blocks_to_remove, None, cx);
467            })
468            .ok();
469
470        let status = match &self.kernel {
471            Kernel::Restarting => ExecutionStatus::Restarting,
472            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
473            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
474            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
475            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
476            Kernel::Shutdown => ExecutionStatus::Shutdown,
477        };
478
479        let parent_message_id = message.header.msg_id.clone();
480        let session_view = cx.view().downgrade();
481        let weak_editor = self.editor.clone();
482
483        let on_close: CloseBlockFn =
484            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
485                if let Some(session) = session_view.upgrade() {
486                    session.update(cx, |session, cx| {
487                        session.blocks.remove(&parent_message_id);
488                        cx.notify();
489                    });
490                }
491
492                if let Some(editor) = weak_editor.upgrade() {
493                    editor.update(cx, |editor, cx| {
494                        let mut block_ids = HashSet::default();
495                        block_ids.insert(block_id);
496                        editor.remove_blocks(block_ids, None, cx);
497                    });
498                }
499            });
500
501        let Ok(editor_block) =
502            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
503        else {
504            return;
505        };
506
507        let new_cursor_pos = if let Some(next_cursor) = next_cell {
508            next_cursor
509        } else {
510            editor_block.invalidation_anchor
511        };
512
513        self.blocks
514            .insert(message.header.msg_id.clone(), editor_block);
515
516        match &self.kernel {
517            Kernel::RunningKernel(_) => {
518                self.send(message, cx).ok();
519            }
520            Kernel::StartingKernel(task) => {
521                // Queue up the execution as a task to run after the kernel starts
522                let task = task.clone();
523                let message = message.clone();
524
525                cx.spawn(|this, mut cx| async move {
526                    task.await;
527                    this.update(&mut cx, |session, cx| {
528                        session.send(message, cx).ok();
529                    })
530                    .ok();
531                })
532                .detach();
533            }
534            _ => {}
535        }
536
537        if move_down {
538            editor.update(cx, move |editor, cx| {
539                editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
540                    selections.select_ranges([new_cursor_pos..new_cursor_pos]);
541                });
542            });
543        }
544    }
545
546    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
547        let parent_message_id = match message.parent_header.as_ref() {
548            Some(header) => &header.msg_id,
549            None => return,
550        };
551
552        match &message.content {
553            JupyterMessageContent::Status(status) => {
554                self.kernel.set_execution_state(&status.execution_state);
555
556                self.telemetry.report_repl_event(
557                    self.kernel_specification.kernelspec.language.clone(),
558                    KernelStatus::from(&self.kernel).to_string(),
559                    cx.entity_id().to_string(),
560                );
561
562                cx.notify();
563            }
564            JupyterMessageContent::KernelInfoReply(reply) => {
565                self.kernel.set_kernel_info(&reply);
566                cx.notify();
567            }
568            JupyterMessageContent::UpdateDisplayData(update) => {
569                let display_id = if let Some(display_id) = update.transient.display_id.clone() {
570                    display_id
571                } else {
572                    return;
573                };
574
575                self.blocks.iter_mut().for_each(|(_, block)| {
576                    block.execution_view.update(cx, |execution_view, cx| {
577                        execution_view.update_display_data(&update.data, &display_id, cx);
578                    });
579                });
580                return;
581            }
582            _ => {}
583        }
584
585        if let Some(block) = self.blocks.get_mut(parent_message_id) {
586            block.handle_message(&message, cx);
587            return;
588        }
589    }
590
591    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
592        match &mut self.kernel {
593            Kernel::RunningKernel(_kernel) => {
594                self.send(InterruptRequest {}.into(), cx).ok();
595            }
596            Kernel::StartingKernel(_task) => {
597                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
598            }
599            _ => {}
600        }
601    }
602
603    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
604        if let Kernel::Shutdown = kernel {
605            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
606        }
607
608        let kernel_status = KernelStatus::from(&kernel).to_string();
609        let kernel_language = self.kernel_specification.kernelspec.language.clone();
610
611        self.telemetry.report_repl_event(
612            kernel_language,
613            kernel_status,
614            cx.entity_id().to_string(),
615        );
616
617        self.kernel = kernel;
618    }
619
620    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
621        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
622
623        match kernel {
624            Kernel::RunningKernel(mut kernel) => {
625                let mut request_tx = kernel.request_tx.clone();
626
627                cx.spawn(|this, mut cx| async move {
628                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
629                    request_tx.try_send(message).ok();
630
631                    // Give the kernel a bit of time to clean up
632                    cx.background_executor().timer(Duration::from_secs(3)).await;
633
634                    this.update(&mut cx, |session, _cx| {
635                        session.messaging_task.take();
636                        session.process_status_task.take();
637                    })
638                    .ok();
639
640                    kernel.process.kill().ok();
641
642                    this.update(&mut cx, |session, cx| {
643                        session.clear_outputs(cx);
644                        session.kernel(Kernel::Shutdown, cx);
645                        cx.notify();
646                    })
647                    .ok();
648                })
649                .detach();
650            }
651            _ => {
652                self.messaging_task.take();
653                self.process_status_task.take();
654                self.kernel(Kernel::Shutdown, cx);
655            }
656        }
657        cx.notify();
658    }
659
660    pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
661        let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
662
663        match kernel {
664            Kernel::Restarting => {
665                // Do nothing if already restarting
666            }
667            Kernel::RunningKernel(mut kernel) => {
668                let mut request_tx = kernel.request_tx.clone();
669
670                cx.spawn(|this, mut cx| async move {
671                    // Send shutdown request with restart flag
672                    log::debug!("restarting kernel");
673                    let message: JupyterMessage = ShutdownRequest { restart: true }.into();
674                    request_tx.try_send(message).ok();
675
676                    this.update(&mut cx, |session, _cx| {
677                        session.messaging_task.take();
678                        session.process_status_task.take();
679                    })
680                    .ok();
681
682                    // Wait for kernel to shutdown
683                    cx.background_executor().timer(Duration::from_secs(1)).await;
684
685                    // Force kill the kernel if it hasn't shut down
686                    kernel.process.kill().ok();
687
688                    // Start a new kernel
689                    this.update(&mut cx, |session, cx| {
690                        // todo!(): Differentiate between restart and restart+clear-outputs
691                        session.clear_outputs(cx);
692                        session.start_kernel(cx);
693                    })
694                    .ok();
695                })
696                .detach();
697            }
698            _ => {
699                // If it's not already running, we can just clean up and start a new kernel
700                self.messaging_task.take();
701                self.process_status_task.take();
702                self.clear_outputs(cx);
703                self.start_kernel(cx);
704            }
705        }
706        cx.notify();
707    }
708}
709
710pub enum SessionEvent {
711    Shutdown(WeakView<Editor>),
712}
713
714impl EventEmitter<SessionEvent> for Session {}
715
716impl Render for Session {
717    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
718        let (status_text, interrupt_button) = match &self.kernel {
719            Kernel::RunningKernel(kernel) => (
720                kernel
721                    .kernel_info
722                    .as_ref()
723                    .map(|info| info.language_info.name.clone()),
724                Some(
725                    Button::new("interrupt", "Interrupt")
726                        .style(ButtonStyle::Subtle)
727                        .on_click(cx.listener(move |session, _, cx| {
728                            session.interrupt(cx);
729                        })),
730                ),
731            ),
732            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
733            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
734            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
735            Kernel::Shutdown => (Some("Shutdown".into()), None),
736            Kernel::Restarting => (Some("Restarting".into()), None),
737        };
738
739        KernelListItem::new(self.kernel_specification.clone())
740            .status_color(match &self.kernel {
741                Kernel::RunningKernel(kernel) => match kernel.execution_state {
742                    ExecutionState::Idle => Color::Success,
743                    ExecutionState::Busy => Color::Modified,
744                },
745                Kernel::StartingKernel(_) => Color::Modified,
746                Kernel::ErroredLaunch(_) => Color::Error,
747                Kernel::ShuttingDown => Color::Modified,
748                Kernel::Shutdown => Color::Disabled,
749                Kernel::Restarting => Color::Modified,
750            })
751            .child(Label::new(self.kernel_specification.name.clone()))
752            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
753            .button(
754                Button::new("shutdown", "Shutdown")
755                    .style(ButtonStyle::Subtle)
756                    .disabled(self.kernel.is_shutting_down())
757                    .on_click(cx.listener(move |session, _, cx| {
758                        session.shutdown(cx);
759                    })),
760            )
761            .buttons(interrupt_button)
762    }
763}